mirror of
https://github.com/gotenberg/gotenberg.git
synced 2026-07-02 00:17:40 +08:00
feat(gotenberg): emit queue-wait and launch sub-spans in supervisor Run
This commit is contained in:
@@ -8,6 +8,10 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// ErrProcessAlreadyRestarting happens if the [ProcessSupervisor] is trying
|
||||
@@ -279,6 +283,20 @@ func (s *processSupervisor) recentlyHealthy() bool {
|
||||
}
|
||||
|
||||
func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task func() error) error {
|
||||
// Time spent before the task body runs: queueing, slot acquisition, lazy
|
||||
// launch, and health checks. Ended once, when the task is about to execute.
|
||||
_, queueSpan := Tracer().Start(ctx, s.engine+".queue.wait",
|
||||
trace.WithSpanKind(trace.SpanKindInternal),
|
||||
)
|
||||
queueWaitDone := false
|
||||
endQueueWait := func() {
|
||||
if !queueWaitDone {
|
||||
queueWaitDone = true
|
||||
queueSpan.End()
|
||||
}
|
||||
}
|
||||
defer endQueueWait()
|
||||
|
||||
// Atomically check and increment the queue size to avoid the TOCTOU race
|
||||
// originally reported in https://github.com/gotenberg/gotenberg/issues/951.
|
||||
for {
|
||||
@@ -327,6 +345,7 @@ func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task f
|
||||
return err
|
||||
}
|
||||
|
||||
endQueueWait()
|
||||
err := s.runWithDeadline(ctx, task)
|
||||
|
||||
if s.maybeRestartAfterTask(logger) {
|
||||
@@ -465,8 +484,8 @@ func (s *processSupervisor) ensureStarted(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := s.runWithDeadline(ctx, func() error {
|
||||
return s.Launch()
|
||||
err := s.tracedLaunch(ctx, "first_start", func() error {
|
||||
return s.runWithDeadline(ctx, s.Launch)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("process first start: %w", err)
|
||||
@@ -475,6 +494,27 @@ func (s *processSupervisor) ensureStarted(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// tracedLaunch wraps a process (re)start in an <engine>.process.start span,
|
||||
// tagged with the reason that triggered it. The eager restart after the maximum
|
||||
// request limit runs on a background context, so its span is a detached root.
|
||||
func (s *processSupervisor) tracedLaunch(ctx context.Context, reason string, launch func() error) error {
|
||||
_, span := Tracer().Start(ctx, s.engine+".process.start",
|
||||
trace.WithSpanKind(trace.SpanKindInternal),
|
||||
trace.WithAttributes(attribute.String("gotenberg.process.start.reason", reason)),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
err := launch()
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureHealthy checks the underlying process health and triggers a
|
||||
// synchronous restart if the process is unhealthy. Skips the check if a
|
||||
// restart is already in progress.
|
||||
@@ -485,7 +525,7 @@ func (s *processSupervisor) ensureHealthy(ctx context.Context) error {
|
||||
|
||||
s.logger.DebugContext(context.Background(), "process is unhealthy, cannot handle task, restarting...")
|
||||
|
||||
if err := s.doRestart(ctx); err != nil {
|
||||
if err := s.doRestart(ctx, "unhealthy"); err != nil {
|
||||
return fmt.Errorf("process restart before task: %w", err)
|
||||
}
|
||||
|
||||
@@ -508,7 +548,7 @@ func (s *processSupervisor) maybeRestartAfterTask(logger *slog.Logger) bool {
|
||||
s.logger.DebugContext(context.Background(), "max request limit reached, restarting eagerly...")
|
||||
|
||||
go func() {
|
||||
restartErr := s.doRestartLocked(context.Background())
|
||||
restartErr := s.doRestartLocked(context.Background(), "max_requests")
|
||||
s.restartMutex.Unlock()
|
||||
if restartErr != nil {
|
||||
s.logger.ErrorContext(context.Background(), fmt.Sprintf("process restart after task: %v", restartErr))
|
||||
@@ -522,15 +562,15 @@ func (s *processSupervisor) maybeRestartAfterTask(logger *slog.Logger) bool {
|
||||
|
||||
// doRestart coordinates a process restart, draining all active concurrent
|
||||
// tasks before stopping and restarting the process.
|
||||
func (s *processSupervisor) doRestart(ctx context.Context) error {
|
||||
func (s *processSupervisor) doRestart(ctx context.Context, reason string) error {
|
||||
s.restartMutex.Lock()
|
||||
defer s.restartMutex.Unlock()
|
||||
|
||||
return s.doRestartLocked(ctx)
|
||||
return s.doRestartLocked(ctx, reason)
|
||||
}
|
||||
|
||||
// doRestartLocked performs the restart drain logic. The caller must hold restartMutex.
|
||||
func (s *processSupervisor) doRestartLocked(ctx context.Context) error {
|
||||
func (s *processSupervisor) doRestartLocked(ctx context.Context, reason string) error {
|
||||
s.isRestarting.Store(true)
|
||||
defer s.isRestarting.Store(false)
|
||||
|
||||
@@ -550,8 +590,8 @@ func (s *processSupervisor) doRestartLocked(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
err := s.runWithDeadline(ctx, func() error {
|
||||
return s.restart()
|
||||
err := s.tracedLaunch(ctx, reason, func() error {
|
||||
return s.runWithDeadline(ctx, s.restart)
|
||||
})
|
||||
|
||||
for range acquired {
|
||||
|
||||
@@ -1116,3 +1116,45 @@ func TestProcessSupervisor_ConversionsSinceRestart(t *testing.T) {
|
||||
t.Errorf("expected 0 conversions after restart, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessSupervisor_RunEmitsSubSpans(t *testing.T) {
|
||||
recorder := newTestSpanRecorder(t)
|
||||
|
||||
process := &ProcessMock{
|
||||
StartMock: func(*slog.Logger) error { return nil },
|
||||
StopMock: func(*slog.Logger) error { return nil },
|
||||
HealthyMock: func(*slog.Logger) bool { return true },
|
||||
}
|
||||
s := NewProcessSupervisor(slog.New(slog.DiscardHandler), "chromium", process, 0, 0, 1, 0)
|
||||
|
||||
err := s.Run(context.Background(), slog.New(slog.DiscardHandler), func() error { return nil })
|
||||
if err != nil {
|
||||
t.Fatalf("run: %v", err)
|
||||
}
|
||||
|
||||
var queueWaits, processStarts int
|
||||
var startReason string
|
||||
for _, span := range recorder.Ended() {
|
||||
switch span.Name() {
|
||||
case "chromium.queue.wait":
|
||||
queueWaits++
|
||||
case "chromium.process.start":
|
||||
processStarts++
|
||||
for _, kv := range span.Attributes() {
|
||||
if string(kv.Key) == "gotenberg.process.start.reason" {
|
||||
startReason = kv.Value.AsString()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if queueWaits != 1 {
|
||||
t.Errorf("expected 1 chromium.queue.wait span, got %d", queueWaits)
|
||||
}
|
||||
if processStarts != 1 {
|
||||
t.Errorf("expected 1 chromium.process.start span (first start), got %d", processStarts)
|
||||
}
|
||||
if startReason != "first_start" {
|
||||
t.Errorf("expected process start reason first_start, got %q", startReason)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user