Add a write timeout to write body buf and an option to set a minimum send rate

This commit is contained in:
Andrew Hauck 2024-06-13 15:54:44 -07:00 committed by Edward Wang
parent 6e83d51ab1
commit fbf3a95749
7 changed files with 230 additions and 4 deletions

2
.bleep
View file

@ -1 +1 @@
1fe0ed665dfcf6222a4d08f6120172be64d27eb9
19506db280fe5f52641e5c1ffd48e4b62f536f18

View file

@ -89,7 +89,9 @@ impl HttpSession {
/// Set the write timeout for writing header and body.
///
/// The timeout is per write operation, not on the overall time writing the entire request
/// The timeout is per write operation, not on the overall time writing the entire request.
///
/// This is a noop for h2.
pub fn set_write_timeout(&mut self, timeout: Duration) {
match self {
HttpSession::H1(h1) => h1.write_timeout = Some(timeout),

View file

@ -25,6 +25,7 @@ use http::{header::AsHeaderName, HeaderMap};
use log::error;
use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;
/// HTTP server session object for both HTTP/1.x and HTTP/2
pub enum Session {
@ -188,6 +189,35 @@ impl Session {
}
}
/// Sets the downstream write timeout. This will trigger if we're unable
/// to write to the stream after `duration`. If a `min_send_rate` is
/// configured then the `min_send_rate` calculated timeout has higher priority.
///
/// This is a noop for h2.
pub fn set_write_timeout(&mut self, timeout: Duration) {
match self {
Self::H1(s) => s.set_write_timeout(timeout),
Self::H2(_) => {}
}
}
/// Sets the minimum downstream send rate in bytes per second. This
/// is used to calculate a write timeout in seconds based on the size
/// of the buffer being written. If a `min_send_rate` is configured it
/// has higher priority over a set `write_timeout`. The minimum send
/// rate must be greater than zero.
///
/// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
/// is greater than zero, a send rate of zero is a noop.
///
/// This is a noop for h2.
pub fn set_min_send_rate(&mut self, rate: usize) {
match self {
Self::H1(s) => s.set_min_send_rate(rate),
Self::H2(_) => {}
}
}
/// Return a digest of the request including the method, path and Host header
// TODO: make this use a `Formatter`
pub fn request_summary(&self) -> String {

View file

@ -72,6 +72,8 @@ pub struct HttpSession {
upgraded: bool,
/// Digest to track underlying connection metrics
digest: Box<Digest>,
/// Minimum send rate to the client
min_send_rate: Option<usize>,
}
impl HttpSession {
@ -106,6 +108,7 @@ impl HttpSession {
retry_buffer: None,
upgraded: false,
digest,
min_send_rate: None,
}
}
@ -511,6 +514,18 @@ impl HttpSession {
is_buf_keepalive(self.get_header(header::CONNECTION))
}
// calculate write timeout from min_send_rate if set, otherwise return write_timeout
fn write_timeout(&self, buf_len: usize) -> Option<Duration> {
let Some(min_send_rate) = self.min_send_rate.filter(|r| *r > 0) else {
return self.write_timeout;
};
// min timeout is 1s
let ms = (buf_len.max(min_send_rate) as f64 / min_send_rate as f64) * 1000.0;
// truncates unrealistically large values (we'll be out of memory before this happens)
Some(Duration::from_millis(ms as u64))
}
/// Apply keepalive settings according to the client
/// For HTTP 1.1, assume keepalive as long as there is no `Connection: Close` request header.
/// For HTTP 1.0, only keepalive if there is an explicit header `Connection: keep-alive`.
@ -579,7 +594,7 @@ impl HttpSession {
/// to be written, e.g., writing more bytes than what the `Content-Length` header suggests
pub async fn write_body(&mut self, buf: &[u8]) -> Result<Option<usize>> {
// TODO: check if the response header is written
match self.write_timeout {
match self.write_timeout(buf.len()) {
Some(t) => match timeout(t, self.do_write_body(buf)).await {
Ok(res) => res,
Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")),
@ -588,7 +603,7 @@ impl HttpSession {
}
}
async fn write_body_buf(&mut self) -> Result<Option<usize>> {
async fn do_write_body_buf(&mut self) -> Result<Option<usize>> {
// Don't flush empty chunks, they are considered end of body for chunks
if self.body_write_buf.is_empty() {
return Ok(None);
@ -609,6 +624,16 @@ impl HttpSession {
written
}
async fn write_body_buf(&mut self) -> Result<Option<usize>> {
match self.write_timeout(self.body_write_buf.len()) {
Some(t) => match timeout(t, self.do_write_body_buf()).await {
Ok(res) => res,
Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")),
},
None => self.do_write_body_buf().await,
}
}
fn maybe_force_close_body_reader(&mut self) {
if self.upgraded && !self.body_reader.body_done() {
// response is done, reset the request body to close
@ -778,6 +803,27 @@ impl HttpSession {
}
}
/// Sets the downstream write timeout. This will trigger if we're unable
/// to write to the stream after `duration`. If a `min_send_rate` is
/// configured then the `min_send_rate` calculated timeout has higher priority.
pub fn set_write_timeout(&mut self, timeout: Duration) {
self.write_timeout = Some(timeout);
}
/// Sets the minimum downstream send rate in bytes per second. This
/// is used to calculate a write timeout in seconds based on the size
/// of the buffer being written. If a `min_send_rate` is configured it
/// has higher priority over a set `write_timeout`. The minimum send
/// rate must be greater than zero.
///
/// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
/// is greater than zero, a send rate of zero is a noop.
pub fn set_min_send_rate(&mut self, min_send_rate: usize) {
if min_send_rate > 0 {
self.min_send_rate = Some(min_send_rate);
}
}
/// Return the [Digest] of the connection.
pub fn digest(&self) -> &Digest {
&self.digest
@ -1583,6 +1629,30 @@ mod tests_stream {
assert!(written.is_none());
}
#[tokio::test]
#[should_panic(expected = "There is still data left to write.")]
async fn test_write_body_buf_write_timeout() {
let wire1 = b"HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\n";
let wire2 = b"abc";
let mock_io = Builder::new()
.write(wire1)
.wait(Duration::from_millis(500))
.write(wire2)
.build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.write_timeout = Some(Duration::from_millis(100));
let mut new_response = ResponseHeader::build(StatusCode::OK, None).unwrap();
new_response.append_header("Content-Length", "3").unwrap();
http_stream.update_resp_headers = false;
http_stream
.write_response_header_ref(&new_response)
.await
.unwrap();
http_stream.body_write_buf = BytesMut::from(&b"abc"[..]);
let res = http_stream.write_body_buf().await;
assert_eq!(res.unwrap_err().etype(), &WriteTimedout);
}
#[tokio::test]
async fn test_write_continue_resp() {
let wire = b"HTTP/1.1 100 Continue\r\n\r\n";
@ -1610,6 +1680,48 @@ mod tests_stream {
response.set_version(http::Version::HTTP_11);
assert!(!is_upgrade_resp(&response));
}
#[test]
fn test_get_write_timeout() {
let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
let expected = Duration::from_secs(5);
http_stream.set_write_timeout(expected);
assert_eq!(Some(expected), http_stream.write_timeout(50));
}
#[test]
fn test_get_write_timeout_none() {
let http_stream = HttpSession::new(Box::new(Builder::new().build()));
assert!(http_stream.write_timeout(50).is_none());
}
#[test]
fn test_get_write_timeout_min_send_rate_zero_noop() {
let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
http_stream.set_min_send_rate(0);
assert!(http_stream.write_timeout(50).is_none());
}
#[test]
fn test_get_write_timeout_min_send_rate_overrides_write_timeout() {
let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
let expected = Duration::from_millis(29800);
http_stream.set_write_timeout(Duration::from_secs(60));
http_stream.set_min_send_rate(5000);
assert_eq!(Some(expected), http_stream.write_timeout(149000));
}
#[test]
fn test_get_write_timeout_min_send_rate_max_zero_buf() {
let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
let expected = Duration::from_secs(1);
http_stream.set_min_send_rate(1);
assert_eq!(Some(expected), http_stream.write_timeout(0));
}
}
#[cfg(test)]

View file

@ -133,6 +133,60 @@ async fn test_ws_server_ends_conn() {
assert!(ws_stream.next().await.is_none());
}
#[tokio::test]
async fn test_download_timeout() {
init();
use hyper::body::HttpBody;
use tokio::time::sleep;
let client = hyper::Client::new();
let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap();
let req = hyper::Request::builder()
.uri(uri)
.header("x-write-timeout", "1")
.body(hyper::Body::empty())
.unwrap();
let mut res = client.request(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let mut err = false;
sleep(Duration::from_secs(2)).await;
while let Some(chunk) = res.body_mut().data().await {
if chunk.is_err() {
err = true;
}
}
assert!(err);
}
#[tokio::test]
async fn test_download_timeout_min_rate() {
init();
use hyper::body::HttpBody;
use tokio::time::sleep;
let client = hyper::Client::new();
let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap();
let req = hyper::Request::builder()
.uri(uri)
.header("x-write-timeout", "1")
.header("x-min-rate", "10000")
.body(hyper::Body::empty())
.unwrap();
let mut res = client.request(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let mut err = false;
sleep(Duration::from_secs(2)).await;
while let Some(chunk) = res.body_mut().data().await {
if chunk.is_err() {
err = true;
}
}
// no error as write timeout is overridden by min rate
assert!(!err);
}
mod test_cache {
use super::*;
use std::str::FromStr;

View file

@ -296,6 +296,15 @@ http {
}
}
location /download/ {
content_by_lua_block {
ngx.req.read_body()
local body = string.rep("A", 4194304)
ngx.header["Content-Length"] = #body
ngx.print(body)
}
}
location /tls_verify {
keepalive_timeout 0;
return 200;

View file

@ -39,6 +39,7 @@ use pingora_proxy::{ProxyHttp, Session};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub struct ExampleProxyHttps {}
@ -230,6 +231,17 @@ impl ProxyHttp for ExampleProxyHttp {
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req = session.req_header();
let write_timeout = req
.headers
.get("x-write-timeout")
.and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok()));
let min_rate = req
.headers
.get("x-min-rate")
.and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok()));
let downstream_compression = req.headers.get("x-downstream-compression").is_some();
if !downstream_compression {
// enable upstream compression for all requests by default
@ -242,6 +254,13 @@ impl ProxyHttp for ExampleProxyHttp {
.adjust_level(0);
}
if let Some(min_rate) = min_rate {
session.set_min_send_rate(min_rate);
}
if let Some(write_timeout) = write_timeout {
session.set_write_timeout(Duration::from_secs(write_timeout));
}
Ok(false)
}