diff --git a/Makefile b/Makefile index dd6fc00..856d391 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,7 @@ API_DISABLE_HEALTH_CHECK_ROUTE_TELEMETRY=false API_ENABLE_DEBUG_ROUTE=false CHROMIUM_RESTART_AFTER=100 CHROMIUM_MAX_QUEUE_SIZE=0 +CHROMIUM_IDLE_SHUTDOWN_TIMEOUT=0 CHROMIUM_MAX_CONCURRENCY=6 CHROMIUM_AUTO_START=false CHROMIUM_START_TIMEOUT=20s @@ -50,6 +51,7 @@ CHROMIUM_DISABLE_JAVASCRIPT=false CHROMIUM_DISABLE_ROUTES=false LIBREOFFICE_RESTART_AFTER=10 LIBREOFFICE_MAX_QUEUE_SIZE=0 +LIBREOFFICE_IDLE_SHUTDOWN_TIMEOUT=0 LIBREOFFICE_AUTO_START=false LIBREOFFICE_START_TIMEOUT=20s LIBREOFFICE_DISABLE_ROUTES=false diff --git a/compose.yaml b/compose.yaml index ae1b5e4..8d8af54 100644 --- a/compose.yaml +++ b/compose.yaml @@ -36,6 +36,7 @@ services: - "--chromium-restart-after=${CHROMIUM_RESTART_AFTER}" - "--chromium-auto-start=${CHROMIUM_AUTO_START}" - "--chromium-max-queue-size=${CHROMIUM_MAX_QUEUE_SIZE}" + - "--chromium-idle-shutdown-timeout=${CHROMIUM_IDLE_SHUTDOWN_TIMEOUT}" - "--chromium-max-concurrency=${CHROMIUM_MAX_CONCURRENCY}" - "--chromium-start-timeout=${CHROMIUM_START_TIMEOUT}" - "--chromium-allow-insecure-localhost=${CHROMIUM_ALLOW_INSECURE_LOCALHOST}" @@ -52,6 +53,7 @@ services: - "--chromium-disable-routes=${CHROMIUM_DISABLE_ROUTES}" - "--libreoffice-restart-after=${LIBREOFFICE_RESTART_AFTER}" - "--libreoffice-max-queue-size=${LIBREOFFICE_MAX_QUEUE_SIZE}" + - "--libreoffice-idle-shutdown-timeout=${LIBREOFFICE_IDLE_SHUTDOWN_TIMEOUT}" - "--libreoffice-auto-start=${LIBREOFFICE_AUTO_START}" - "--libreoffice-start-timeout=${LIBREOFFICE_START_TIMEOUT}" - "--libreoffice-disable-routes=${LIBREOFFICE_DISABLE_ROUTES}" diff --git a/pkg/gotenberg/supervisor.go b/pkg/gotenberg/supervisor.go index 0766266..9f15dfb 100644 --- a/pkg/gotenberg/supervisor.go +++ b/pkg/gotenberg/supervisor.go @@ -91,28 +91,33 @@ type processSupervisor struct { // via firstStartOnce. Subsequent callers that enter the !firstStart block // need to observe this value after the Once has completed, without // re-executing the closure. - firstStartErr error - reqCounter atomic.Int64 - reqQueueSize atomic.Int64 - restartsCounter atomic.Int64 - isRestarting atomic.Bool - activeTasks atomic.Int64 - restartMutex sync.Mutex + firstStartErr error + reqCounter atomic.Int64 + reqQueueSize atomic.Int64 + restartsCounter atomic.Int64 + isRestarting atomic.Bool + activeTasks atomic.Int64 + restartMutex sync.Mutex + idleShutdownTimeout time.Duration + lastActivity atomic.Int64 // unix nano timestamp of last completed task + idleMu sync.Mutex // protects idleStopChan + idleStopChan chan struct{} // signal to stop the idle ticker goroutine } // NewProcessSupervisor initializes a new [ProcessSupervisor]. -func NewProcessSupervisor(logger *slog.Logger, process Process, maxReqLimit, maxQueueSize, maxConcurrency int64) ProcessSupervisor { +func NewProcessSupervisor(logger *slog.Logger, process Process, maxReqLimit, maxQueueSize, maxConcurrency int64, idleShutdownTimeout time.Duration) ProcessSupervisor { if maxConcurrency < 1 { maxConcurrency = 1 } b := &processSupervisor{ - logger: logger, - process: process, - semaphore: make(chan struct{}, maxConcurrency), - maxReqLimit: maxReqLimit, - maxQueueSize: maxQueueSize, - maxConcurrency: maxConcurrency, + logger: logger, + process: process, + semaphore: make(chan struct{}, maxConcurrency), + maxReqLimit: maxReqLimit, + maxQueueSize: maxQueueSize, + maxConcurrency: maxConcurrency, + idleShutdownTimeout: idleShutdownTimeout, } b.reqCounter.Store(0) b.reqQueueSize.Store(0) @@ -131,6 +136,12 @@ func (s *processSupervisor) Launch() error { } s.firstStart.Store(true) + + if s.idleShutdownTimeout > 0 { + s.lastActivity.Store(time.Now().UnixNano()) + s.startIdleTicker() + } + s.logger.DebugContext(context.Background(), "process successfully started") return nil @@ -138,6 +149,9 @@ func (s *processSupervisor) Launch() error { func (s *processSupervisor) Shutdown() error { s.logger.DebugContext(context.Background(), "shutdown process") + + s.stopIdleTicker() + err := s.process.Stop(s.logger) if err != nil { return fmt.Errorf("shutdown process: %w", err) @@ -220,6 +234,9 @@ func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task f defer func() { s.activeTasks.Add(-1) + if s.idleShutdownTimeout > 0 { + s.lastActivity.Store(time.Now().UnixNano()) + } if semaphoreOwned { logger.DebugContext(ctx, "process lock released") <-s.semaphore @@ -255,6 +272,87 @@ func (s *processSupervisor) Run(ctx context.Context, logger *slog.Logger, task f } } +// startIdleTicker starts a background goroutine that periodically checks +// whether the process has been idle long enough to shut down. +func (s *processSupervisor) startIdleTicker() { + stopChan := make(chan struct{}) + + s.idleMu.Lock() + s.idleStopChan = stopChan + s.idleMu.Unlock() + + go func() { + ticker := time.NewTicker(s.idleShutdownTimeout) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.maybeIdleShutdown() + case <-stopChan: + return + } + } + }() +} + +// stopIdleTicker signals the idle ticker goroutine to exit, if one is running. +func (s *processSupervisor) stopIdleTicker() { + s.idleMu.Lock() + defer s.idleMu.Unlock() + + if s.idleStopChan != nil { + close(s.idleStopChan) + s.idleStopChan = nil + } +} + +// maybeIdleShutdown stops the process if it has been idle for longer than +// the configured timeout. It is safe to call concurrently with Run and +// restart. +func (s *processSupervisor) maybeIdleShutdown() { + if !s.firstStart.Load() || s.isRestarting.Load() { + return + } + + if s.activeTasks.Load() > 0 || s.reqQueueSize.Load() > 0 { + return + } + + lastNano := s.lastActivity.Load() + if lastNano == 0 || time.Since(time.Unix(0, lastNano)) < s.idleShutdownTimeout { + return + } + + if !s.restartMutex.TryLock() { + return + } + defer s.restartMutex.Unlock() + + // Double-check after acquiring the lock. + if s.activeTasks.Load() > 0 || s.reqQueueSize.Load() > 0 { + return + } + + s.logger.InfoContext(context.Background(), "idle shutdown timeout reached, stopping process") + + // Stop the ticker — it will be restarted on the next Launch(). + s.stopIdleTicker() + + err := s.process.Stop(s.logger) + if err != nil { + s.logger.WarnContext(context.Background(), fmt.Sprintf("idle shutdown: %s", err)) + return + } + + // Reset state so ensureStarted() re-launches on next request. + s.firstStart.Store(false) + s.firstStartOnce = sync.Once{} + s.firstStartErr = nil + s.reqCounter.Store(0) + + s.logger.InfoContext(context.Background(), "process stopped due to idle timeout") +} + // acquireSlot attempts to acquire a semaphore slot, yielding it back if a // restart drain is in progress. func (s *processSupervisor) acquireSlot(ctx context.Context, logger *slog.Logger) error { diff --git a/pkg/gotenberg/supervisor_test.go b/pkg/gotenberg/supervisor_test.go index b44cd5c..a5c0c57 100644 --- a/pkg/gotenberg/supervisor_test.go +++ b/pkg/gotenberg/supervisor_test.go @@ -45,7 +45,7 @@ func TestProcessSupervisor_Launch(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, 5, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor) if tc.firstStartSet { ps.firstStart.Store(true) } @@ -93,7 +93,7 @@ func TestProcessSupervisor_Shutdown(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, 5, 0, 1) + ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0) err := ps.Shutdown() if !tc.expectError && err != nil { @@ -145,7 +145,7 @@ func TestProcessSupervisor_restart(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, 5, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor) err := ps.restart() @@ -201,7 +201,7 @@ func TestProcessSupervisor_Healthy(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, 5, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 5, 0, 1, 0).(*processSupervisor) if tc.initiallyStarted { ps.firstStart.Store(true) } @@ -386,7 +386,7 @@ func TestProcessSupervisor_Run(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, tc.maxReqLimit, tc.maxQueueSize, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, tc.maxReqLimit, tc.maxQueueSize, 1, 0).(*processSupervisor) if tc.initiallyStarted { ps.firstStart.Store(true) } @@ -470,7 +470,7 @@ func TestProcessSupervisor_runWithDeadline(t *testing.T) { }, } { t.Run(tc.scenario, func(t *testing.T) { - ps := NewProcessSupervisor(slog.New(slog.DiscardHandler), new(ProcessMock), 0, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(slog.New(slog.DiscardHandler), new(ProcessMock), 0, 0, 1, 0).(*processSupervisor) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) if tc.ctxDone { @@ -504,7 +504,7 @@ func TestProcessSupervisor_ReqQueueSize(t *testing.T) { return true }, } - ps := NewProcessSupervisor(logger, process, 0, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 0, 0, 1, 0).(*processSupervisor) // Simulating a lock. ps.semaphore <- struct{}{} @@ -565,7 +565,7 @@ func TestProcessSupervisor_QueueSizeCAS(t *testing.T) { maxQueueSize := int64(50) // maxConcurrency=1 so all goroutines block on the semaphore, exercising queue logic. - ps := NewProcessSupervisor(logger, process, 0, maxQueueSize, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 0, maxQueueSize, 1, 0).(*processSupervisor) // Simulating a lock so that all goroutines queue up. ps.semaphore <- struct{}{} @@ -619,7 +619,7 @@ func TestProcessSupervisor_QueueSizeIncludesActiveTasks(t *testing.T) { } // maxQueueSize=1, maxConcurrency=1: only one request at a time. - ps := NewProcessSupervisor(logger, process, 0, 1, 1) + ps := NewProcessSupervisor(logger, process, 0, 1, 1, 0) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -708,7 +708,7 @@ func TestProcessSupervisor_RestartsCount(t *testing.T) { }, } - ps := NewProcessSupervisor(logger, process, 0, 0, 1).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 0, 0, 1, 0).(*processSupervisor) ps.restartsCounter.Store(tc.initialRestartsCount) for i := 0; i < tc.restartAttempts; i++ { @@ -741,7 +741,7 @@ func TestProcessSupervisor_ConcurrentRun(t *testing.T) { } maxConcurrency := int64(3) - ps := NewProcessSupervisor(logger, process, 0, 0, maxConcurrency).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 0, 0, maxConcurrency, 0).(*processSupervisor) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -803,7 +803,7 @@ func TestProcessSupervisor_RestartDrainsAllSlots(t *testing.T) { } maxConcurrency := int64(3) - ps := NewProcessSupervisor(logger, process, 3, 0, maxConcurrency).(*processSupervisor) + ps := NewProcessSupervisor(logger, process, 3, 0, maxConcurrency, 0).(*processSupervisor) ps.firstStart.Store(true) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -841,3 +841,103 @@ func TestProcessSupervisor_RestartDrainsAllSlots(t *testing.T) { t.Fatalf("expected 1 restart, got %d", ps.RestartsCount()) } } + +func TestProcessSupervisor_IdleShutdown(t *testing.T) { + logger := slog.New(slog.DiscardHandler) + + var stopCalls atomic.Int64 + process := &ProcessMock{ + StartMock: func(logger *slog.Logger) error { + return nil + }, + StopMock: func(logger *slog.Logger) error { + stopCalls.Add(1) + return nil + }, + HealthyMock: func(logger *slog.Logger) bool { + return true + }, + } + + idleTimeout := 50 * time.Millisecond + ps := NewProcessSupervisor(logger, process, 0, 0, 1, idleTimeout).(*processSupervisor) + + ctx := context.Background() + err := ps.Run(ctx, logger, func() error { + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Wait for idle shutdown to fire. + deadline := time.After(2 * time.Second) + for ps.firstStart.Load() { + select { + case <-deadline: + t.Fatal("timed out waiting for idle shutdown") + default: + time.Sleep(10 * time.Millisecond) + } + } + + if stopCalls.Load() < 1 { + t.Fatal("expected process to be stopped via idle shutdown") + } + + // Verify re-launch on next request. + err = ps.Run(ctx, logger, func() error { + return nil + }) + if err != nil { + t.Fatalf("unexpected error on re-launch: %v", err) + } + + if !ps.firstStart.Load() { + t.Fatal("expected process to be re-launched after idle shutdown") + } +} + +func TestProcessSupervisor_IdleShutdownSkippedWhenActive(t *testing.T) { + logger := slog.New(slog.DiscardHandler) + + var stopCalls atomic.Int64 + taskRunning := make(chan struct{}) + taskDone := make(chan struct{}) + + process := &ProcessMock{ + StartMock: func(logger *slog.Logger) error { + return nil + }, + StopMock: func(logger *slog.Logger) error { + stopCalls.Add(1) + return nil + }, + HealthyMock: func(logger *slog.Logger) bool { + return true + }, + } + + idleTimeout := 50 * time.Millisecond + ps := NewProcessSupervisor(logger, process, 0, 0, 1, idleTimeout) + + ctx := context.Background() + go func() { + _ = ps.Run(ctx, logger, func() error { + close(taskRunning) + <-taskDone + return nil + }) + }() + + <-taskRunning + + // Wait longer than the idle timeout while a task is active. + time.Sleep(idleTimeout * 3) + + if stopCalls.Load() > 0 { + t.Fatal("idle shutdown should not fire while a task is active") + } + + close(taskDone) +} diff --git a/pkg/modules/chromium/chromium.go b/pkg/modules/chromium/chromium.go index 04f62c7..e2805cc 100644 --- a/pkg/modules/chromium/chromium.go +++ b/pkg/modules/chromium/chromium.go @@ -435,6 +435,7 @@ func (mod *Chromium) Descriptor() gotenberg.ModuleDescriptor { fs := flag.NewFlagSet("chromium", flag.ExitOnError) fs.Int64("chromium-restart-after", 100, "Number of conversions after which Chromium will automatically restart. Set to 0 to disable this feature") fs.Int64("chromium-max-queue-size", 0, "Maximum request queue size for Chromium. Set to 0 to disable this feature") + fs.Duration("chromium-idle-shutdown-timeout", 0, "Shutdown Chromium after being idle for the given duration. Set to 0 to disable this feature") fs.Int64("chromium-max-concurrency", 6, "Maximum number of concurrent conversions. Chromium supports up to 6") fs.Bool("chromium-auto-start", false, "Automatically launch Chromium upon initialization if set to true; otherwise, Chromium will start at the time of the first conversion") fs.Duration("chromium-start-timeout", time.Duration(20)*time.Second, "Maximum duration to wait for Chromium to start or restart") @@ -504,7 +505,7 @@ func (mod *Chromium) Provision(ctx *gotenberg.Context) error { // Process. mod.browser = newChromiumBrowser(mod.args) - mod.supervisor = gotenberg.NewProcessSupervisor(mod.logger, mod.browser, flags.MustInt64("chromium-restart-after"), flags.MustInt64("chromium-max-queue-size"), mod.maxConcurrency) + mod.supervisor = gotenberg.NewProcessSupervisor(mod.logger, mod.browser, flags.MustInt64("chromium-restart-after"), flags.MustInt64("chromium-max-queue-size"), mod.maxConcurrency, flags.MustDuration("chromium-idle-shutdown-timeout")) // PDF Engine. provider, err := ctx.Module(new(gotenberg.PdfEngineProvider)) diff --git a/pkg/modules/libreoffice/api/api.go b/pkg/modules/libreoffice/api/api.go index b2d4bdf..b33051d 100644 --- a/pkg/modules/libreoffice/api/api.go +++ b/pkg/modules/libreoffice/api/api.go @@ -325,6 +325,7 @@ func (a *Api) Descriptor() gotenberg.ModuleDescriptor { fs := flag.NewFlagSet("api", flag.ExitOnError) fs.Int64("libreoffice-restart-after", 10, "Number of conversions after which LibreOffice will automatically restart. Set to 0 to disable this feature") fs.Int64("libreoffice-max-queue-size", 0, "Maximum request queue size for LibreOffice. Set to 0 to disable this feature") + fs.Duration("libreoffice-idle-shutdown-timeout", 0, "Shutdown LibreOffice after being idle for the given duration. Set to 0 to disable this feature") fs.Bool("libreoffice-auto-start", false, "Automatically launch LibreOffice upon initialization if set to true; otherwise, LibreOffice will start at the time of the first conversion") fs.Duration("libreoffice-start-timeout", time.Duration(20)*time.Second, "Maximum duration to wait for LibreOffice to start or restart") @@ -360,7 +361,7 @@ func (a *Api) Provision(ctx *gotenberg.Context) error { // Process. a.libreOffice = newLibreOfficeProcess(a.args) - a.supervisor = gotenberg.NewProcessSupervisor(a.logger, a.libreOffice, flags.MustInt64("libreoffice-restart-after"), flags.MustInt64("libreoffice-max-queue-size"), 1) + a.supervisor = gotenberg.NewProcessSupervisor(a.logger, a.libreOffice, flags.MustInt64("libreoffice-restart-after"), flags.MustInt64("libreoffice-max-queue-size"), 1, flags.MustDuration("libreoffice-idle-shutdown-timeout")) // Metrics. meter := gotenberg.Meter() diff --git a/test/integration/features/debug.feature b/test/integration/features/debug.feature index d3fb43e..5568ccb 100644 --- a/test/integration/features/debug.feature +++ b/test/integration/features/debug.feature @@ -84,6 +84,7 @@ Feature: /debug "chromium-disable-web-security": "false", "chromium-host-resolver-rules": "", "chromium-ignore-certificate-errors": "false", + "chromium-idle-shutdown-timeout": "0s", "chromium-incognito": "false", "chromium-max-concurrency": "6", "chromium-max-queue-size": "0", @@ -94,6 +95,7 @@ Feature: /debug "gotenberg-graceful-shutdown-duration": "30s", "libreoffice-auto-start": "false", "libreoffice-disable-routes": "false", + "libreoffice-idle-shutdown-timeout": "0s", "libreoffice-max-queue-size": "0", "libreoffice-restart-after": "10", "libreoffice-start-timeout": "20s", @@ -210,6 +212,7 @@ Feature: /debug "chromium-disable-web-security": "false", "chromium-host-resolver-rules": "", "chromium-ignore-certificate-errors": "false", + "chromium-idle-shutdown-timeout": "0s", "chromium-incognito": "false", "chromium-max-queue-size": "0", "chromium-max-concurrency": "6", @@ -220,6 +223,7 @@ Feature: /debug "gotenberg-graceful-shutdown-duration": "30s", "libreoffice-auto-start": "false", "libreoffice-disable-routes": "false", + "libreoffice-idle-shutdown-timeout": "0s", "libreoffice-max-queue-size": "0", "libreoffice-restart-after": "10", "libreoffice-start-timeout": "20s",