diff --git a/.bleep b/.bleep index 9ab32a4..f05f155 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -e68f6024370efed50aebc8741171956acabf9c35 \ No newline at end of file +4c6da000f956f3c13473dee0c6302ac0126418dd \ No newline at end of file diff --git a/pingora-core/src/protocols/http/compression/brotli.rs b/pingora-core/src/protocols/http/compression/brotli.rs index 956f87d..89f7b4e 100644 --- a/pingora-core/src/protocols/http/compression/brotli.rs +++ b/pingora-core/src/protocols/http/compression/brotli.rs @@ -42,7 +42,6 @@ impl Decompressor { impl Encode for Decompressor { fn encode(&mut self, input: &[u8], end: bool) -> Result { - // reserve at most 16k const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024; // Brotli compress ratio can be 3.5 to 4.5 const ESTIMATED_COMPRESSION_RATIO: usize = 4; diff --git a/pingora-core/src/protocols/http/compression/gzip.rs b/pingora-core/src/protocols/http/compression/gzip.rs index d64c961..f7f997d 100644 --- a/pingora-core/src/protocols/http/compression/gzip.rs +++ b/pingora-core/src/protocols/http/compression/gzip.rs @@ -12,15 +12,65 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::Encode; +use super::{Encode, COMPRESSION_ERROR}; use bytes::Bytes; -use flate2::write::GzEncoder; -use pingora_error::Result; +use flate2::write::{GzDecoder, GzEncoder}; +use pingora_error::{OrErr, Result}; use std::io::Write; use std::time::{Duration, Instant}; -// TODO: unzip +pub struct Decompressor { + decompress: GzDecoder>, + total_in: usize, + total_out: usize, + duration: Duration, +} + +impl Decompressor { + pub fn new() -> Self { + Decompressor { + decompress: GzDecoder::new(vec![]), + total_in: 0, + total_out: 0, + duration: Duration::new(0, 0), + } + } +} + +impl Encode for Decompressor { + fn encode(&mut self, input: &[u8], end: bool) -> Result { + const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024; + const ESTIMATED_COMPRESSION_RATIO: usize = 3; // estimated 2.5-3x compression + let start = Instant::now(); + self.total_in += input.len(); + // cap the buf size amplification, there is a DoS risk of always allocate + // 3x the memory of the input buffer + let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP { + input.len() * ESTIMATED_COMPRESSION_RATIO + } else { + input.len() + }; + self.decompress.get_mut().reserve(reserve_size); + self.decompress + .write_all(input) + .or_err(COMPRESSION_ERROR, "while decompress Gzip")?; + // write to vec will never fail, only possible error is that the input data + // was not actually gzip compressed + if end { + self.decompress + .try_finish() + .or_err(COMPRESSION_ERROR, "while decompress Gzip")?; + } + self.total_out += self.decompress.get_ref().len(); + self.duration += start.elapsed(); + Ok(std::mem::take(self.decompress.get_mut()).into()) // into() Bytes will drop excess capacity + } + + fn stat(&self) -> (&'static str, usize, usize, Duration) { + ("de-gzip", self.total_in, self.total_out, self.duration) + } +} pub struct Compressor { // TODO: enum for other compression algorithms @@ -66,6 +116,20 @@ impl Encode for Compressor { } use std::ops::{Deref, DerefMut}; +impl Deref for Decompressor { + type Target = GzDecoder>; + + fn deref(&self) -> &Self::Target { + &self.decompress + } +} + +impl DerefMut for Decompressor { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.decompress + } +} + impl Deref for Compressor { type Target = GzEncoder>; @@ -100,4 +164,21 @@ mod tests_stream { assert!(compressor.get_ref().is_empty()); } + + #[test] + fn gunzip_data() { + let mut decompressor = Decompressor::new(); + + let compressed_bytes = &[ + 0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106, + 42, 49, 7, 0, 0, 0, + ]; + let decompressed = decompressor.encode(compressed_bytes, true).unwrap(); + + assert_eq!(&decompressed[..], b"abcdefg"); + assert_eq!(decompressor.total_in, compressed_bytes.len()); + assert_eq!(decompressor.total_out, decompressed.len()); + + assert!(decompressor.get_ref().is_empty()); + } } diff --git a/pingora-core/src/protocols/http/compression/mod.rs b/pingora-core/src/protocols/http/compression/mod.rs index ad1cd0b..6236a0d 100644 --- a/pingora-core/src/protocols/http/compression/mod.rs +++ b/pingora-core/src/protocols/http/compression/mod.rs @@ -67,10 +67,10 @@ pub struct ResponseCompressionCtx(CtxInner); enum CtxInner { HeaderPhase { - decompress_enable: bool, // Store the preferred list to compare with content-encoding accept_encoding: Vec, encoding_levels: [u32; Algorithm::COUNT], + decompress_enable: [bool; Algorithm::COUNT], }, BodyPhase(Option>), } @@ -81,9 +81,9 @@ impl ResponseCompressionCtx { /// The `decompress_enable` flag will tell the ctx to decompress if needed. pub fn new(compression_level: u32, decompress_enable: bool) -> Self { Self(CtxInner::HeaderPhase { - decompress_enable, accept_encoding: Vec::new(), encoding_levels: [compression_level; Algorithm::COUNT], + decompress_enable: [decompress_enable; Algorithm::COUNT], }) } @@ -93,9 +93,9 @@ impl ResponseCompressionCtx { match &self.0 { CtxInner::HeaderPhase { decompress_enable, - accept_encoding: _, encoding_levels: levels, - } => levels.iter().any(|l| *l != 0) || *decompress_enable, + .. + } => levels.iter().any(|l| *l != 0) || decompress_enable.iter().any(|d| *d), CtxInner::BodyPhase(c) => c.is_some(), } } @@ -104,11 +104,7 @@ impl ResponseCompressionCtx { /// algorithm name, in bytes, out bytes, time took for the compression pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> { match &self.0 { - CtxInner::HeaderPhase { - decompress_enable: _, - accept_encoding: _, - encoding_levels: _, - } => None, + CtxInner::HeaderPhase { .. } => None, CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()), } } @@ -119,9 +115,8 @@ impl ResponseCompressionCtx { pub fn adjust_level(&mut self, new_level: u32) { match &mut self.0 { CtxInner::HeaderPhase { - decompress_enable: _, - accept_encoding: _, encoding_levels: levels, + .. } => { *levels = [new_level; Algorithm::COUNT]; } @@ -135,9 +130,8 @@ impl ResponseCompressionCtx { pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) { match &mut self.0 { CtxInner::HeaderPhase { - decompress_enable: _, - accept_encoding: _, encoding_levels: levels, + .. } => { levels[algorithm.index()] = new_level; } @@ -145,17 +139,29 @@ impl ResponseCompressionCtx { } } - /// Adjust the decompression flag. + /// Adjust the decompression flag for all compression algorithms. /// # Panic /// This function will panic if it has already started encoding the response body. pub fn adjust_decompression(&mut self, enabled: bool) { match &mut self.0 { CtxInner::HeaderPhase { - decompress_enable, - accept_encoding: _, - encoding_levels: _, + decompress_enable, .. } => { - *decompress_enable = enabled; + *decompress_enable = [enabled; Algorithm::COUNT]; + } + CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"), + } + } + + /// Adjust the decompression flag for a specific algorithm. + /// # Panic + /// This function will panic if it has already started encoding the response body. + pub fn adjust_algorithm_decompression(&mut self, algorithm: Algorithm, enabled: bool) { + match &mut self.0 { + CtxInner::HeaderPhase { + decompress_enable, .. + } => { + decompress_enable[algorithm.index()] = enabled; } CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"), } @@ -208,7 +214,9 @@ impl ResponseCompressionCtx { let encoder = match action { Action::Noop => None, Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]), - Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable), + Action::Decompress(algorithm) => { + algorithm.decompressor(decompress_enable[algorithm.index()]) + } }; if encoder.is_some() { adjust_response_header(resp, &action); @@ -317,6 +325,7 @@ impl Algorithm { None } else { match self { + Self::Gzip => Some(Box::new(gzip::Decompressor::new())), Self::Brotli => Some(Box::new(brotli::Decompressor::new())), _ => None, // not implemented }