diff --git a/apps/sim/background/async-execution-correlation.test.ts b/apps/sim/background/async-execution-correlation.test.ts index 78c840f2fe0..e3eeeefe66c 100644 --- a/apps/sim/background/async-execution-correlation.test.ts +++ b/apps/sim/background/async-execution-correlation.test.ts @@ -56,7 +56,7 @@ describe('async execution correlation fallbacks', () => { expect(scheduleExecutionTaskOptions).toMatchObject({ queue: { name: 'schedule-execution', - concurrencyLimit: 50, + concurrencyLimit: 30, }, }) }) diff --git a/apps/sim/background/concurrency-limits.ts b/apps/sim/background/concurrency-limits.ts new file mode 100644 index 00000000000..7b902af96a4 --- /dev/null +++ b/apps/sim/background/concurrency-limits.ts @@ -0,0 +1,21 @@ +import { env, envNumber } from '@/lib/core/config/env' + +/** Per-task Trigger.dev concurrency caps. Bound heavy DB tasks so unbounded fan-out can't saturate the pool. */ + +export const WORKFLOW_EXECUTION_CONCURRENCY_LIMIT = envNumber( + env.WORKFLOW_EXECUTION_CONCURRENCY_LIMIT, + 75, + { min: 1, integer: true } +) + +export const WEBHOOK_EXECUTION_CONCURRENCY_LIMIT = envNumber( + env.WEBHOOK_EXECUTION_CONCURRENCY_LIMIT, + 75, + { min: 1, integer: true } +) + +export const RESUME_EXECUTION_CONCURRENCY_LIMIT = envNumber( + env.RESUME_EXECUTION_CONCURRENCY_LIMIT, + 50, + { min: 1, integer: true } +) diff --git a/apps/sim/background/resume-execution.ts b/apps/sim/background/resume-execution.ts index f1629cf2674..4300bcdbb01 100644 --- a/apps/sim/background/resume-execution.ts +++ b/apps/sim/background/resume-execution.ts @@ -6,6 +6,7 @@ import { withCascadeLock } from '@/lib/table/cascade-lock' import { isExecCancelled } from '@/lib/table/deps' import type { RowData, RowExecutionMetadata } from '@/lib/table/types' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { RESUME_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits' const logger = createLogger('TriggerResumeExecution') @@ -351,5 +352,8 @@ export const resumeExecutionTask = task({ retry: { maxAttempts: 1, }, + queue: { + concurrencyLimit: RESUME_EXECUTION_CONCURRENCY_LIMIT, + }, run: executeResumeJob, }) diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 7e942e3f456..9b66eca4423 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -26,6 +26,7 @@ import { import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils' import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils' +import { WEBHOOK_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits' import { getBlock } from '@/blocks' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata } from '@/executor/execution/types' @@ -683,5 +684,8 @@ export const webhookExecution = task({ retry: { maxAttempts: 1, }, + queue: { + concurrencyLimit: WEBHOOK_EXECUTION_CONCURRENCY_LIMIT, + }, run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload), }) diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 88b355d7d58..266772c9a68 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -13,6 +13,7 @@ import { wasExecutionFinalizedByCore, } from '@/lib/workflows/executor/execution-core' import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' +import { WORKFLOW_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata } from '@/executor/execution/types' import { hasExecutionResult } from '@/executor/utils/errors' @@ -206,5 +207,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { export const workflowExecutionTask = task({ id: 'workflow-execution', machine: 'medium-1x', + queue: { + concurrencyLimit: WORKFLOW_EXECUTION_CONCURRENCY_LIMIT, + }, run: executeWorkflowJob, }) diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 38bbab06471..86eb20145e4 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -206,7 +206,10 @@ export const env = createEnv({ TRIGGER_DEV_ENABLED: z.boolean().optional(), // Toggle to enable/disable Trigger.dev for async jobs CRON_SECRET: z.string().optional(), // Secret for authenticating cron job requests JOB_RETENTION_DAYS: z.string().optional().default('1'), // Days to retain job logs/data - SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'), + SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('30'), + WORKFLOW_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'), + WEBHOOK_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'), + RESUME_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'), SCHEDULE_ENQUEUE_BUDGET_MULTIPLIER: z.string().optional().default('2'), SCHEDULE_JITTER_MAX_MS: z.string().optional().default('30000'), SCHEDULE_INFRA_RETRY_BASE_MS: z.string().optional().default('60000'), diff --git a/apps/sim/lib/workflows/schedules/execution-limits.ts b/apps/sim/lib/workflows/schedules/execution-limits.ts index a5bb9c5bdc0..4840aaaa219 100644 --- a/apps/sim/lib/workflows/schedules/execution-limits.ts +++ b/apps/sim/lib/workflows/schedules/execution-limits.ts @@ -4,7 +4,7 @@ export const SCHEDULE_EXECUTION_QUEUE_NAME = 'schedule-execution' export const SCHEDULE_EXECUTION_CONCURRENCY_LIMIT = envNumber( env.SCHEDULE_EXECUTION_CONCURRENCY_LIMIT, - 50, + 30, { min: 1, integer: true } )