refactor(supervisor): better orchestration of processes for conversion requests

This commit is contained in:
Julien Neuhart
2026-03-26 20:58:58 +01:00
parent 8e3acc8d0a
commit ed22f1e5e6
2 changed files with 214 additions and 107 deletions
+156 -104
View File
@@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
)
@@ -54,9 +55,9 @@ type ProcessSupervisor interface {
// Healthy checks and returns the health status of the managed [Process].
//
// If the process has not been started or is restarting, it is considered
// healthy and true is returned. Otherwise, it returns the health status of
// the actual process.
// A non-started process is considered healthy (startup is deferred until
// the first request). Returns false if the process is currently restarting
// or is reported unhealthy by the underlying [Process].
Healthy() bool
// Run executes a provided task while managing the state of the [Process].
@@ -76,14 +77,18 @@ type ProcessSupervisor interface {
}
type processSupervisor struct {
logger *zap.Logger
process Process
maxReqLimit int64
maxQueueSize int64
maxConcurrency int64
semaphore chan struct{}
firstStart atomic.Bool
firstStartOnce sync.Once
logger *zap.Logger
process Process
maxReqLimit int64
maxQueueSize int64
maxConcurrency int64
semaphore chan struct{}
firstStart atomic.Bool
firstStartOnce sync.Once
// firstStartErr stores the error from the first Launch attempt executed
// via firstStartOnce. Subsequent callers that enter the !firstStart block
// need to observe this value after the Once has completed, without
// re-executing the closure.
firstStartErr error
reqCounter atomic.Int64
reqQueueSize atomic.Int64
@@ -146,8 +151,9 @@ func (s *processSupervisor) restart() error {
err := s.Shutdown()
if err != nil {
// No big deal? Chances are it's already stopped.
s.logger.Debug(fmt.Sprintf("stop process before restart: %s", err))
// Not necessarily critical — chances are the process is already stopped,
// but worth flagging in case it indicates a real issue.
s.logger.Warn(fmt.Sprintf("stop process before restart: %s", err))
}
err = s.Launch()
@@ -164,117 +170,75 @@ func (s *processSupervisor) restart() error {
func (s *processSupervisor) Healthy() bool {
if !s.firstStart.Load() {
// A non-started process is always healthy.
// A non-started process is considered healthy: Gotenberg defers
// process startup until the first request to keep resource usage low.
// Reporting unhealthy here would cause container orchestrators to
// restart the pod before any request arrives.
return true
}
if s.isRestarting.Load() {
// A restarting process is always healthy.
return true
// A restarting process is not yet healthy — this gives load balancers
// honest information so they can avoid routing traffic to this node.
return false
}
return s.process.Healthy(s.logger)
}
func (s *processSupervisor) Run(ctx context.Context, logger *zap.Logger, task func() error) error {
// A user reported a potential issue:
//
// "Although the counting operation is atomic, nothing prevent 2 concurrent
// goroutines to retrieve the same 'currentQueueSize' and to compare its
// value against the max limit. Then, resulting queue size would be 1 above
// the allowed limit."
//
// However, he was unable to actually trigger this issue, even when sending
// a lot of requests.
//
// For now, the best option is to consider this issue to be unlikely to
// happen, and keep the code as it is because it is more readable this way.
//
// See https://github.com/gotenberg/gotenberg/issues/951.
currentQueueSize := s.reqQueueSize.Load()
if s.maxQueueSize > 0 && currentQueueSize >= s.maxQueueSize {
return ErrMaximumQueueSizeExceeded
// Atomically check and increment the queue size to avoid the TOCTOU race
// originally reported in https://github.com/gotenberg/gotenberg/issues/951.
for {
current := s.reqQueueSize.Load()
if s.maxQueueSize > 0 && current >= s.maxQueueSize {
return ErrMaximumQueueSizeExceeded
}
if s.reqQueueSize.CompareAndSwap(current, current+1) {
break
}
}
s.reqQueueSize.Add(1)
for {
err := func() error {
select {
case s.semaphore <- struct{}{}:
logger.Debug("process lock acquired")
// If a restart drain is in progress, release the slot
// immediately so the drain can acquire it instead.
if s.isRestarting.Load() {
<-s.semaphore
return ErrProcessAlreadyRestarting
}
s.reqQueueSize.Add(-1)
s.reqCounter.Add(1)
s.activeTasks.Add(1)
releaseSemaphore := true
defer func() {
s.activeTasks.Add(-1)
if releaseSemaphore {
logger.Debug("process lock released")
<-s.semaphore
}
}()
if !s.firstStart.Load() {
s.firstStartOnce.Do(func() {
s.firstStartErr = s.runWithDeadline(ctx, func() error {
return s.Launch()
})
})
if s.firstStartErr != nil {
return fmt.Errorf("process first start: %w", s.firstStartErr)
}
}
if !s.Healthy() {
s.logger.Debug("process is unhealthy, cannot handle task, restarting...")
err := s.doRestart(ctx)
if err != nil {
return fmt.Errorf("process restart before task: %w", err)
}
}
err := s.runWithDeadline(ctx, task)
if s.maxReqLimit > 0 && s.reqCounter.Load() >= s.maxReqLimit {
// Only one goroutine should trigger the restart.
if s.restartMutex.TryLock() {
s.logger.Debug("max request limit reached, restarting eagerly...")
releaseSemaphore = false
go func() {
restartErr := s.doRestartLocked(context.Background())
s.restartMutex.Unlock()
if restartErr != nil {
s.logger.Error(fmt.Sprintf("process restart after task: %v", restartErr))
}
logger.Debug("process lock released")
<-s.semaphore
}()
}
}
// Note: no error wrapping because it leaks on Chromium console exceptions output.
if err := s.acquireSlot(ctx, logger); err != nil {
return err
case <-ctx.Done():
logger.Debug("failed to acquire process lock before deadline")
s.reqQueueSize.Add(-1)
return fmt.Errorf("acquire process lock: %w", ctx.Err())
}
s.reqQueueSize.Add(-1)
s.reqCounter.Add(1)
s.activeTasks.Add(1)
semaphoreOwned := true
defer func() {
s.activeTasks.Add(-1)
if semaphoreOwned {
logger.Debug("process lock released")
<-s.semaphore
}
}()
if err := s.ensureStarted(ctx); err != nil {
return err
}
if err := s.ensureHealthy(ctx); err != nil {
return err
}
err := s.runWithDeadline(ctx, task)
if s.maybeRestartAfterTask(logger) {
semaphoreOwned = false
}
// Note: no error wrapping because it leaks on Chromium console exceptions output.
return err
}()
if errors.Is(err, ErrProcessAlreadyRestarting) {
logger.Debug("process is already restarting, trying to acquire process lock again...")
time.Sleep(10 * time.Millisecond)
continue
}
@@ -283,6 +247,94 @@ func (s *processSupervisor) Run(ctx context.Context, logger *zap.Logger, task fu
}
}
// acquireSlot attempts to acquire a semaphore slot, yielding it back if a
// restart drain is in progress.
func (s *processSupervisor) acquireSlot(ctx context.Context, logger *zap.Logger) error {
select {
case s.semaphore <- struct{}{}:
// If a restart drain is in progress, release the slot
// immediately so the drain can acquire it instead.
if s.isRestarting.Load() {
<-s.semaphore
return ErrProcessAlreadyRestarting
}
logger.Debug("process lock acquired")
return nil
case <-ctx.Done():
logger.Debug("failed to acquire process lock before deadline")
s.reqQueueSize.Add(-1)
return fmt.Errorf("acquire process lock: %w", ctx.Err())
}
}
// ensureStarted performs a one-time lazy launch of the process on its first
// use. Subsequent calls are no-ops.
func (s *processSupervisor) ensureStarted(ctx context.Context) error {
if s.firstStart.Load() {
return nil
}
s.firstStartOnce.Do(func() {
s.firstStartErr = s.runWithDeadline(ctx, func() error {
return s.Launch()
})
})
if s.firstStartErr != nil {
return fmt.Errorf("process first start: %w", s.firstStartErr)
}
return nil
}
// ensureHealthy checks the underlying process health and triggers a
// synchronous restart if the process is unhealthy. Skips the check if a
// restart is already in progress.
func (s *processSupervisor) ensureHealthy(ctx context.Context) error {
if s.isRestarting.Load() || s.process.Healthy(s.logger) {
return nil
}
s.logger.Debug("process is unhealthy, cannot handle task, restarting...")
if err := s.doRestart(ctx); err != nil {
return fmt.Errorf("process restart before task: %w", err)
}
return nil
}
// maybeRestartAfterTask checks if the maximum request limit has been reached
// and, if so, triggers an asynchronous restart. If a restart is initiated, it
// takes ownership of the caller's semaphore slot (the caller must not release
// it). Returns true if ownership was taken.
func (s *processSupervisor) maybeRestartAfterTask(logger *zap.Logger) bool {
if s.maxReqLimit <= 0 || s.reqCounter.Load() < s.maxReqLimit {
return false
}
if !s.restartMutex.TryLock() {
return false
}
s.logger.Debug("max request limit reached, restarting eagerly...")
go func() {
restartErr := s.doRestartLocked(context.Background())
s.restartMutex.Unlock()
if restartErr != nil {
s.logger.Error(fmt.Sprintf("process restart after task: %v", restartErr))
}
logger.Debug("process lock released")
<-s.semaphore
}()
return true
}
// doRestart coordinates a process restart, draining all active concurrent
// tasks before stopping and restarting the process.
func (s *processSupervisor) doRestart(ctx context.Context) error {
+58 -3
View File
@@ -170,15 +170,15 @@ func TestProcessSupervisor_Healthy(t *testing.T) {
expectHealthy bool
}{
{
scenario: "non-started process is always healthy",
scenario: "non-started process is healthy",
initiallyStarted: false,
expectHealthy: true,
},
{
scenario: "restarting process is always healthy",
scenario: "restarting process is not healthy",
initiallyStarted: true,
initiallyRestarting: true,
expectHealthy: true,
expectHealthy: false,
},
{
scenario: "process reports as healthy",
@@ -552,6 +552,61 @@ func TestProcessSupervisor_ReqQueueSize(t *testing.T) {
}
}
func TestProcessSupervisor_QueueSizeCAS(t *testing.T) {
logger := zap.NewNop()
process := &ProcessMock{
StartMock: func(logger *zap.Logger) error {
return nil
},
HealthyMock: func(logger *zap.Logger) bool {
return true
},
}
maxQueueSize := int64(50)
// maxConcurrency=1 so all goroutines block on the semaphore, exercising queue logic.
ps := NewProcessSupervisor(logger, process, 0, maxQueueSize, 1).(*processSupervisor)
// Simulating a lock so that all goroutines queue up.
ps.semaphore <- struct{}{}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
goroutines := 100
var wg sync.WaitGroup
var exceeded atomic.Int64
for range goroutines {
wg.Go(func() {
err := ps.Run(ctx, logger, func() error {
return nil
})
if err != nil {
if errors.Is(err, ErrMaximumQueueSizeExceeded) {
exceeded.Add(1)
}
}
})
}
// Wait a bit for goroutines to queue up.
time.Sleep(50 * time.Millisecond)
currentQueue := ps.ReqQueueSize()
if currentQueue > maxQueueSize {
t.Fatalf("queue size %d exceeded max %d", currentQueue, maxQueueSize)
}
cancel()
wg.Wait()
if exceeded.Load() < int64(goroutines)-maxQueueSize {
t.Errorf("expected at least %d rejections, got %d", goroutines-int(maxQueueSize), exceeded.Load())
}
}
func TestProcessSupervisor_RestartsCount(t *testing.T) {
for _, tc := range []struct {
scenario string