6249307b75
Fixes #39262 The solution is to call `stream.push(null)` to trigger the `end` event which allows `getRawBody` to run completely. <!-- Thanks for opening a PR! Your contribution is much appreciated. To make sure your PR is handled as smoothly as possible we request that you follow the checklist sections below. Choose the right checklist for the change that you're making: --> ## Bug - [x] Related issues linked using `fixes #number` - [x] Integration tests added - [ ] Errors have a helpful link attached, see `contributing.md` ## Feature - [ ] Implements an existing feature request or RFC. Make sure the feature request has been accepted for implementation before opening a PR. - [ ] Related issues linked using `fixes #number` - [ ] Integration tests added - [ ] Documentation added - [ ] Telemetry added. In case of a feature if it's used or not. - [ ] Errors have a helpful link attached, see `contributing.md` ## Documentation / Examples - [x] Make sure the linting passes by running `pnpm lint` - [x] The "examples guidelines" are followed from [our contributing doc](https://github.com/vercel/next.js/blob/canary/contributing/examples/adding-examples.md) Co-authored-by: JJ Kasper <jj@jjsweb.site>
91 lines
2.1 KiB
TypeScript
91 lines
2.1 KiB
TypeScript
import type { IncomingMessage } from 'http'
|
|
import { PassThrough, Readable } from 'stream'
|
|
|
|
export function requestToBodyStream(
|
|
context: { ReadableStream: typeof ReadableStream },
|
|
KUint8Array: typeof Uint8Array,
|
|
stream: Readable
|
|
) {
|
|
return new context.ReadableStream({
|
|
start(controller) {
|
|
stream.on('data', (chunk) =>
|
|
controller.enqueue(new KUint8Array([...new Uint8Array(chunk)]))
|
|
)
|
|
stream.on('end', () => controller.close())
|
|
stream.on('error', (err) => controller.error(err))
|
|
},
|
|
})
|
|
}
|
|
|
|
function replaceRequestBody<T extends IncomingMessage>(
|
|
base: T,
|
|
stream: Readable
|
|
): T {
|
|
for (const key in stream) {
|
|
let v = stream[key as keyof Readable] as any
|
|
if (typeof v === 'function') {
|
|
v = v.bind(base)
|
|
}
|
|
base[key as keyof T] = v
|
|
}
|
|
|
|
return base
|
|
}
|
|
|
|
export interface ClonableBody {
|
|
finalize(): Promise<void>
|
|
cloneBodyStream(): Readable
|
|
}
|
|
|
|
export function getClonableBody<T extends IncomingMessage>(
|
|
readable: T
|
|
): ClonableBody {
|
|
let buffered: Readable | null = null
|
|
|
|
const endPromise = new Promise<void | { error?: unknown }>(
|
|
(resolve, reject) => {
|
|
readable.on('end', resolve)
|
|
readable.on('error', reject)
|
|
}
|
|
).catch((error) => {
|
|
return { error }
|
|
})
|
|
|
|
return {
|
|
/**
|
|
* Replaces the original request body if necessary.
|
|
* This is done because once we read the body from the original request,
|
|
* we can't read it again.
|
|
*/
|
|
async finalize(): Promise<void> {
|
|
if (buffered) {
|
|
const res = await endPromise
|
|
|
|
if (res && typeof res === 'object' && res.error) {
|
|
throw res.error
|
|
}
|
|
replaceRequestBody(readable, buffered)
|
|
buffered = readable
|
|
}
|
|
},
|
|
/**
|
|
* Clones the body stream
|
|
* to pass into a middleware
|
|
*/
|
|
cloneBodyStream() {
|
|
const input = buffered ?? readable
|
|
const p1 = new PassThrough()
|
|
const p2 = new PassThrough()
|
|
input.on('data', (chunk) => {
|
|
p1.push(chunk)
|
|
p2.push(chunk)
|
|
})
|
|
input.on('end', () => {
|
|
p1.push(null)
|
|
p2.push(null)
|
|
})
|
|
buffered = p2
|
|
return p1
|
|
},
|
|
}
|
|
}
|