2021-08-25 10:47:16 +02:00
|
|
|
import { randomBytes } from 'crypto'
|
|
|
|
import { batcher } from './to-zipkin'
|
|
|
|
import { traceGlobals } from '../shared'
|
|
|
|
import fs from 'fs'
|
|
|
|
import path from 'path'
|
2021-10-05 12:52:45 +02:00
|
|
|
import { PHASE_DEVELOPMENT_SERVER } from '../../shared/lib/constants'
|
2021-08-25 10:47:16 +02:00
|
|
|
|
2021-10-05 12:52:45 +02:00
|
|
|
// Module-level state shared by every call to `reportToLocalHost` and by
// `flushAll`. All three start out unassigned and are filled in lazily.

// Batcher that collects reported events; created on the first report.
let batch: ReturnType<typeof batcher> | undefined
// Trace id attached to every reported event; assigned on the first report.
let traceId: string
// Output stream for the trace file; assigned inside the batch callback the
// first time it runs, so it is still unset before that point.
let writeStream: RotatingWriteStream
|
|
|
|
|
2021-10-05 12:52:45 +02:00
|
|
|
// Options handed to `fs.createWriteStream` for the trace file:
// open in append mode ('a') and encode written strings as UTF-8.
const writeStreamOptions = { flags: 'a', encoding: 'utf8' }
|
|
|
|
/**
 * File writer that starts its file over once a byte budget is exceeded.
 *
 * Every chunk is accounted in UTF-8 bytes. When accepting a chunk would push
 * the running total past `sizeLimit`, the current file is deleted and a new
 * stream is opened on the same path before the chunk is written.
 */
class RotatingWriteStream {
  /** Path of the file being written. */
  file: string
  /** Stream currently backing `file`; replaced on every rotation. */
  writeStream!: fs.WriteStream
  /** UTF-8 bytes written to the current file since it was (re)created. */
  size: number
  /** Byte budget for a single file; `Infinity` disables rotation. */
  sizeLimit: number
  /** Declared for compatibility; this class neither reads nor writes it. */
  isRotating: Promise<void> | undefined

  /**
   * @param file Path of the file to write to.
   * @param sizeLimit Maximum number of bytes to keep in the file before it is
   *   recreated.
   */
  constructor(file: string, sizeLimit: number) {
    this.file = file
    this.size = 0
    this.sizeLimit = sizeLimit
    this.createWriteStream()
  }

  /** Opens a new stream on `file` using the module's `writeStreamOptions`. */
  private createWriteStream() {
    this.writeStream = fs.createWriteStream(this.file, writeStreamOptions)
  }

  /**
   * Recreates the file: ends the current stream, deletes the file, resets the
   * byte counter and opens a fresh stream.
   *
   * NOTE(review): `end()` is not awaited, so the previous stream may still be
   * flushing or closing when the file is unlinked and reopened.
   *
   * @throws Any `unlinkSync` error other than `ENOENT`.
   */
  private rotate(): void {
    this.end()
    try {
      fs.unlinkSync(this.file)
    } catch (err: any) {
      // It's fine if the file does not exist yet
      if (err.code !== 'ENOENT') {
        throw err
      }
    }
    this.size = 0
    this.createWriteStream()
  }

  /**
   * Writes `data`, rotating first when it would not fit in the byte budget.
   * Resolves once the chunk is accepted, or after the next `drain` event when
   * the underlying stream reports back-pressure.
   *
   * @param data Text to append; encoded as UTF-8.
   */
  async write(data: string): Promise<void> {
    // The limit is a byte budget and the stream encodes as UTF-8, so count
    // encoded bytes rather than UTF-16 code units (`data.length`).
    const incomingBytes = Buffer.byteLength(data, 'utf8')

    if (this.size + incomingBytes > this.sizeLimit) {
      this.rotate()
    }
    // Account for the chunk only after the rotation decision, so the bytes
    // that land in a freshly rotated file are counted against that file.
    this.size += incomingBytes

    if (!this.writeStream.write(data, 'utf8')) {
      await new Promise<void>((resolve, _reject) => {
        this.writeStream.once('drain', resolve)
      })
    }
  }

  /** Ends the current underlying stream; does not wait for it to close. */
  end(): void {
    this.writeStream.end('', 'utf8')
  }
}
|
|
|
|
|
2021-08-25 10:47:16 +02:00
|
|
|
/**
 * Queues one trace event for the `trace` file inside `distDir`.
 *
 * Returns without doing anything until both `distDir` and `phase` are set in
 * `traceGlobals`. The trace id and the batcher are created on the first call;
 * the output directory and stream are created the first time the batcher
 * invokes its callback.
 *
 * @param name Event name.
 * @param duration Event duration, forwarded unchanged.
 * @param timestamp Event timestamp, forwarded unchanged.
 * @param id Event id.
 * @param parentId Id of the parent event, if any.
 * @param attrs Extra attributes, reported under the `tags` key.
 */
const reportToLocalHost = (
  name: string,
  duration: number,
  timestamp: number,
  id: string,
  parentId?: string,
  attrs?: Object
) => {
  const distDir = traceGlobals.get('distDir')
  const phase = traceGlobals.get('phase')
  if (!(distDir && phase)) {
    return
  }

  // Reuse the existing id; otherwise prefer TRACE_ID from the environment and
  // fall back to 8 random bytes rendered as hex.
  traceId = traceId || process.env.TRACE_ID || randomBytes(8).toString('hex')

  if (!batch) {
    batch = batcher(async (events) => {
      // NOTE(review): there is an await between this check and the
      // assignment below; if the batcher can run this callback concurrently,
      // two streams could be created. Confirm against the batcher.
      if (!writeStream) {
        await fs.promises.mkdir(distDir, { recursive: true })
        // Development is limited to 50MB, production is unlimited
        const sizeLimit =
          phase === PHASE_DEVELOPMENT_SERVER ? 52428800 : Infinity
        writeStream = new RotatingWriteStream(
          path.join(distDir, 'trace'),
          sizeLimit
        )
      }

      // One JSON array per line. Serialization stays outside the try block so
      // only write failures are caught and logged.
      const line = JSON.stringify(events) + '\n'
      try {
        await writeStream.write(line)
      } catch (err) {
        console.log(err)
      }
    })
  }

  batch.report({
    traceId,
    parentId,
    name,
    id,
    timestamp,
    duration,
    tags: attrs,
  })
}
|
|
|
|
|
|
|
|
/**
 * Local trace reporter.
 *
 * - `flushAll` flushes the batcher when one exists (otherwise it returns
 *   `undefined`) and then ends the write stream unless the current phase is
 *   the development server.
 * - `report` queues a single trace event.
 */
export default {
  flushAll: () => {
    if (!batch) {
      return undefined
    }
    return batch.flushAll().then(() => {
      // Only end writeStream when manually flushing in production
      const isDevServer = traceGlobals.get('phase') === PHASE_DEVELOPMENT_SERVER
      if (!isDevServer) {
        writeStream.end()
      }
    })
  },
  report: reportToLocalHost,
}
|