Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,9 +758,10 @@ func (s *crdbSpan) getVerboseRecording(includeDetachedChildren bool, finishing b
result.StructuredRecordsSizeBytes -= result.Root.StructuredRecordsSizeBytes
result.Root = s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing)
result.StructuredRecordsSizeBytes += result.Root.StructuredRecordsSizeBytes
maxStructuredBytes := s.tracer.MaxStructuredBytesPerTrace()
for i := range oldEvents {
size := int64(oldEvents[i].Size())
if result.StructuredRecordsSizeBytes+size <= maxStructuredBytesPerTrace {
if result.StructuredRecordsSizeBytes+size <= maxStructuredBytes {
result.Root.AddStructuredRecord(oldEvents[i])
result.StructuredRecordsSizeBytes += size
}
Expand Down Expand Up @@ -793,7 +794,7 @@ func (s *crdbSpan) getVerboseRecording(includeDetachedChildren bool, finishing b
rollupChildrenMetadata(childrenMetadata, openChildRecording.Root.ChildrenMetadata)
}
}
result.addChildren(openRecordings, maxRecordedSpansPerTrace, maxStructuredBytesPerTrace)
result.addChildren(openRecordings, maxRecordedSpansPerTrace, maxStructuredBytes)
}()

// Copy over the OperationMetadata collected from s' children into the root of
Expand Down Expand Up @@ -881,17 +882,18 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRec Trace) {
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRec.Root.ParentSpanID = s.spanID
s.mu.recording.finishedChildren.addChildren([]Trace{childRec}, maxRecordedSpansPerTrace, maxStructuredBytesPerTrace)
s.mu.recording.finishedChildren.addChildren([]Trace{childRec}, maxRecordedSpansPerTrace, s.tracer.MaxStructuredBytesPerTrace())
case tracingpb.RecordingStructured:
fc := &s.mu.recording.finishedChildren
num := len(fc.Root.StructuredRecords)
fc.Root.StructuredRecords = childRec.appendStructuredEventsRecursively(fc.Root.StructuredRecords)
// Account for the size of the structured records that were appended,
// breaking out of the loop if we hit the byte limit. This incorporates
// the byte size accounting logic from RecordedSpan.AddStructuredRecord.
maxStructuredBytes := s.tracer.MaxStructuredBytesPerTrace()
for ; num < len(fc.Root.StructuredRecords); num++ {
size := int64(fc.Root.StructuredRecords[num].MemorySize())
if fc.StructuredRecordsSizeBytes+size > maxStructuredBytesPerTrace {
if fc.StructuredRecordsSizeBytes+size > maxStructuredBytes {
break
}
fc.Root.StructuredRecordsSizeBytes += size
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (sp *Span) reset(
goroutineID: goroutineID,
recording: recordingState{
logs: makeSizeLimitedBuffer[*tracingpb.LogRecord](maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer[*tracingpb.StructuredRecord](maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
structured: makeSizeLimitedBuffer[*tracingpb.StructuredRecord](sp.i.tracer.MaxStructuredBytesPerSpan(), h.structuredEventsAlloc[:]),
childrenMetadata: h.childrenMetadataAlloc,
finishedChildren: MakeTrace(tracingpb.RecordedSpan{}),
},
Expand Down
16 changes: 8 additions & 8 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestSpanRecordStructuredLimit(t *testing.T) {
Payload: anyPayload,
}

numStructuredRecordings := maxStructuredBytesPerSpan / structuredRecord.MemorySize()
numStructuredRecordings := defaultMaxStructuredBytesPerSpan / structuredRecord.MemorySize()
const extra = 10
for i := 1; i <= numStructuredRecordings+extra; i++ {
sp.RecordStructured(payload(i))
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestRecordingMaxSpans(t *testing.T) {
}
require.Equal(t, numChildren, numStructuredEvents)
// Same when requesting a Structured recording. Except the exact number of
// events is limited by maxStructuredBytesPerSpan.
// events is limited by defaultMaxStructuredBytesPerSpan.
require.NotEmpty(t, sp.GetRecording(tracingpb.RecordingStructured)[0].StructuredRecords)
}

Expand Down Expand Up @@ -760,9 +760,9 @@ func TestRecordingStructuredLogLimit(t *testing.T) {
tr := NewTracer()

// Create one big message. Only one of these fits in a span according to the
// maxStructuredBytesPerSpan limit.
// defaultMaxStructuredBytesPerSpan limit.
var sb strings.Builder
for i := 0; i < maxStructuredBytesPerSpan*7/8; i++ {
for i := 0; i < defaultMaxStructuredBytesPerSpan*7/8; i++ {
sb.WriteRune('c')
}
s := sb.String()
Expand All @@ -778,7 +778,7 @@ func TestRecordingStructuredLogLimit(t *testing.T) {
// maxStructuredBytesPerTrace allows.
// We'll check that some these messages are dropped when collecting the
// recording.
numMessages := 2 * maxStructuredBytesPerTrace / msgSize
numMessages := 2 * defaultMaxStructuredBytesPerTrace / msgSize
for i := 0; i < int(numMessages); i++ {
c := tr.StartSpan(fmt.Sprintf("child %d", i), WithParent(sp))
c.RecordStructured(msg)
Expand All @@ -792,7 +792,7 @@ func TestRecordingStructuredLogLimit(t *testing.T) {
trace := sp.i.crdb.GetRecording(tracingpb.RecordingVerbose, false /* finishing */)
// Check that we have two events in the trace (one per span).
require.Less(t, len(trace.appendStructuredEventsRecursively(nil /* buffer */)), int(numMessages))
require.LessOrEqual(t, trace.StructuredRecordsSizeBytes, int64(maxStructuredBytesPerTrace))
require.LessOrEqual(t, trace.StructuredRecordsSizeBytes, int64(defaultMaxStructuredBytesPerTrace))
})
}
}
Expand Down Expand Up @@ -1180,8 +1180,8 @@ func TestStructuredRecordingSizeLimit(t *testing.T) {
require.NoError(t, err)
record := &tracingpb.StructuredRecord{Payload: p}
eventSize := record.MemorySize()
maxEventsPerSpan := maxStructuredBytesPerSpan / eventSize
maxEventPerTrace := maxStructuredBytesPerTrace / eventSize
maxEventsPerSpan := defaultMaxStructuredBytesPerSpan / eventSize
maxEventPerTrace := defaultMaxStructuredBytesPerTrace / eventSize
// Try to record more events than the per-span limit allows, and check that
// some get dropped.
for i := 0; i < maxEventsPerSpan+10; i++ {
Expand Down
71 changes: 63 additions & 8 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ const (
maxRecordedSpansPerTrace = 1000
// maxRecordedBytesPerSpan limits the size of unstructured logs in a span.
maxLogBytesPerSpan = 256 * (1 << 10) // 256 KiB
// maxStructuredBytesPerSpan limits the size of structured logs in a span.
// defaultMaxStructuredBytesPerSpan limits the size of structured logs in a span.
// This limit applies to records directly logged into the span; it does not
// apply to records in child span (including structured records copied from
// the child into the parent when the child is dropped because of the number
// of spans limit).
// See also maxStructuredBytesPerTrace.
maxStructuredBytesPerSpan = 10 * (1 << 10) // 10 KiB
// maxStructuredBytesPerTrace limits the total size of structured logs in a
defaultMaxStructuredBytesPerSpan = 10 * (1 << 10) // 10 KiB
// defaultMaxStructuredBytesPerTrace limits the total size of structured logs in a
// trace recording, across all spans. This limit is enforced at the time when
// a span is finished and its recording is copied to the parent, and at the
// time when an open span's recording is collected - which calls into all its
// open children. Thus, if there are multiple open spans that are part of the
// same trace, each one of them can temporarily have up to
// maxStructuredBytesPerTrace worth of messages under it. Each open span is
// also subject to the maxStructuredBytesPerSpan limit.
maxStructuredBytesPerTrace = 1 << 20 // 1 MiB
// defaultMaxStructuredBytesPerTrace worth of messages under it. Each open span is
// also subject to the defaultMaxStructuredBytesPerSpan limit.
defaultMaxStructuredBytesPerTrace = 1 << 20 // 1 MiB

// maxSpanRegistrySize limits the number of local root spans tracked in
// a Tracer's registry.
Expand Down Expand Up @@ -189,6 +189,18 @@ var periodicSnapshotInterval = settings.RegisterDurationSetting(
0,
settings.WithPublic)

var structuredBytesLimit = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"trace.structured_bytes_per_trace.max",
"maximum size of structured log entries per trace recording",
defaultMaxStructuredBytesPerTrace)

var structuredBytesPerSpanLimit = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"trace.structured_bytes_per_span.max",
"maximum size of structured log entries per span",
defaultMaxStructuredBytesPerSpan)

// panicOnUseAfterFinish, if set, causes use of a span after Finish() to panic
// if detected.
var panicOnUseAfterFinish = buildutil.CrdbTestBuild ||
Expand Down Expand Up @@ -368,6 +380,16 @@ type Tracer struct {

testing TracerTestingKnobs

// maxStructuredBytesPerTrace limits the total size of structured logs in a
// trace recording. This value is configurable via the
// trace.structured_bytes_per_trace.max cluster setting.
_maxStructuredBytesPerTrace int64 // accessed atomically

// maxStructuredBytesPerSpan limits the size of structured logs in each span.
// This value is configurable via the trace.structured_bytes_per_span.max
// cluster setting.
_maxStructuredBytesPerSpan int64 // accessed atomically

// stack is populated in NewTracer and is printed in assertions related to
// mixing tracers.
stack debugutil.SafeStack
Expand Down Expand Up @@ -588,6 +610,30 @@ func (t *Tracer) SetActiveSpansRegistryEnabled(to bool) {
atomic.StoreInt32(&t._activeSpansRegistryEnabled, n)
}

// SetMaxStructuredBytesPerTrace sets the maximum size of structured logs per
// trace recording.
func (t *Tracer) SetMaxStructuredBytesPerTrace(limit int64) {
atomic.StoreInt64(&t._maxStructuredBytesPerTrace, limit)
}

// MaxStructuredBytesPerTrace returns the maximum size of structured logs per
// trace recording.
func (t *Tracer) MaxStructuredBytesPerTrace() int64 {
return atomic.LoadInt64(&t._maxStructuredBytesPerTrace)
}

// SetMaxStructuredBytesPerSpan sets the maximum size of structured logs per
// span.
func (t *Tracer) SetMaxStructuredBytesPerSpan(limit int64) {
atomic.StoreInt64(&t._maxStructuredBytesPerSpan, limit)
}

// MaxStructuredBytesPerSpan returns the maximum size of structured logs per
// span.
func (t *Tracer) MaxStructuredBytesPerSpan() int64 {
return atomic.LoadInt64(&t._maxStructuredBytesPerSpan)
}

// ActiveSpansRegistryEnabled returns true if this tracer is configured
// to register spans with the activeSpansRegistry
func (t *Tracer) ActiveSpansRegistryEnabled() bool {
Expand All @@ -606,8 +652,10 @@ func NewTracer() *Tracer {
}

t := &Tracer{
stack: debugutil.Stack(),
activeSpansRegistry: makeSpanRegistry(),
stack: debugutil.Stack(),
activeSpansRegistry: makeSpanRegistry(),
_maxStructuredBytesPerTrace: defaultMaxStructuredBytesPerTrace,
_maxStructuredBytesPerSpan: defaultMaxStructuredBytesPerSpan,
// These might be overridden in NewTracerWithOpt.
panicOnUseAfterFinish: panicOnUseAfterFinish,
debugUseAfterFinish: debugUseAfterFinish,
Expand Down Expand Up @@ -785,6 +833,11 @@ func (t *Tracer) configure(ctx context.Context, sv *settings.Values, tracingDefa
otlpCollectorAddr := openTelemetryCollector.Get(sv)
zipkinAddr := ZipkinCollector.Get(sv)
enableRedactable := enableTraceRedactable.Get(sv)
structuredBytesLimitVal := structuredBytesLimit.Get(sv)
structuredBytesPerSpanLimitVal := structuredBytesPerSpanLimit.Get(sv)

t.SetMaxStructuredBytesPerTrace(structuredBytesLimitVal)
t.SetMaxStructuredBytesPerSpan(structuredBytesPerSpanLimitVal)

switch tracingDefault {
case TracingModeFromEnv:
Expand Down Expand Up @@ -872,6 +925,8 @@ func (t *Tracer) configure(ctx context.Context, sv *settings.Values, tracingDefa
openTelemetryCollector.SetOnChange(sv, reconfigure)
ZipkinCollector.SetOnChange(sv, reconfigure)
enableTraceRedactable.SetOnChange(sv, reconfigure)
structuredBytesLimit.SetOnChange(sv, reconfigure)
structuredBytesPerSpanLimit.SetOnChange(sv, reconfigure)
}

func createOTLPSpanProcessor(
Expand Down
Loading