Allow to create a new connection when the current one is shutting down

When we see a GOAWAY(NO_ERROR), the connector no longer fail the entire request.
Now the connector creates a new connection instead.
This commit is contained in:
Yuchen Wu 2024-06-24 15:09:45 -07:00 committed by Yuchen Wu
parent 38a9d556b5
commit ee7f66082f
2 changed files with 36 additions and 16 deletions

2
.bleep
View file

@ -1 +1 @@
361d88592075f7f98f581b139d0349f1b70190a2
9ec5f295aba1ec889914afb8c3cbb44724a516f1

View file

@ -21,7 +21,7 @@ use crate::upstreams::peer::{Peer, ALPN};
use bytes::Bytes;
use h2::client::SendRequest;
use log::{debug, warn};
use log::debug;
use parking_lot::{Mutex, RwLock};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool, PoolNode};
@ -52,6 +52,8 @@ pub(crate) struct ConnectionRefInner {
max_streams: usize,
// how many concurrent streams already active
current_streams: AtomicUsize,
// The connection is gracefully shutting down, no more stream is allowed
shutting_down: AtomicBool,
// because `SendRequest` doesn't actually have access to the underlying Stream,
// we log info about timing and tcp info here.
pub(crate) digest: Digest,
@ -78,12 +80,14 @@ impl ConnectionRef {
id,
max_streams,
current_streams: AtomicUsize::new(0),
shutting_down: false.into(),
digest,
release_lock: Arc::new(Mutex::new(())),
}))
}
pub fn more_streams_allowed(&self) -> bool {
self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed)
!self.is_shutting_down()
&& self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed)
}
pub fn is_idle(&self) -> bool {
@ -114,6 +118,12 @@ impl ConnectionRef {
*self.0.closed.borrow()
}
// different from is_closed, existing streams can still be processed but can no longer create
// new stream.
pub fn is_shutting_down(&self) -> bool {
self.0.shutting_down.load(Ordering::Relaxed)
}
// spawn a stream if more stream is allowed, otherwise return Ok(None)
pub async fn spawn_stream(&self) -> Result<Option<Http2Session>> {
// Atomically check if the current_stream is over the limit
@ -124,13 +134,28 @@ impl ConnectionRef {
self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
return Ok(None);
}
let send_req = self.0.connection_stub.new_stream().await.map_err(|e| {
// fail to create the stream, reset the counter
self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
e
})?;
Ok(Some(Http2Session::new(send_req, self.clone())))
match self.0.connection_stub.new_stream().await {
Ok(send_req) => Ok(Some(Http2Session::new(send_req, self.clone()))),
Err(e) => {
// fail to create the stream, reset the counter
self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
// Remote sends GOAWAY(NO_ERROR): graceful shutdown: this connection no longer
// accepts new streams. We can still try to create new connection.
if e.root_cause()
.downcast_ref::<Box<h2::Error>>()
.map(|e| {
e.is_go_away() && e.is_remote() && e.reason() == Some(h2::Reason::NO_ERROR)
})
.unwrap_or(false)
{
self.0.shutting_down.store(true, Ordering::Relaxed);
Ok(None)
} else {
Err(e)
}
}
}
}
}
@ -277,11 +302,6 @@ impl Connector {
.or_else(|| self.idle_pool.get(&reuse_hash));
if let Some(conn) = maybe_conn {
let h2_stream = conn.spawn_stream().await?;
if h2_stream.is_none() {
warn!("connection from the pools should have free stream to allocate, current in use {}, max {}",
conn.0.current_streams.load(Ordering::Relaxed),
conn.0.max_streams);
}
if conn.more_streams_allowed() {
self.in_use_pool.insert(reuse_hash, conn);
}
@ -318,8 +338,8 @@ impl Connector {
// find and remove the conn stored in in_use_pool so that it could be put in the idle pool
// if necessary
let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);
if conn.is_closed() {
// Already dead h2 connection
if conn.is_closed() || conn.is_shutting_down() {
// should never be put back to the pool
return;
}
if conn.is_idle() {