31d2b720d9
### What? This reimplements our stream cancellation code for a few more cases: 1. Adds support in all stream-returning APIs 2. Fixes cancellation detection in Node 16 3. Implements out-of-band detection, so we can cancel in the middle of a read It also (finally) adds tests for all the cases I'm aware of. ### Why? To allow disconnecting from an AI service when a client disconnects. $$$ ### How? 1. Reuses a single pipe function in all paths to push data from the dev's `ReadableStream` into our `ServerResponse` 2. Uses `ServerResponse` to detect disconnect, instead of the `IncomingMessage` (request) - The `close` event fires once all incoming body data is read - The request `abort` event will not fire after the incoming body data has been fully read 3. Using `on('close')` on the writable destination allows us to detect close - Checking for `res.destroyed` in the body of the loop meant we had to wait for the `await stream.read()` to complete before we could possibly cancel the stream - - - #52157 (and #51594) had an issue with Node 16, because I was using `res.closed` to detect when the server response was closed by the client disconnecting. But `closed` wasn't [added](https://github.com/nodejs/node/pull/45672) until [v18.13.0](https://nodejs.org/en/blog/release/v18.13.0#:~:text=%5Bcbd710bbf4%5D%20%2D%20http%3A%20make%20OutgoingMessage%20more%20streamlike%20(Robert%20Nagy)%20%2345672). This fixes it by using `res.destroyed`. Reverts #52277 Relands #52157 Fixes #52809 ---------
31 lines
658 B
TypeScript
31 lines
658 B
TypeScript
import * as stream from 'stream'
|
|
import { Deferred, sleep } from './sleep'
|
|
|
|
/**
 * Builds a deliberately slow Node.js readable stream for cancellation tests.
 *
 * Each `read()` waits on `sleep(100)` (presumably 100 ms; confirm against
 * `./sleep`) and then pushes the encoded decimal string of a counter
 * ("0", "1", ...). Once `write` chunks have been pushed, `read()` returns
 * without pushing anything, and `null` is never pushed, so the stream never
 * signals end-of-stream on its own; it has to be destroyed by the consumer.
 *
 * @param write - Maximum number of chunks the stream will push.
 * @returns An object with:
 *   - `stream`: the Node.js `Readable`;
 *   - `abort()`: resolves the internal "aborted" deferred;
 *   - `finished`: a promise that settles once the stream has been destroyed
 *     AND `abort()` has been called, yielding the counter value at that time.
 */
export function Readable(write: number) {
  const encoder = new TextEncoder()
  const cleanedUp = new Deferred()
  const aborted = new Deferred()
  // Number of chunks handed to `push()` so far.
  let i = 0

  const readable = {
    finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i),

    abort() {
      aborted.resolve()
    },

    stream: new stream.Readable({
      async read() {
        // Intentionally stall (no push, no EOF) after the last chunk.
        if (i >= write) {
          return
        }

        await sleep(100)
        this.push(encoder.encode(String(i++)))
      },
      destroy(
        error: Error | null,
        callback: (error?: Error | null) => void
      ) {
        cleanedUp.resolve()
        // Node's `_destroy` contract requires invoking the callback;
        // otherwise teardown never completes and 'close' is never emitted.
        callback(error)
      },
    }),
  }
  return readable
}
|