mirror of
https://github.com/cloudflare/pingora.git
synced 2024-09-20 02:31:35 +02:00
Add un-gzip support and allow decompress by algorithm
This commit is contained in:
parent
c84a43c815
commit
f9530a59c1
4 changed files with 114 additions and 25 deletions
2
.bleep
2
.bleep
|
@ -1 +1 @@
|
|||
e68f6024370efed50aebc8741171956acabf9c35
|
||||
4c6da000f956f3c13473dee0c6302ac0126418dd
|
|
@ -42,7 +42,6 @@ impl Decompressor {
|
|||
|
||||
impl Encode for Decompressor {
|
||||
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
|
||||
// reserve at most 16k
|
||||
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
|
||||
// Brotli compress ratio can be 3.5 to 4.5
|
||||
const ESTIMATED_COMPRESSION_RATIO: usize = 4;
|
||||
|
|
|
@ -12,15 +12,65 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use super::Encode;
|
||||
use super::{Encode, COMPRESSION_ERROR};
|
||||
|
||||
use bytes::Bytes;
|
||||
use flate2::write::GzEncoder;
|
||||
use pingora_error::Result;
|
||||
use flate2::write::{GzDecoder, GzEncoder};
|
||||
use pingora_error::{OrErr, Result};
|
||||
use std::io::Write;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
// TODO: unzip
|
||||
pub struct Decompressor {
|
||||
decompress: GzDecoder<Vec<u8>>,
|
||||
total_in: usize,
|
||||
total_out: usize,
|
||||
duration: Duration,
|
||||
}
|
||||
|
||||
impl Decompressor {
|
||||
pub fn new() -> Self {
|
||||
Decompressor {
|
||||
decompress: GzDecoder::new(vec![]),
|
||||
total_in: 0,
|
||||
total_out: 0,
|
||||
duration: Duration::new(0, 0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for Decompressor {
|
||||
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
|
||||
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
|
||||
const ESTIMATED_COMPRESSION_RATIO: usize = 3; // estimated 2.5-3x compression
|
||||
let start = Instant::now();
|
||||
self.total_in += input.len();
|
||||
// cap the buf size amplification, there is a DoS risk of always allocating
|
||||
// 3x the memory of the input buffer
|
||||
let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP {
|
||||
input.len() * ESTIMATED_COMPRESSION_RATIO
|
||||
} else {
|
||||
input.len()
|
||||
};
|
||||
self.decompress.get_mut().reserve(reserve_size);
|
||||
self.decompress
|
||||
.write_all(input)
|
||||
.or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
|
||||
// write to vec will never fail, the only possible error is that the input data
|
||||
// was not actually gzip compressed
|
||||
if end {
|
||||
self.decompress
|
||||
.try_finish()
|
||||
.or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
|
||||
}
|
||||
self.total_out += self.decompress.get_ref().len();
|
||||
self.duration += start.elapsed();
|
||||
Ok(std::mem::take(self.decompress.get_mut()).into()) // into() Bytes will drop excess capacity
|
||||
}
|
||||
|
||||
fn stat(&self) -> (&'static str, usize, usize, Duration) {
|
||||
("de-gzip", self.total_in, self.total_out, self.duration)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Compressor {
|
||||
// TODO: enum for other compression algorithms
|
||||
|
@ -66,6 +116,20 @@ impl Encode for Compressor {
|
|||
}
|
||||
|
||||
use std::ops::{Deref, DerefMut};
|
||||
impl Deref for Decompressor {
|
||||
type Target = GzDecoder<Vec<u8>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.decompress
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Decompressor {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.decompress
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Compressor {
|
||||
type Target = GzEncoder<Vec<u8>>;
|
||||
|
||||
|
@ -100,4 +164,21 @@ mod tests_stream {
|
|||
|
||||
assert!(compressor.get_ref().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gunzip_data() {
|
||||
let mut decompressor = Decompressor::new();
|
||||
|
||||
let compressed_bytes = &[
|
||||
0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106,
|
||||
42, 49, 7, 0, 0, 0,
|
||||
];
|
||||
let decompressed = decompressor.encode(compressed_bytes, true).unwrap();
|
||||
|
||||
assert_eq!(&decompressed[..], b"abcdefg");
|
||||
assert_eq!(decompressor.total_in, compressed_bytes.len());
|
||||
assert_eq!(decompressor.total_out, decompressed.len());
|
||||
|
||||
assert!(decompressor.get_ref().is_empty());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,10 +67,10 @@ pub struct ResponseCompressionCtx(CtxInner);
|
|||
|
||||
enum CtxInner {
|
||||
HeaderPhase {
|
||||
decompress_enable: bool,
|
||||
// Store the preferred list to compare with content-encoding
|
||||
accept_encoding: Vec<Algorithm>,
|
||||
encoding_levels: [u32; Algorithm::COUNT],
|
||||
decompress_enable: [bool; Algorithm::COUNT],
|
||||
},
|
||||
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
|
||||
}
|
||||
|
@ -81,9 +81,9 @@ impl ResponseCompressionCtx {
|
|||
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
|
||||
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
|
||||
Self(CtxInner::HeaderPhase {
|
||||
decompress_enable,
|
||||
accept_encoding: Vec::new(),
|
||||
encoding_levels: [compression_level; Algorithm::COUNT],
|
||||
decompress_enable: [decompress_enable; Algorithm::COUNT],
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -93,9 +93,9 @@ impl ResponseCompressionCtx {
|
|||
match &self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable,
|
||||
accept_encoding: _,
|
||||
encoding_levels: levels,
|
||||
} => levels.iter().any(|l| *l != 0) || *decompress_enable,
|
||||
..
|
||||
} => levels.iter().any(|l| *l != 0) || decompress_enable.iter().any(|d| *d),
|
||||
CtxInner::BodyPhase(c) => c.is_some(),
|
||||
}
|
||||
}
|
||||
|
@ -104,11 +104,7 @@ impl ResponseCompressionCtx {
|
|||
/// algorithm name, in bytes, out bytes, time taken for the compression
|
||||
pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
|
||||
match &self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable: _,
|
||||
accept_encoding: _,
|
||||
encoding_levels: _,
|
||||
} => None,
|
||||
CtxInner::HeaderPhase { .. } => None,
|
||||
CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
|
||||
}
|
||||
}
|
||||
|
@ -119,9 +115,8 @@ impl ResponseCompressionCtx {
|
|||
pub fn adjust_level(&mut self, new_level: u32) {
|
||||
match &mut self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable: _,
|
||||
accept_encoding: _,
|
||||
encoding_levels: levels,
|
||||
..
|
||||
} => {
|
||||
*levels = [new_level; Algorithm::COUNT];
|
||||
}
|
||||
|
@ -135,9 +130,8 @@ impl ResponseCompressionCtx {
|
|||
pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
|
||||
match &mut self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable: _,
|
||||
accept_encoding: _,
|
||||
encoding_levels: levels,
|
||||
..
|
||||
} => {
|
||||
levels[algorithm.index()] = new_level;
|
||||
}
|
||||
|
@ -145,17 +139,29 @@ impl ResponseCompressionCtx {
|
|||
}
|
||||
}
|
||||
|
||||
/// Adjust the decompression flag.
|
||||
/// Adjust the decompression flag for all compression algorithms.
|
||||
/// # Panics
|
||||
/// This function will panic if it has already started encoding the response body.
|
||||
pub fn adjust_decompression(&mut self, enabled: bool) {
|
||||
match &mut self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable,
|
||||
accept_encoding: _,
|
||||
encoding_levels: _,
|
||||
decompress_enable, ..
|
||||
} => {
|
||||
*decompress_enable = enabled;
|
||||
*decompress_enable = [enabled; Algorithm::COUNT];
|
||||
}
|
||||
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Adjust the decompression flag for a specific algorithm.
|
||||
/// # Panics
|
||||
/// This function will panic if it has already started encoding the response body.
|
||||
pub fn adjust_algorithm_decompression(&mut self, algorithm: Algorithm, enabled: bool) {
|
||||
match &mut self.0 {
|
||||
CtxInner::HeaderPhase {
|
||||
decompress_enable, ..
|
||||
} => {
|
||||
decompress_enable[algorithm.index()] = enabled;
|
||||
}
|
||||
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
|
||||
}
|
||||
|
@ -208,7 +214,9 @@ impl ResponseCompressionCtx {
|
|||
let encoder = match action {
|
||||
Action::Noop => None,
|
||||
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
|
||||
Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
|
||||
Action::Decompress(algorithm) => {
|
||||
algorithm.decompressor(decompress_enable[algorithm.index()])
|
||||
}
|
||||
};
|
||||
if encoder.is_some() {
|
||||
adjust_response_header(resp, &action);
|
||||
|
@ -317,6 +325,7 @@ impl Algorithm {
|
|||
None
|
||||
} else {
|
||||
match self {
|
||||
Self::Gzip => Some(Box::new(gzip::Decompressor::new())),
|
||||
Self::Brotli => Some(Box::new(brotli::Decompressor::new())),
|
||||
_ => None, // not implemented
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue