mirror of
https://github.com/gotenberg/gotenberg.git
synced 2026-07-02 08:27:41 +08:00
feat(chromium): re-add concurrency support for Chromium (#1467)
* feat: add concurrency support to ProcessSupervisor
  - Replace the single-slot mutex channel with a configurable semaphore to allow multiple concurrent tasks.
  - Add drain logic to ensure all active tasks complete before process restarts.
* feat: add chromium-max-concurrency flag
  - Add a --chromium-max-concurrency flag (1-6) to the Chromium module to control how many conversions run in parallel.
  - Update LibreOffice to pass maxConcurrency=1 as LibreOffice only supports a single concurrent conversion.
* test: add integration tests for concurrent Chromium conversions
  - Add concurrent request support to the integration test framework with new step definitions for sending parallel requests and asserting on all responses.
  - Add a feature file for concurrent HTML to PDF conversions.
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
@chromium
|
||||
@chromium-concurrent
|
||||
Feature: Chromium concurrent conversions
|
||||
|
||||
Scenario: Concurrent HTML to PDF conversions with max concurrency 3
|
||||
Given I have a Gotenberg container with the following environment variable(s):
|
||||
| CHROMIUM_MAX_CONCURRENCY | 3 |
|
||||
When I make 3 concurrent "POST" requests to Gotenberg at the "/forms/chromium/convert/html" endpoint with the following form data and header(s):
|
||||
| files | testdata/page-1-html/index.html | file |
|
||||
Then all concurrent response status codes should be 200
|
||||
Then all concurrent responses should have 1 PDF(s)
|
||||
|
||||
Scenario: Concurrent conversions exceeding restart-after limit
|
||||
Given I have a Gotenberg container with the following environment variable(s):
|
||||
| CHROMIUM_MAX_CONCURRENCY | 3 |
|
||||
| CHROMIUM_RESTART_AFTER | 5 |
|
||||
When I make 10 concurrent "POST" requests to Gotenberg at the "/forms/chromium/convert/html" endpoint with the following form data and header(s):
|
||||
| files | testdata/page-1-html/index.html | file |
|
||||
Then all concurrent response status codes should be 200
|
||||
Then all concurrent responses should have 1 PDF(s)
|
||||
@@ -204,8 +204,9 @@ Feature: /debug
|
||||
"chromium-ignore-certificate-errors": "false",
|
||||
"chromium-incognito": "false",
|
||||
"chromium-max-queue-size": "0",
|
||||
"chromium-max-concurrency": "6",
|
||||
"chromium-proxy-server": "",
|
||||
"chromium-restart-after": "10",
|
||||
"chromium-restart-after": "100",
|
||||
"chromium-start-timeout": "20s",
|
||||
"gotenberg-build-debug-data": "true",
|
||||
"gotenberg-graceful-shutdown-duration": "30s",
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cucumber/godog"
|
||||
@@ -24,6 +25,7 @@ import (
|
||||
|
||||
type scenario struct {
|
||||
resp *httptest.ResponseRecorder
|
||||
concurrentResps []*httptest.ResponseRecorder
|
||||
workdir string
|
||||
gotenbergContainer testcontainers.Container
|
||||
gotenbergContainerNetwork *testcontainers.DockerNetwork
|
||||
@@ -33,6 +35,7 @@ type scenario struct {
|
||||
|
||||
func (s *scenario) reset(ctx context.Context) error {
|
||||
s.resp = httptest.NewRecorder()
|
||||
s.concurrentResps = nil
|
||||
|
||||
err := os.RemoveAll(s.workdir)
|
||||
if err != nil {
|
||||
@@ -281,6 +284,171 @@ func (s *scenario) iMakeARequestToGotenbergWithTheFollowingFormDataAndHeaders(ct
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scenario) iMakeConcurrentRequestsToGotenberg(ctx context.Context, count int, method, endpoint string, dataTable *godog.Table) error {
|
||||
if s.gotenbergContainer == nil {
|
||||
return errors.New("no Gotenberg container")
|
||||
}
|
||||
|
||||
fields := make(map[string]string)
|
||||
files := make(map[string][]string)
|
||||
headers := make(map[string]string)
|
||||
|
||||
for _, row := range dataTable.Rows {
|
||||
name := row.Cells[0].Value
|
||||
value := row.Cells[1].Value
|
||||
kind := row.Cells[2].Value
|
||||
|
||||
switch kind {
|
||||
case "field":
|
||||
fields[name] = value
|
||||
case "file":
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get current directory: %w", err)
|
||||
}
|
||||
value = fmt.Sprintf("%s/%s", wd, value)
|
||||
files[name] = append(files[name], value)
|
||||
case "header":
|
||||
headers[name] = value
|
||||
default:
|
||||
return fmt.Errorf("unexpected %q %q", kind, value)
|
||||
}
|
||||
}
|
||||
|
||||
base, err := containerHttpEndpoint(ctx, s.gotenbergContainer, "3000")
|
||||
if err != nil {
|
||||
return fmt.Errorf("get container HTTP endpoint: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
s.concurrentResps = make([]*httptest.ResponseRecorder, 0, count)
|
||||
errs := make([]error, 0)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
resp, reqErr := doFormDataRequest(method, fmt.Sprintf("%s%s", base, endpoint), fields, files, headers)
|
||||
if reqErr != nil {
|
||||
mu.Lock()
|
||||
errs = append(errs, fmt.Errorf("do request: %w", reqErr))
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, reqErr := io.ReadAll(resp.Body)
|
||||
if reqErr != nil {
|
||||
mu.Lock()
|
||||
errs = append(errs, fmt.Errorf("read response body: %w", reqErr))
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
rec.Code = resp.StatusCode
|
||||
for key, values := range resp.Header {
|
||||
for _, v := range values {
|
||||
rec.Header().Add(key, v)
|
||||
}
|
||||
}
|
||||
_, _ = rec.Body.Write(body)
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
cd := resp.Header.Get("Content-Disposition")
|
||||
if cd != "" {
|
||||
_, params, parseErr := mime.ParseMediaType(cd)
|
||||
if parseErr == nil {
|
||||
if filename, ok := params["filename"]; ok {
|
||||
traceID := resp.Header.Get("Gotenberg-Trace")
|
||||
dirPath := fmt.Sprintf("%s/%s", s.workdir, traceID)
|
||||
|
||||
mu.Lock()
|
||||
mkErr := os.MkdirAll(dirPath, 0o755)
|
||||
mu.Unlock()
|
||||
|
||||
if mkErr == nil {
|
||||
fpath := fmt.Sprintf("%s/%s", dirPath, filename)
|
||||
f, fErr := os.Create(fpath)
|
||||
if fErr == nil {
|
||||
_, _ = f.Write(body)
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
s.concurrentResps = append(s.concurrentResps, rec)
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if len(errs) > 0 {
|
||||
return fmt.Errorf("concurrent requests failed: %v", errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scenario) allConcurrentResponseStatusCodesShouldBe(expected int) error {
|
||||
if len(s.concurrentResps) == 0 {
|
||||
return errors.New("no concurrent responses recorded")
|
||||
}
|
||||
|
||||
for i, resp := range s.concurrentResps {
|
||||
if resp.Code != expected {
|
||||
return fmt.Errorf("concurrent response %d: expected status %d, got %d %q", i+1, expected, resp.Code, resp.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scenario) allConcurrentResponsesShouldHavePdfs(expected int) error {
|
||||
if len(s.concurrentResps) == 0 {
|
||||
return errors.New("no concurrent responses recorded")
|
||||
}
|
||||
|
||||
for i, resp := range s.concurrentResps {
|
||||
traceID := resp.Header().Get("Gotenberg-Trace")
|
||||
dirPath := fmt.Sprintf("%s/%s", s.workdir, traceID)
|
||||
|
||||
_, err := os.Stat(dirPath)
|
||||
if os.IsNotExist(err) {
|
||||
return fmt.Errorf("concurrent response %d: directory %q does not exist", i+1, dirPath)
|
||||
}
|
||||
|
||||
var paths []string
|
||||
err = filepath.Walk(dirPath, func(path string, info os.FileInfo, pathErr error) error {
|
||||
if pathErr != nil {
|
||||
return pathErr
|
||||
}
|
||||
if strings.EqualFold(filepath.Ext(info.Name()), ".pdf") {
|
||||
paths = append(paths, path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("concurrent response %d: walk %q: %w", i+1, dirPath, err)
|
||||
}
|
||||
|
||||
if len(paths) != expected {
|
||||
return fmt.Errorf("concurrent response %d: expected %d PDF(s), got %d", i+1, expected, len(paths))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scenario) iWaitForTheAsynchronousRequestToWebhook(ctx context.Context) error {
|
||||
if s.server == nil {
|
||||
return errors.New("server not initialized")
|
||||
@@ -965,9 +1133,12 @@ func InitializeScenario(ctx *godog.ScenarioContext) {
|
||||
ctx.When(`^I make a "(GET|HEAD)" request to Gotenberg at the "([^"]*)" endpoint$`, s.iMakeARequestToGotenberg)
|
||||
ctx.When(`^I make a "(GET|HEAD)" request to Gotenberg at the "([^"]*)" endpoint with the following header\(s\):$`, s.iMakeARequestToGotenbergWithTheFollowingHeaders)
|
||||
ctx.When(`^I make a "(POST)" request to Gotenberg at the "([^"]*)" endpoint with the following form data and header\(s\):$`, s.iMakeARequestToGotenbergWithTheFollowingFormDataAndHeaders)
|
||||
ctx.When(`^I make (\d+) concurrent "(POST)" requests to Gotenberg at the "([^"]*)" endpoint with the following form data and header\(s\):$`, s.iMakeConcurrentRequestsToGotenberg)
|
||||
ctx.When(`^I wait for the asynchronous request to the webhook$`, s.iWaitForTheAsynchronousRequestToWebhook)
|
||||
ctx.Then(`^the Gotenberg container (should|should NOT) log the following entries:$`, s.theGotenbergContainerShouldLogTheFollowingEntries)
|
||||
ctx.Then(`^the response status code should be (\d+)$`, s.theResponseStatusCodeShouldBe)
|
||||
ctx.Then(`^all concurrent response status codes should be (\d+)$`, s.allConcurrentResponseStatusCodesShouldBe)
|
||||
ctx.Then(`^all concurrent responses should have (\d+) PDF\(s\)$`, s.allConcurrentResponsesShouldHavePdfs)
|
||||
ctx.Then(`^the (response|webhook request|file request|server request) header "([^"]*)" should be "([^"]*)"$`, s.theHeaderValueShouldBe)
|
||||
ctx.Then(`^the (response|webhook request|file request|server request) cookie "([^"]*)" should be "([^"]*)"$`, s.theCookieValueShouldBe)
|
||||
ctx.Then(`^the (response|webhook request) body should match string:$`, s.theBodyShouldMatchString)
|
||||
|
||||
Reference in New Issue
Block a user