mirror of
https://github.com/gotenberg/gotenberg.git
synced 2026-07-02 08:27:41 +08:00
fix(gotenberg): debounce supervisor health probes to absorb transient CDP latency
This commit is contained in:
@@ -78,6 +78,23 @@ type ProcessSupervisor interface {
|
||||
ActiveTasksCount() int64
|
||||
}
|
||||
|
||||
// healthCheckCacheTTL caches successful health probe results so kubelet-
|
||||
// style probes (liveness + readiness, every few seconds each) do not
|
||||
// hammer the underlying process with CDP roundtrips on every call.
|
||||
// Tuned to bridge typical probe periods while still catching outages
|
||||
// quickly: a real outage surfaces on the next probe after the TTL
|
||||
// elapses.
|
||||
const healthCheckCacheTTL = 2 * time.Second
|
||||
|
||||
// healthFailureThreshold is the number of consecutive Healthy() failures
|
||||
// the supervisor tolerates before reporting unhealthy. Absorbs single-
|
||||
// probe blips of transient CDP latency (for example a slow
|
||||
// Browser.getVersion roundtrip when several conversion slots are
|
||||
// simultaneously stuck), without delaying detection of a real outage.
|
||||
// The container orchestrator's own failureThreshold stacks on top of
|
||||
// this. See https://github.com/gotenberg/gotenberg/issues/1561.
|
||||
const healthFailureThreshold = 2
|
||||
|
||||
type processSupervisor struct {
|
||||
logger *slog.Logger
|
||||
process Process
|
||||
@@ -100,9 +117,15 @@ type processSupervisor struct {
|
||||
activeTasks atomic.Int64
|
||||
restartMutex sync.Mutex
|
||||
idleShutdownTimeout time.Duration
|
||||
lastActivity atomic.Int64 // unix nano timestamp of last completed task
|
||||
idleMu sync.Mutex // protects idleStopChan
|
||||
idleStopChan chan struct{} // signal to stop the idle ticker goroutine
|
||||
lastActivity atomic.Int64 // unix nano timestamp of last completed task
|
||||
// healthMu serializes Healthy() probes so concurrent callers do not
|
||||
// all issue a CDP roundtrip; the second caller hits the refreshed
|
||||
// cache instead.
|
||||
healthMu sync.Mutex
|
||||
lastHealthyAt atomic.Int64 // unix nano of last successful probe; 0 means never
|
||||
consecutiveHealthFailures atomic.Int64 // reset to 0 on every successful probe
|
||||
idleMu sync.Mutex // protects idleStopChan
|
||||
idleStopChan chan struct{} // signal to stop the idle ticker goroutine
|
||||
}
|
||||
|
||||
// NewProcessSupervisor initializes a new [ProcessSupervisor].
|
||||
@@ -195,12 +218,52 @@ func (s *processSupervisor) Healthy() bool {
|
||||
}
|
||||
|
||||
if s.isRestarting.Load() {
|
||||
// A restarting process is not yet healthy — this gives load balancers
|
||||
// A restarting process is not yet healthy. This gives load balancers
|
||||
// honest information so they can avoid routing traffic to this node.
|
||||
return false
|
||||
}
|
||||
|
||||
return s.process.Healthy(s.logger)
|
||||
// Cache hit: a recent probe succeeded. Skip the CDP roundtrip so probe
|
||||
// spam does not pile commands onto a busy websocket.
|
||||
if s.recentlyHealthy() {
|
||||
return true
|
||||
}
|
||||
|
||||
// Serialize probes so concurrent callers do not all roundtrip. The
|
||||
// second caller will see the refreshed cache (or counter) and return
|
||||
// without re-probing.
|
||||
s.healthMu.Lock()
|
||||
defer s.healthMu.Unlock()
|
||||
|
||||
if s.recentlyHealthy() {
|
||||
return true
|
||||
}
|
||||
|
||||
if s.process.Healthy(s.logger) {
|
||||
s.lastHealthyAt.Store(time.Now().UnixNano())
|
||||
s.consecutiveHealthFailures.Store(0)
|
||||
return true
|
||||
}
|
||||
|
||||
if s.consecutiveHealthFailures.Add(1) < healthFailureThreshold {
|
||||
// First failure: tolerate it. Under load, a single blown CDP
|
||||
// timeout is more likely transient pressure than a dead process.
|
||||
// A genuinely dead process will fail the next probe as well and
|
||||
// flip us unhealthy then.
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// recentlyHealthy reports whether a successful probe landed within
|
||||
// [healthCheckCacheTTL]. Negative results are never cached so recovery
|
||||
// from a real outage is observable on the very next probe.
|
||||
func (s *processSupervisor) recentlyHealthy() bool {
|
||||
last := s.lastHealthyAt.Load()
|
||||
if last == 0 {
|
||||
return false
|
||||
}
|
||||
return time.Since(time.Unix(0, last)) < healthCheckCacheTTL
|
||||
}
|
||||
|
||||
func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task func() error) error {
|
||||
|
||||
@@ -186,10 +186,10 @@ func TestProcessSupervisor_Healthy(t *testing.T) {
|
||||
expectHealthy: true,
|
||||
},
|
||||
{
|
||||
scenario: "process reports as unhealthy",
|
||||
scenario: "single probe failure is tolerated",
|
||||
initiallyStarted: true,
|
||||
processHealthy: false,
|
||||
expectHealthy: false,
|
||||
expectHealthy: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.scenario, func(t *testing.T) {
|
||||
@@ -218,6 +218,109 @@ func TestProcessSupervisor_Healthy(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessSupervisor_Healthy_ConsecutiveFailures verifies that only
|
||||
// the second consecutive process-level failure flips the supervisor to
|
||||
// unhealthy, and that a single success in between resets the counter.
|
||||
func TestProcessSupervisor_Healthy_ConsecutiveFailures(t *testing.T) {
|
||||
logger := slog.New(slog.DiscardHandler)
|
||||
|
||||
var processHealthy atomic.Bool
|
||||
process := &ProcessMock{
|
||||
HealthyMock: func(_ *slog.Logger) bool { return processHealthy.Load() },
|
||||
}
|
||||
|
||||
ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor)
|
||||
ps.firstStart.Store(true)
|
||||
|
||||
processHealthy.Store(false)
|
||||
if !ps.Healthy() {
|
||||
t.Fatal("first failure should be tolerated and report healthy")
|
||||
}
|
||||
if ps.Healthy() {
|
||||
t.Fatal("second consecutive failure should report unhealthy")
|
||||
}
|
||||
|
||||
processHealthy.Store(true)
|
||||
if !ps.Healthy() {
|
||||
t.Fatal("recovery should report healthy immediately")
|
||||
}
|
||||
|
||||
processHealthy.Store(false)
|
||||
// Cache hit from the previous success absorbs the first new failure;
|
||||
// invalidate it so we exercise the counter again.
|
||||
ps.lastHealthyAt.Store(0)
|
||||
if !ps.Healthy() {
|
||||
t.Fatal("post-recovery first failure should be tolerated again")
|
||||
}
|
||||
if ps.Healthy() {
|
||||
t.Fatal("post-recovery second consecutive failure should report unhealthy")
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessSupervisor_Healthy_CachesPositiveResult verifies that a
|
||||
// successful probe is cached for [healthCheckCacheTTL] so subsequent
|
||||
// supervisor.Healthy() calls do not re-issue the underlying process
|
||||
// check.
|
||||
func TestProcessSupervisor_Healthy_CachesPositiveResult(t *testing.T) {
|
||||
logger := slog.New(slog.DiscardHandler)
|
||||
|
||||
var calls atomic.Int64
|
||||
process := &ProcessMock{
|
||||
HealthyMock: func(_ *slog.Logger) bool {
|
||||
calls.Add(1)
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor)
|
||||
ps.firstStart.Store(true)
|
||||
|
||||
for range 5 {
|
||||
if !ps.Healthy() {
|
||||
t.Fatal("expected healthy")
|
||||
}
|
||||
}
|
||||
|
||||
if got := calls.Load(); got != 1 {
|
||||
t.Fatalf("process.Healthy called %d times, want exactly 1 (cache should absorb the other 4)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessSupervisor_Healthy_DoesNotCacheNegativeResult verifies that
|
||||
// a probe failure is not cached: the next Healthy() call must re-issue
|
||||
// the underlying process check so a recovered process surfaces on the
|
||||
// very next probe.
|
||||
func TestProcessSupervisor_Healthy_DoesNotCacheNegativeResult(t *testing.T) {
|
||||
logger := slog.New(slog.DiscardHandler)
|
||||
|
||||
var calls atomic.Int64
|
||||
var processHealthy atomic.Bool
|
||||
process := &ProcessMock{
|
||||
HealthyMock: func(_ *slog.Logger) bool {
|
||||
calls.Add(1)
|
||||
return processHealthy.Load()
|
||||
},
|
||||
}
|
||||
|
||||
ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor)
|
||||
ps.firstStart.Store(true)
|
||||
|
||||
processHealthy.Store(false)
|
||||
_ = ps.Healthy()
|
||||
_ = ps.Healthy()
|
||||
if got := calls.Load(); got != 2 {
|
||||
t.Fatalf("after two failing probes, process.Healthy called %d times, want 2 (negative results must not be cached)", got)
|
||||
}
|
||||
|
||||
processHealthy.Store(true)
|
||||
if !ps.Healthy() {
|
||||
t.Fatal("expected healthy on recovery")
|
||||
}
|
||||
if got := calls.Load(); got != 3 {
|
||||
t.Fatalf("after recovery, process.Healthy called %d times, want 3", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessSupervisor_Run(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
scenario string
|
||||
|
||||
Reference in New Issue
Block a user