Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/background/async-execution-correlation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('async execution correlation fallbacks', () => {
expect(scheduleExecutionTaskOptions).toMatchObject({
queue: {
name: 'schedule-execution',
concurrencyLimit: 50,
concurrencyLimit: 30,
},
})
})
Expand Down
21 changes: 21 additions & 0 deletions apps/sim/background/concurrency-limits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { env, envNumber } from '@/lib/core/config/env'

/** Per-task Trigger.dev concurrency caps. Bound heavy DB tasks so unbounded fan-out can't saturate the pool. */

export const WORKFLOW_EXECUTION_CONCURRENCY_LIMIT = envNumber(
env.WORKFLOW_EXECUTION_CONCURRENCY_LIMIT,
75,
{ min: 1, integer: true }
Comment on lines +5 to +8

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Module Load Freezes Env Defaults

When these constants are evaluated before the Trigger worker has the new env vars in process.env, envNumber falls back to 75 and the override is never read again. In deployments where Trigger-specific env vars are only injected through the Trigger project/runtime settings, lowering WORKFLOW_EXECUTION_CONCURRENCY_LIMIT would be silently ignored and the task can keep running at the default cap.

)

export const WEBHOOK_EXECUTION_CONCURRENCY_LIMIT = envNumber(
env.WEBHOOK_EXECUTION_CONCURRENCY_LIMIT,
75,
{ min: 1, integer: true }
)

export const RESUME_EXECUTION_CONCURRENCY_LIMIT = envNumber(
env.RESUME_EXECUTION_CONCURRENCY_LIMIT,
50,
{ min: 1, integer: true }
)
4 changes: 4 additions & 0 deletions apps/sim/background/resume-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { withCascadeLock } from '@/lib/table/cascade-lock'
import { isExecCancelled } from '@/lib/table/deps'
import type { RowData, RowExecutionMetadata } from '@/lib/table/types'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { RESUME_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'

const logger = createLogger('TriggerResumeExecution')

Expand Down Expand Up @@ -351,5 +352,8 @@ export const resumeExecutionTask = task({
retry: {
maxAttempts: 1,
},
queue: {
concurrencyLimit: RESUME_EXECUTION_CONCURRENCY_LIMIT,
},
run: executeResumeJob,
})
4 changes: 4 additions & 0 deletions apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
import { WEBHOOK_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'
import { getBlock } from '@/blocks'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
Expand Down Expand Up @@ -683,5 +684,8 @@ export const webhookExecution = task({
retry: {
maxAttempts: 1,
},
queue: {
concurrencyLimit: WEBHOOK_EXECUTION_CONCURRENCY_LIMIT,
},
run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload),
})
4 changes: 4 additions & 0 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
wasExecutionFinalizedByCore,
} from '@/lib/workflows/executor/execution-core'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { WORKFLOW_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
Expand Down Expand Up @@ -206,5 +207,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
export const workflowExecutionTask = task({
id: 'workflow-execution',
machine: 'medium-1x',
queue: {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Implicit Queue Sharing

When queue has a concurrencyLimit but no explicit name, these new high-volume tasks depend on Trigger.dev's implicit queue identity. If nameless queues resolve to a shared default queue, workflow, webhook, resume, and existing nameless cleanup jobs can throttle each other, so a webhook burst can delay unrelated workflow or resume runs instead of applying independent per-task caps.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

concurrencyLimit: WORKFLOW_EXECUTION_CONCURRENCY_LIMIT,
},
run: executeWorkflowJob,
})
5 changes: 4 additions & 1 deletion apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ export const env = createEnv({
TRIGGER_DEV_ENABLED: z.boolean().optional(), // Toggle to enable/disable Trigger.dev for async jobs
CRON_SECRET: z.string().optional(), // Secret for authenticating cron job requests
JOB_RETENTION_DAYS: z.string().optional().default('1'), // Days to retain job logs/data
SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'),
SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('30'),
WORKFLOW_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'),
WEBHOOK_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'),
RESUME_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'),
SCHEDULE_ENQUEUE_BUDGET_MULTIPLIER: z.string().optional().default('2'),
SCHEDULE_JITTER_MAX_MS: z.string().optional().default('30000'),
SCHEDULE_INFRA_RETRY_BASE_MS: z.string().optional().default('60000'),
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/workflows/schedules/execution-limits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export const SCHEDULE_EXECUTION_QUEUE_NAME = 'schedule-execution'

export const SCHEDULE_EXECUTION_CONCURRENCY_LIMIT = envNumber(
env.SCHEDULE_EXECUTION_CONCURRENCY_LIMIT,
50,
30,
{ min: 1, integer: true }
)

Expand Down
Loading