mirror of
https://github.com/cloudflare/pingora.git
synced 2024-09-20 02:31:35 +02:00
Updating the rate-limiter documentation with a simpler example
This commit is contained in:
parent
ef5ed1af3b
commit
76c4fdacbb
6 changed files with 287 additions and 118 deletions
2
.bleep
2
.bleep
|
@ -1 +1 @@
|
|||
9ec558778457c28b1ffbb1d7b4f4f5ab370b97a5
|
||||
c02dc12f1f7caf9aa90a13112321cf5c3d2f6b6d
|
|
@ -20,7 +20,7 @@ In this guide, we will cover the most used features, operations and settings of
|
|||
* [Examples: take control of the request](modify_filter.md)
|
||||
* [Connection pooling and reuse](pooling.md)
|
||||
* [Handling failures and failover](failover.md)
|
||||
* [RateLimiter quickstart](ratelimiter.md)
|
||||
* [RateLimiter quickstart](rate_limiter.md)
|
||||
|
||||
## Advanced topics (WIP)
|
||||
* [Pingora internals](internals.md)
|
||||
|
|
167
docs/user_guide/rate_limiter.md
Normal file
167
docs/user_guide/rate_limiter.md
Normal file
|
@ -0,0 +1,167 @@
|
|||
# **RateLimiter quickstart**
|
||||
Pingora provides a crate `pingora-limits` that implements a simple, easy-to-use rate limiter for your application. Below is an example of how you can use [`Rate`](https://docs.rs/pingora-limits/latest/pingora_limits/rate/struct.Rate.html) to create an application that uses multiple limiters to restrict the rate at which requests can be made on a per-app basis (determined by a request header).
|
||||
|
||||
## Steps
|
||||
1. Add the following dependencies to your `Cargo.toml`:
|
||||
```toml
|
||||
async-trait="0.1"
|
||||
pingora = { version = "0.3", features = [ "lb" ] }
|
||||
pingora-limits = "0.3.0"
|
||||
once_cell = "1.19.0"
|
||||
```
|
||||
2. Declare a global rate limiter. `Rate` tracks counts per key internally; in this example, the key is the client's `appid`.
|
||||
3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting.
|
||||
1. Retrieve the client appid from header.
|
||||
2. Report the request to the rate limiter and retrieve the client's request count for the current window.
|
||||
3. If the current window requests exceed the limit, return 429 and set RateLimiter associated headers.
|
||||
4. If the request is not rate limited, return `Ok(false)` to continue the request.
|
||||
|
||||
## Example
|
||||
```rust
|
||||
use async_trait::async_trait;
|
||||
use once_cell::sync::Lazy;
|
||||
use pingora::http::ResponseHeader;
|
||||
use pingora::prelude::*;
|
||||
use pingora_limits::rate::Rate;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
fn main() {
|
||||
let mut server = Server::new(Some(Opt::default())).unwrap();
|
||||
server.bootstrap();
|
||||
let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
|
||||
// Set health check
|
||||
let hc = TcpHealthCheck::new();
|
||||
upstreams.set_health_check(hc);
|
||||
upstreams.health_check_frequency = Some(Duration::from_secs(1));
|
||||
// Set background service
|
||||
let background = background_service("health check", upstreams);
|
||||
let upstreams = background.task();
|
||||
// Set load balancer
|
||||
let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
|
||||
lb.add_tcp("0.0.0.0:6188");
|
||||
|
||||
// let rate = Rate
|
||||
server.add_service(background);
|
||||
server.add_service(lb);
|
||||
server.run_forever();
|
||||
}
|
||||
|
||||
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
|
||||
|
||||
impl LB {
|
||||
pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
|
||||
match session
|
||||
.req_header()
|
||||
.headers
|
||||
.get("appid")
|
||||
.map(|v| v.to_str())
|
||||
{
|
||||
None => None,
|
||||
Some(v) => match v {
|
||||
Ok(v) => Some(v.to_string()),
|
||||
Err(_) => None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limiter
|
||||
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
|
||||
|
||||
// max request per second per client
|
||||
static MAX_REQ_PER_SEC: isize = 1;
|
||||
|
||||
#[async_trait]
|
||||
impl ProxyHttp for LB {
|
||||
type CTX = ();
|
||||
|
||||
fn new_ctx(&self) {}
|
||||
|
||||
async fn upstream_peer(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
_ctx: &mut Self::CTX,
|
||||
) -> Result<Box<HttpPeer>> {
|
||||
let upstream = self.0.select(b"", 256).unwrap();
|
||||
// Set SNI
|
||||
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
|
||||
Ok(peer)
|
||||
}
|
||||
|
||||
async fn upstream_request_filter(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
upstream_request: &mut RequestHeader,
|
||||
_ctx: &mut Self::CTX,
|
||||
) -> Result<()>
|
||||
where
|
||||
Self::CTX: Send + Sync,
|
||||
{
|
||||
upstream_request
|
||||
.insert_header("Host", "one.one.one.one")
|
||||
.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
|
||||
where
|
||||
Self::CTX: Send + Sync,
|
||||
{
|
||||
let appid = match self.get_request_appid(session) {
|
||||
None => return Ok(false), // no client appid found, skip rate limiting
|
||||
Some(addr) => addr,
|
||||
};
|
||||
|
||||
// retrieve the current window requests
|
||||
let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
|
||||
if curr_window_requests > MAX_REQ_PER_SEC {
|
||||
// rate limited, return 429
|
||||
let mut header = ResponseHeader::build(429, None).unwrap();
|
||||
header
|
||||
.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
|
||||
.unwrap();
|
||||
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
|
||||
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
|
||||
session.set_keepalive(None);
|
||||
session
|
||||
.write_response_header(Box::new(header), true)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Testing
|
||||
To use the example above:
|
||||
|
||||
1. Run your program with `cargo run`.
|
||||
2. Verify the program is working with a few executions of ` curl localhost:6188 -H "appid:1" -v`
|
||||
- The first request should work and any later requests that arrive within 1s of a previous request should fail with:
|
||||
```
|
||||
* Trying 127.0.0.1:6188...
|
||||
* Connected to localhost (127.0.0.1) port 6188 (#0)
|
||||
> GET / HTTP/1.1
|
||||
> Host: localhost:6188
|
||||
> User-Agent: curl/7.88.1
|
||||
> Accept: */*
|
||||
> appid:1
|
||||
>
|
||||
< HTTP/1.1 429 Too Many Requests
|
||||
< X-Rate-Limit-Limit: 1
|
||||
< X-Rate-Limit-Remaining: 0
|
||||
< X-Rate-Limit-Reset: 1
|
||||
< Date: Sun, 14 Jul 2024 20:29:02 GMT
|
||||
< Connection: close
|
||||
<
|
||||
* Closing connection 0
|
||||
```
|
||||
|
||||
## Complete Example
|
||||
You can run the pre-made example code in the [`pingora-proxy` examples folder](https://github.com/cloudflare/pingora/tree/main/pingora-proxy/examples/rate_limiter.rs) with
|
||||
|
||||
```
|
||||
cargo run --example rate_limiter
|
||||
```
|
|
@ -1,116 +0,0 @@
|
|||
# **RateLimiter quickstart**
|
||||
Pingora provides a crate `pingora-limits` that implements a simple, easy-to-use rate limiter for your application.
|
||||
|
||||
## Steps
|
||||
1. Add the following dependencies to your `Cargo.toml`:
|
||||
```toml
|
||||
pingora-limits = "0.1.0"
|
||||
lazy_static = "1.4.0"
|
||||
```
|
||||
2. Declare a global rate limiter map to store the rate limiter for each client. In this example, we use `appid`.
|
||||
3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting.
|
||||
1. Retrieve the client appid from header.
|
||||
2. Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map.
|
||||
3. If the current window requests exceed the limit, return 429 and set RateLimiter associated headers.
|
||||
4. If the request is not rate limited, return `Ok(false)` to continue the request.
|
||||
|
||||
## Example
|
||||
```rust
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use lazy_static::lazy_static;
|
||||
use pingora::http::ResponseHeader;
|
||||
use pingora::prelude::*;
|
||||
use pingora_limits::rate::Rate;
|
||||
|
||||
fn main() {
|
||||
let mut server = Server::new(Some(Opt::default())).unwrap();
|
||||
server.bootstrap();
|
||||
let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
|
||||
// Set health check
|
||||
let hc = TcpHealthCheck::new();
|
||||
upstreams.set_health_check(hc);
|
||||
upstreams.health_check_frequency = Some(Duration::from_secs(1));
|
||||
// Set background service
|
||||
let background = background_service("health check", upstreams);
|
||||
let upstreams = background.task();
|
||||
// Set load balancer
|
||||
let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
|
||||
lb.add_tcp("0.0.0.0:6188");
|
||||
|
||||
// let rate = Rate
|
||||
server.add_service(background);
|
||||
server.add_service(lb);
|
||||
server.run_forever();
|
||||
}
|
||||
|
||||
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
|
||||
|
||||
impl LB {
|
||||
pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
|
||||
match session.req_header().headers.get("appid").map(|v| v.to_str()) {
|
||||
None => None,
|
||||
Some(v) => match v {
|
||||
Ok(v) => Some(v.to_string()),
|
||||
Err(_) => None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// global limiter
|
||||
lazy_static! {
|
||||
static ref RATE_LIMITER_MAP: Arc<Mutex<HashMap<String, Rate>>> = {
|
||||
Arc::new(Mutex::new(HashMap::new()))
|
||||
};
|
||||
}
|
||||
// max request per second per client
|
||||
static MAX_REQ_PER_SEC: isize = 1;
|
||||
|
||||
#[async_trait]
|
||||
impl ProxyHttp for LB {
|
||||
type CTX = ();
|
||||
|
||||
fn new_ctx(&self) -> Self::CTX {
|
||||
()
|
||||
}
|
||||
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
|
||||
let upstream = self.0
|
||||
.select(b"", 256)
|
||||
.unwrap();
|
||||
// Set SNI
|
||||
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
|
||||
Ok(peer)
|
||||
}
|
||||
async fn upstream_request_filter(&self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> where Self::CTX: Send + Sync {
|
||||
upstream_request.insert_header("Host", "one.one.one.one").unwrap();
|
||||
Ok(())
|
||||
}
|
||||
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> where Self::CTX: Send + Sync {
|
||||
let appid = match self.get_request_appid(session) {
|
||||
None => return Ok(false), // no client appid found, skip rate limiting
|
||||
Some(addr) => addr
|
||||
};
|
||||
|
||||
// retrieve the current window requests
|
||||
let curr_window_requests = {
|
||||
let mut rate_limiter_map = RATE_LIMITER_MAP.lock().unwrap();
|
||||
let rate_limiter = rate_limiter_map.entry(appid.clone()).or_insert_with(|| Rate::new(Duration::from_secs(1)));
|
||||
rate_limiter.observe(&appid, 1)
|
||||
};
|
||||
if curr_window_requests > MAX_REQ_PER_SEC { // rate limited, return 429
|
||||
let mut header = ResponseHeader::build(429, None).unwrap();
|
||||
header.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()).unwrap();
|
||||
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
|
||||
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
|
||||
session.set_keepalive(None);
|
||||
session.write_response_header(Box::new(header)).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
```
|
|
@ -44,6 +44,7 @@ env_logger = "0.9"
|
|||
hyperlocal = "0.8"
|
||||
hyper = "0.14"
|
||||
tokio-tungstenite = "0.20.1"
|
||||
pingora-limits = { version = "0.3.0", path = "../pingora-limits" }
|
||||
pingora-load-balancing = { version = "0.3.0", path = "../pingora-load-balancing" }
|
||||
prometheus = "0"
|
||||
futures-util = "0.3"
|
||||
|
|
117
pingora-proxy/examples/rate_limiter.rs
Normal file
117
pingora-proxy/examples/rate_limiter.rs
Normal file
|
@ -0,0 +1,117 @@
|
|||
use async_trait::async_trait;
|
||||
use once_cell::sync::Lazy;
|
||||
use pingora_core::prelude::*;
|
||||
use pingora_http::{RequestHeader, ResponseHeader};
|
||||
use pingora_limits::rate::Rate;
|
||||
use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
|
||||
use pingora_load_balancing::LoadBalancer;
|
||||
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
fn main() {
|
||||
let mut server = Server::new(Some(Opt::default())).unwrap();
|
||||
server.bootstrap();
|
||||
let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
|
||||
// Set health check
|
||||
let hc = TcpHealthCheck::new();
|
||||
upstreams.set_health_check(hc);
|
||||
upstreams.health_check_frequency = Some(Duration::from_secs(1));
|
||||
// Set background service
|
||||
let background = background_service("health check", upstreams);
|
||||
let upstreams = background.task();
|
||||
// Set load balancer
|
||||
let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
|
||||
lb.add_tcp("0.0.0.0:6188");
|
||||
|
||||
// let rate = Rate
|
||||
server.add_service(background);
|
||||
server.add_service(lb);
|
||||
server.run_forever();
|
||||
}
|
||||
|
||||
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
|
||||
|
||||
impl LB {
|
||||
pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
|
||||
match session
|
||||
.req_header()
|
||||
.headers
|
||||
.get("appid")
|
||||
.map(|v| v.to_str())
|
||||
{
|
||||
None => None,
|
||||
Some(v) => match v {
|
||||
Ok(v) => Some(v.to_string()),
|
||||
Err(_) => None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limiter
|
||||
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
|
||||
|
||||
// max request per second per client
|
||||
static MAX_REQ_PER_SEC: isize = 1;
|
||||
|
||||
#[async_trait]
|
||||
impl ProxyHttp for LB {
|
||||
type CTX = ();
|
||||
|
||||
fn new_ctx(&self) {}
|
||||
|
||||
async fn upstream_peer(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
_ctx: &mut Self::CTX,
|
||||
) -> Result<Box<HttpPeer>> {
|
||||
let upstream = self.0.select(b"", 256).unwrap();
|
||||
// Set SNI
|
||||
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
|
||||
Ok(peer)
|
||||
}
|
||||
|
||||
async fn upstream_request_filter(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
upstream_request: &mut RequestHeader,
|
||||
_ctx: &mut Self::CTX,
|
||||
) -> Result<()>
|
||||
where
|
||||
Self::CTX: Send + Sync,
|
||||
{
|
||||
upstream_request
|
||||
.insert_header("Host", "one.one.one.one")
|
||||
.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
|
||||
where
|
||||
Self::CTX: Send + Sync,
|
||||
{
|
||||
let appid = match self.get_request_appid(session) {
|
||||
None => return Ok(false), // no client appid found, skip rate limiting
|
||||
Some(addr) => addr,
|
||||
};
|
||||
|
||||
// retrieve the current window requests
|
||||
let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
|
||||
if curr_window_requests > MAX_REQ_PER_SEC {
|
||||
// rate limited, return 429
|
||||
let mut header = ResponseHeader::build(429, None).unwrap();
|
||||
header
|
||||
.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
|
||||
.unwrap();
|
||||
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
|
||||
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
|
||||
session.set_keepalive(None);
|
||||
session
|
||||
.write_response_header(Box::new(header), true)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue