feat(pgx): metrics
This commit is contained in:
parent
bf66baf3c5
commit
cfb96e811e
1 changed files with 62 additions and 23 deletions
|
@ -6,11 +6,14 @@ import (
|
||||||
"database/sql/driver"
|
"database/sql/driver"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
pgx "github.com/jackc/pgx/v5/stdlib"
|
pgx "github.com/jackc/pgx/v5/stdlib"
|
||||||
"github.com/qustavo/sqlhooks/v2"
|
"github.com/qustavo/sqlhooks/v2"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,10 +22,14 @@ const (
|
||||||
TracedPGxDriverName = "pgx-traced"
|
TracedPGxDriverName = "pgx-traced"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type dbCtxStart struct{}
|
||||||
|
|
||||||
type tracedPgxHooks struct {
|
type tracedPgxHooks struct {
|
||||||
t *Telemetry
|
t *Telemetry
|
||||||
printQueries bool
|
printQueries bool
|
||||||
tracer trace.Tracer
|
tracer trace.Tracer
|
||||||
|
|
||||||
|
mDuration metric.Float64Histogram
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *tracedPgxHooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
func (h *tracedPgxHooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
||||||
|
@ -31,12 +38,24 @@ func (h *tracedPgxHooks) Before(ctx context.Context, query string, args ...inter
|
||||||
cleanedQuery = strings.ReplaceAll(cleanedQuery, " ", " ")
|
cleanedQuery = strings.ReplaceAll(cleanedQuery, " ", " ")
|
||||||
cleanedQuery = strings.TrimSpace(cleanedQuery)
|
cleanedQuery = strings.TrimSpace(cleanedQuery)
|
||||||
|
|
||||||
s := h.t.StartSpan(ctx, "db.sql.query", query)
|
operation := strings.Split(query, " ")[0]
|
||||||
s.AddAttributes(
|
|
||||||
attribute.String("db.system", "postgres"),
|
attrs := []attribute.KeyValue{
|
||||||
|
semconv.DBSystemPostgreSQL,
|
||||||
|
semconv.DBOperationName(operation),
|
||||||
|
/* Sentry */
|
||||||
attribute.String("db.statement", cleanedQuery),
|
attribute.String("db.statement", cleanedQuery),
|
||||||
attribute.StringSlice("db.params", formatArgs(args)),
|
attribute.StringSlice("db.params", formatArgs(args)),
|
||||||
)
|
|
||||||
|
/* OpenTelemetry */
|
||||||
|
attribute.String("db.query.text", cleanedQuery),
|
||||||
|
}
|
||||||
|
for i, arg := range args {
|
||||||
|
attrs = append(attrs, attribute.String(fmt.Sprintf("db.query.parameter.%d", i), formatArg(arg)))
|
||||||
|
}
|
||||||
|
|
||||||
|
s := h.t.StartSpan(ctx, "db.sql.query", operation, WithOtelTracer(h.tracer))
|
||||||
|
s.AddAttributes(attrs...)
|
||||||
|
|
||||||
if h.printQueries {
|
if h.printQueries {
|
||||||
l := log.Trace()
|
l := log.Trace()
|
||||||
|
@ -48,7 +67,7 @@ func (h *tracedPgxHooks) Before(ctx context.Context, query string, args ...inter
|
||||||
l.Msg(cleanedQuery)
|
l.Msg(cleanedQuery)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.Context(), nil
|
return context.WithValue(s.Context(), dbCtxStart{}, time.Now()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *tracedPgxHooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
func (h *tracedPgxHooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
||||||
|
@ -56,6 +75,10 @@ func (h *tracedPgxHooks) After(ctx context.Context, query string, args ...interf
|
||||||
s.End()
|
s.End()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if start, ok := ctx.Value(dbCtxStart{}).(time.Time); ok {
|
||||||
|
h.mDuration.Record(ctx, float64(time.Since(start).Milliseconds()))
|
||||||
|
}
|
||||||
|
|
||||||
return ctx, nil
|
return ctx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,11 +90,23 @@ func (t *Telemetry) RegisterTracedPGx(printQueries bool) {
|
||||||
|
|
||||||
func (t *Telemetry) TracedPGx(printQueries bool) driver.Driver {
|
func (t *Telemetry) TracedPGx(printQueries bool) driver.Driver {
|
||||||
tracer := t.tracerProvider.Tracer(pgxClientID, trace.WithInstrumentationVersion(libVersion))
|
tracer := t.tracerProvider.Tracer(pgxClientID, trace.WithInstrumentationVersion(libVersion))
|
||||||
|
meter := t.meterProvider.Meter(pgxClientID, metric.WithInstrumentationVersion(libVersion))
|
||||||
|
|
||||||
|
mDuration, err := meter.Float64Histogram(
|
||||||
|
"db.client.operation.duration",
|
||||||
|
metric.WithDescription("Database query response times"),
|
||||||
|
metric.WithUnit("ms"),
|
||||||
|
metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Err(err).Msg("failed to create request duration histogram")
|
||||||
|
}
|
||||||
|
|
||||||
return sqlhooks.Wrap(&pgx.Driver{}, &tracedPgxHooks{
|
return sqlhooks.Wrap(&pgx.Driver{}, &tracedPgxHooks{
|
||||||
printQueries: printQueries,
|
printQueries: printQueries,
|
||||||
t: t,
|
t: t,
|
||||||
tracer: tracer,
|
tracer: tracer,
|
||||||
|
mDuration: mDuration,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,25 +114,29 @@ func formatArgs(args []interface{}) []string {
|
||||||
formattedArgs := make([]string, len(args))
|
formattedArgs := make([]string, len(args))
|
||||||
|
|
||||||
for i, arg := range args {
|
for i, arg := range args {
|
||||||
switch v := arg.(type) {
|
formattedArgs[i] = formatArg(arg)
|
||||||
case int:
|
|
||||||
formattedArgs[i] = fmt.Sprint(v)
|
|
||||||
case int64:
|
|
||||||
formattedArgs[i] = fmt.Sprint(v)
|
|
||||||
case float64:
|
|
||||||
formattedArgs[i] = fmt.Sprint(v)
|
|
||||||
case string:
|
|
||||||
formattedArgs[i] = v
|
|
||||||
case []byte:
|
|
||||||
formattedArgs[i] = string(v)
|
|
||||||
case bool:
|
|
||||||
formattedArgs[i] = fmt.Sprint(v)
|
|
||||||
case fmt.Stringer:
|
|
||||||
formattedArgs[i] = v.String()
|
|
||||||
default:
|
|
||||||
formattedArgs[i] = fmt.Sprintf("%+v", v)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return formattedArgs
|
return formattedArgs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func formatArg(arg interface{}) string {
|
||||||
|
switch v := arg.(type) {
|
||||||
|
case int:
|
||||||
|
return fmt.Sprint(v)
|
||||||
|
case int64:
|
||||||
|
return fmt.Sprint(v)
|
||||||
|
case float64:
|
||||||
|
return fmt.Sprint(v)
|
||||||
|
case string:
|
||||||
|
return v
|
||||||
|
case []byte:
|
||||||
|
return string(v)
|
||||||
|
case bool:
|
||||||
|
return fmt.Sprint(v)
|
||||||
|
case fmt.Stringer:
|
||||||
|
return v.String()
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("%+v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue