From 7465166de73bee8d4e5081a9ab426e9fa87ed474 Mon Sep 17 00:00:00 2001 From: Julien Neuhart Date: Tue, 2 Jun 2026 19:28:43 +0200 Subject: [PATCH] feat(gotenberg): emit queue-wait and launch sub-spans in supervisor Run --- pkg/gotenberg/supervisor.go | 58 +++++++++++++++++++++++++++----- pkg/gotenberg/supervisor_test.go | 42 +++++++++++++++++++++++ 2 files changed, 91 insertions(+), 9 deletions(-) diff --git a/pkg/gotenberg/supervisor.go b/pkg/gotenberg/supervisor.go index 2e8ce8a..5700661 100644 --- a/pkg/gotenberg/supervisor.go +++ b/pkg/gotenberg/supervisor.go @@ -8,6 +8,10 @@ import ( "sync" "sync/atomic" "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // ErrProcessAlreadyRestarting happens if the [ProcessSupervisor] is trying @@ -279,6 +283,20 @@ func (s *processSupervisor) recentlyHealthy() bool { } func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task func() error) error { + // Time spent before the task body runs: queueing, slot acquisition, lazy + // launch, and health checks. Ended once, when the task is about to execute. + _, queueSpan := Tracer().Start(ctx, s.engine+".queue.wait", + trace.WithSpanKind(trace.SpanKindInternal), + ) + queueWaitDone := false + endQueueWait := func() { + if !queueWaitDone { + queueWaitDone = true + queueSpan.End() + } + } + defer endQueueWait() + // Atomically check and increment the queue size to avoid the TOCTOU race // originally reported in https://github.com/gotenberg/gotenberg/issues/951. 
for { @@ -327,6 +345,7 @@ func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task f return err } + endQueueWait() err := s.runWithDeadline(ctx, task) if s.maybeRestartAfterTask(logger) { @@ -465,8 +484,8 @@ func (s *processSupervisor) ensureStarted(ctx context.Context) error { return nil } - err := s.runWithDeadline(ctx, func() error { - return s.Launch() + err := s.tracedLaunch(ctx, "first_start", func() error { + return s.runWithDeadline(ctx, s.Launch) }) if err != nil { return fmt.Errorf("process first start: %w", err) @@ -475,6 +494,27 @@ func (s *processSupervisor) ensureStarted(ctx context.Context) error { return nil } +// tracedLaunch wraps a process (re)start in an <engine>.process.start span, +// tagged with the reason that triggered it. The eager restart after the maximum +// request limit runs on a background context, so its span is a detached root. +func (s *processSupervisor) tracedLaunch(ctx context.Context, reason string, launch func() error) error { + _, span := Tracer().Start(ctx, s.engine+".process.start", + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes(attribute.String("gotenberg.process.start.reason", reason)), + ) + defer span.End() + + err := launch() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + span.SetStatus(codes.Ok, "") + return nil +} + // ensureHealthy checks the underlying process health and triggers a // synchronous restart if the process is unhealthy. Skips the check if a // restart is already in progress. 
@@ -485,7 +525,7 @@ func (s *processSupervisor) ensureHealthy(ctx context.Context) error { s.logger.DebugContext(context.Background(), "process is unhealthy, cannot handle task, restarting...") - if err := s.doRestart(ctx); err != nil { + if err := s.doRestart(ctx, "unhealthy"); err != nil { return fmt.Errorf("process restart before task: %w", err) } @@ -508,7 +548,7 @@ func (s *processSupervisor) maybeRestartAfterTask(logger *slog.Logger) bool { s.logger.DebugContext(context.Background(), "max request limit reached, restarting eagerly...") go func() { - restartErr := s.doRestartLocked(context.Background()) + restartErr := s.doRestartLocked(context.Background(), "max_requests") s.restartMutex.Unlock() if restartErr != nil { s.logger.ErrorContext(context.Background(), fmt.Sprintf("process restart after task: %v", restartErr)) @@ -522,15 +562,15 @@ func (s *processSupervisor) maybeRestartAfterTask(logger *slog.Logger) bool { // doRestart coordinates a process restart, draining all active concurrent // tasks before stopping and restarting the process. -func (s *processSupervisor) doRestart(ctx context.Context) error { +func (s *processSupervisor) doRestart(ctx context.Context, reason string) error { s.restartMutex.Lock() defer s.restartMutex.Unlock() - return s.doRestartLocked(ctx) + return s.doRestartLocked(ctx, reason) } // doRestartLocked performs the restart drain logic. The caller must hold restartMutex. 
-func (s *processSupervisor) doRestartLocked(ctx context.Context) error { +func (s *processSupervisor) doRestartLocked(ctx context.Context, reason string) error { s.isRestarting.Store(true) defer s.isRestarting.Store(false) @@ -550,8 +590,8 @@ func (s *processSupervisor) doRestartLocked(ctx context.Context) error { } } - err := s.runWithDeadline(ctx, func() error { - return s.restart() + err := s.tracedLaunch(ctx, reason, func() error { + return s.runWithDeadline(ctx, s.restart) }) for range acquired { diff --git a/pkg/gotenberg/supervisor_test.go b/pkg/gotenberg/supervisor_test.go index 7dca670..f9bdfa0 100644 --- a/pkg/gotenberg/supervisor_test.go +++ b/pkg/gotenberg/supervisor_test.go @@ -1116,3 +1116,45 @@ func TestProcessSupervisor_ConversionsSinceRestart(t *testing.T) { t.Errorf("expected 0 conversions after restart, got %d", got) } } + +func TestProcessSupervisor_RunEmitsSubSpans(t *testing.T) { + recorder := newTestSpanRecorder(t) + + process := &ProcessMock{ + StartMock: func(*slog.Logger) error { return nil }, + StopMock: func(*slog.Logger) error { return nil }, + HealthyMock: func(*slog.Logger) bool { return true }, + } + s := NewProcessSupervisor(slog.New(slog.DiscardHandler), "chromium", process, 0, 0, 1, 0) + + err := s.Run(context.Background(), slog.New(slog.DiscardHandler), func() error { return nil }) + if err != nil { + t.Fatalf("run: %v", err) + } + + var queueWaits, processStarts int + var startReason string + for _, span := range recorder.Ended() { + switch span.Name() { + case "chromium.queue.wait": + queueWaits++ + case "chromium.process.start": + processStarts++ + for _, kv := range span.Attributes() { + if string(kv.Key) == "gotenberg.process.start.reason" { + startReason = kv.Value.AsString() + } + } + } + } + + if queueWaits != 1 { + t.Errorf("expected 1 chromium.queue.wait span, got %d", queueWaits) + } + if processStarts != 1 { + t.Errorf("expected 1 chromium.process.start span (first start), got %d", processStarts) + } + if startReason 
!= "first_start" { + t.Errorf("expected process start reason first_start, got %q", startReason) + } +}