From 212f9d6ed4cc90670e515a9ede68a8615e731511 Mon Sep 17 00:00:00 2001 From: JPeer264 Date: Mon, 22 Jun 2026 17:12:32 +0200 Subject: [PATCH] feat(node): Add BullMQ integration --- .../node-bullmq/docker-compose.yml | 7 + .../node-bullmq/global-setup.mjs | 13 + .../node-bullmq/global-teardown.mjs | 13 + .../node-bullmq/package.json | 25 + .../node-bullmq/playwright.config.mjs | 11 + .../test-applications/node-bullmq/src/app.mjs | 58 +++ .../node-bullmq/src/instrument.mjs | 7 + .../node-bullmq/start-event-proxy.mjs | 6 + .../node-bullmq/tests/bullmq.test.ts | 124 +++++ .../node-integration-tests/package.json | 1 + .../suites/tracing/bullmq/docker-compose.yml | 13 + .../suites/tracing/bullmq/instrument.mjs | 9 + .../suites/tracing/bullmq/scenario.mjs | 32 ++ .../suites/tracing/bullmq/test.ts | 89 ++++ packages/node/src/index.ts | 1 + .../tracing/bullmq/contextManager.ts | 51 ++ .../src/integrations/tracing/bullmq/index.ts | 44 ++ .../src/integrations/tracing/bullmq/meter.ts | 82 +++ .../src/integrations/tracing/bullmq/span.ts | 72 +++ .../src/integrations/tracing/bullmq/tracer.ts | 101 ++++ .../src/integrations/tracing/bullmq/types.ts | 79 +++ .../test/integrations/tracing/bullmq.test.ts | 484 ++++++++++++++++++ yarn.lock | 104 ++-- 23 files changed, 1386 insertions(+), 40 deletions(-) create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/docker-compose.yml create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/global-setup.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/global-teardown.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/package.json create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/playwright.config.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/src/app.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/src/instrument.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/start-event-proxy.mjs create mode 100644 dev-packages/e2e-tests/test-applications/node-bullmq/tests/bullmq.test.ts create mode 100644 dev-packages/node-integration-tests/suites/tracing/bullmq/docker-compose.yml create mode 100644 dev-packages/node-integration-tests/suites/tracing/bullmq/instrument.mjs create mode 100644 dev-packages/node-integration-tests/suites/tracing/bullmq/scenario.mjs create mode 100644 dev-packages/node-integration-tests/suites/tracing/bullmq/test.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/contextManager.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/index.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/meter.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/span.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/tracer.ts create mode 100644 packages/node/src/integrations/tracing/bullmq/types.ts create mode 100644 packages/node/test/integrations/tracing/bullmq.test.ts diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/docker-compose.yml b/dev-packages/e2e-tests/test-applications/node-bullmq/docker-compose.yml new file mode 100644 index 000000000000..15a423ffbc07 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/docker-compose.yml @@ -0,0 +1,7 @@ +services: + redis: + image: redis:8 + restart: always + container_name: e2e-tests-node-bullmq-redis + ports: + - '6379:6379' diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/global-setup.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/global-setup.mjs new file mode 100644 index 000000000000..438b88b61794 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/global-setup.mjs @@ -0,0 +1,13 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalSetup() { + // Start Redis via Docker Compose + execSync('docker compose up -d --wait', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/global-teardown.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/global-teardown.mjs new file mode 100644 index 000000000000..35ce41179193 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/global-teardown.mjs @@ -0,0 +1,13 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalTeardown() { + // Stop Redis and remove containers + execSync('docker compose down --volumes', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/package.json b/dev-packages/e2e-tests/test-applications/node-bullmq/package.json new file mode 100644 index 000000000000..cbd47311eec6 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/package.json @@ -0,0 +1,25 @@ +{ + "name": "node-bullmq", + "version": "0.0.1", + "private": true, + "type": "module", + "scripts": { + "start": "node src/app.mjs", + "clean": "npx rimraf node_modules pnpm-lock.yaml", + "test": "playwright test", + "test:build": "pnpm install", + "test:assert": "pnpm test" + }, + "dependencies": { + "@sentry/node": "file:../../packed/sentry-node-packed.tgz", + "bullmq": "^5.0.0", + "express": "^4.21.0" + }, + "devDependencies": { + "@playwright/test": "~1.56.0", + "@sentry-internal/test-utils": "link:../../../test-utils" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/playwright.config.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/playwright.config.mjs new file mode 100644 index 000000000000..d5fd0b394f15 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/playwright.config.mjs @@ -0,0 +1,11 @@ +import { getPlaywrightConfig } from '@sentry-internal/test-utils'; + +const config = getPlaywrightConfig({ + startCommand: `pnpm start`, +}); + +export default { + ...config, + globalSetup: './global-setup.mjs', + globalTeardown: './global-teardown.mjs', +}; diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/src/app.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/src/app.mjs new file mode 100644 index 000000000000..8366c8363e46 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/src/app.mjs @@ -0,0 +1,58 @@ +import './instrument.mjs'; + +import * as Sentry from '@sentry/node'; +import { Queue, Worker } from 'bullmq'; +import express from 'express'; + +const app = express(); +const port = 3030; + +const connection = { host: '127.0.0.1', port: 6379 }; +const telemetry = new Sentry.BullMQTelemetry(); + +const testQueue = new Queue('test-queue', { connection, telemetry }); + +const worker = new Worker( + 'test-queue', + async job => { + if (job.name === 'fail-job') { + throw new Error('Test error from BullMQ processor'); + } + + if (job.name === 'breadcrumb-job') { + Sentry.addBreadcrumb({ message: 'breadcrumb-from-bullmq-processor' }); + } + + return { success: true }; + }, + { connection, telemetry }, +); + +worker.on('error', err => { + console.error('Worker error:', err); +}); + +app.get('/enqueue/success', async (req, res) => { + await testQueue.add('success-job', { data: 'test' }); + res.send('Job enqueued'); +}); + +app.get('/enqueue/fail', async (req, res) => { + await testQueue.add('fail-job', { data: 'test' }); + res.send('Job enqueued'); +}); + +app.get('/enqueue/breadcrumb-test', async (req, res) => { + await testQueue.add('breadcrumb-job', { data: 'test' }); + res.send('Job enqueued'); +}); + +app.get('/check-isolation', async (req, res) => { + res.send('Isolation check'); +}); + +Sentry.setupExpressErrorHandler(app); + +app.listen(port, () => { + console.log(`App listening on port ${port}`); +}); diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/src/instrument.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/src/instrument.mjs new file mode 100644 index 000000000000..fcf7d3c23b04 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/src/instrument.mjs @@ -0,0 +1,7 @@ +import * as Sentry from '@sentry/node'; + +Sentry.init({ + dsn: process.env.E2E_TEST_DSN, + tunnel: 'http://localhost:3031/', + tracesSampleRate: 1.0, +}); diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/start-event-proxy.mjs b/dev-packages/e2e-tests/test-applications/node-bullmq/start-event-proxy.mjs new file mode 100644 index 000000000000..2426471fe487 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/start-event-proxy.mjs @@ -0,0 +1,6 @@ +import { startEventProxyServer } from '@sentry-internal/test-utils'; + +startEventProxyServer({ + port: 3031, + proxyServerName: 'node-bullmq', +}); diff --git a/dev-packages/e2e-tests/test-applications/node-bullmq/tests/bullmq.test.ts b/dev-packages/e2e-tests/test-applications/node-bullmq/tests/bullmq.test.ts new file mode 100644 index 000000000000..a98025d6b9e7 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/node-bullmq/tests/bullmq.test.ts @@ -0,0 +1,124 @@ +import { expect, test } from '@playwright/test'; +import { waitForError, waitForMetric, waitForTransaction } from '@sentry-internal/test-utils'; + +test('Creates a queue.submit span when adding a job', async ({ baseURL }) => { + const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /enqueue/success'; + }); + + await fetch(`${baseURL}/enqueue/success`); + + const transaction = await transactionPromise; + + const submitSpan = transaction.spans?.find(span => span.op === 'queue.submit'); + expect(submitSpan).toBeDefined(); + expect(submitSpan!.origin).toBe('auto.queue.bullmq.producer'); + expect(submitSpan!.data?.['messaging.system']).toBe('bullmq'); +}); + +test('Creates a transaction for queue.task when processing a job', async ({ baseURL }) => { + const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.task'; + }); + + await fetch(`${baseURL}/enqueue/success`); + + const transaction = await transactionPromise; + + expect(transaction.contexts?.trace?.op).toBe('queue.task'); + expect(transaction.contexts?.trace?.origin).toBe('auto.queue.bullmq.consumer'); + expect(transaction.contexts?.trace?.data?.['messaging.system']).toBe('bullmq'); +}); + +test('Sends exception to Sentry on error in job processor', async ({ baseURL }) => { + const errorEventPromise = waitForError('node-bullmq', event => { + return ( + !event.type && + event.exception?.values?.[0]?.value === 'Test error from BullMQ processor' && + event.exception?.values?.[0]?.mechanism?.type === 'auto.queue.bullmq' + ); + }); + + await fetch(`${baseURL}/enqueue/fail`); + + const errorEvent = await errorEventPromise; + + expect(errorEvent.exception?.values).toHaveLength(1); + expect(errorEvent.exception?.values?.[0]?.mechanism).toEqual({ + handled: false, + type: 'auto.queue.bullmq', + }); +}); + +test('BullMQ processor breadcrumbs do not leak into subsequent HTTP requests', async ({ baseURL }) => { + const processTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.task'; + }); + + await fetch(`${baseURL}/enqueue/breadcrumb-test`); + + await processTransactionPromise; + + const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /check-isolation'; + }); + + await fetch(`${baseURL}/check-isolation`); + + const transaction = await transactionPromise; + + const leakedBreadcrumb = (transaction.breadcrumbs || []).find( + (b: { message?: string }) => b.message === 'breadcrumb-from-bullmq-processor', + ); + expect(leakedBreadcrumb).toBeUndefined(); +}); + +test('Links consumer transaction to producer span via sentry.previous_trace', async ({ baseURL }) => { + const httpTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.transaction === 'GET /enqueue/success'; + }); + + const consumerTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => { + return transactionEvent.contexts?.trace?.op === 'queue.task'; + }); + + await fetch(`${baseURL}/enqueue/success`); + + const httpTransaction = await httpTransactionPromise; + const consumerTransaction = await consumerTransactionPromise; + + const producerSpan = httpTransaction.spans?.find(span => span.op === 'queue.submit'); + expect(producerSpan).toBeDefined(); + + const previousTrace = consumerTransaction.contexts?.trace?.data?.['sentry.previous_trace']; + expect(previousTrace).toBeDefined(); + expect(previousTrace).toContain(httpTransaction.contexts?.trace?.trace_id); +}); + +test('Emits bullmq.jobs.completed counter metric on successful job', async ({ baseURL }) => { + const metricPromise = waitForMetric('node-bullmq', metric => { + return metric.name === 'bullmq.jobs.completed' && metric.type === 'counter'; + }); + + await fetch(`${baseURL}/enqueue/success`); + + const metric = await metricPromise; + + expect(metric.name).toBe('bullmq.jobs.completed'); + expect(metric.type).toBe('counter'); + expect(metric.value).toEqual(expect.any(Number)); +}); + +test('Emits bullmq.job.duration histogram metric on job completion', async ({ baseURL }) => { + const metricPromise = waitForMetric('node-bullmq', metric => { + return metric.name === 'bullmq.job.duration' && metric.type === 'distribution'; + }); + + await fetch(`${baseURL}/enqueue/success`); + + const metric = await metricPromise; + + expect(metric.name).toBe('bullmq.job.duration'); + expect(metric.type).toBe('distribution'); + expect(metric.value).toEqual(expect.any(Number)); +}); diff --git a/dev-packages/node-integration-tests/package.json b/dev-packages/node-integration-tests/package.json index 53c4bc96eeaf..27d650046d3b 100644 --- a/dev-packages/node-integration-tests/package.json +++ b/dev-packages/node-integration-tests/package.json @@ -55,6 +55,7 @@ "@types/pg": "^8.6.5", "ai": "^4.3.16", "amqplib": "^0.10.9", + "bullmq": "^5.79.1", "body-parser": "^2.2.2", "connect": "^3.7.0", "consola": "^3.2.3", diff --git a/dev-packages/node-integration-tests/suites/tracing/bullmq/docker-compose.yml b/dev-packages/node-integration-tests/suites/tracing/bullmq/docker-compose.yml new file mode 100644 index 000000000000..58a1e6576949 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/bullmq/docker-compose.yml @@ -0,0 +1,13 @@ +services: + db: + image: redis:latest + restart: always + container_name: integration-tests-bullmq-redis + ports: + - '6380:6379' + healthcheck: + test: ['CMD-SHELL', 'redis-cli ping | grep -q PONG'] + interval: 2s + timeout: 3s + retries: 30 + start_period: 5s diff --git a/dev-packages/node-integration-tests/suites/tracing/bullmq/instrument.mjs b/dev-packages/node-integration-tests/suites/tracing/bullmq/instrument.mjs new file mode 100644 index 000000000000..46a27dd03b74 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/bullmq/instrument.mjs @@ -0,0 +1,9 @@ +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + transport: loggingTransport, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/bullmq/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/bullmq/scenario.mjs new file mode 100644 index 000000000000..497a0b86cd45 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/bullmq/scenario.mjs @@ -0,0 +1,32 @@ +import * as Sentry from '@sentry/node'; +import { Queue, Worker } from 'bullmq'; + +const telemetry = new Sentry.BullMQTelemetry(); +const connection = { host: '127.0.0.1', port: 6380 }; + +async function run() { + const queue = new Queue('test-queue', { connection, telemetry }); + + const worker = new Worker( + 'test-queue', + async () => { + // job processed + }, + { connection, telemetry }, + ); + + const jobProcessed = new Promise(resolve => { + worker.on('completed', () => resolve()); + }); + + await Sentry.startSpan({ name: 'enqueue test-job' }, async () => { + await queue.add('test-job', { data: 'test-data' }); + }); + + await jobProcessed; + await worker.close(); + await queue.close(); + await Sentry.flush(); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/bullmq/test.ts b/dev-packages/node-integration-tests/suites/tracing/bullmq/test.ts new file mode 100644 index 000000000000..99eac2474aa3 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/bullmq/test.ts @@ -0,0 +1,89 @@ +import type { SerializedMetricContainer, TransactionEvent } from '@sentry/core'; +import { afterAll, describe, expect } from 'vitest'; +import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; + +describe('bullmq', () => { + afterAll(() => { + cleanupChildProcesses(); + }); + + createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => { + test('traces producer and consumer operations with queue attributes', { timeout: 90_000 }, async () => { + const receivedTransactions: TransactionEvent[] = []; + + await createRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .ignore('trace_metric') + .expectN(3, { + transaction: (transaction: TransactionEvent) => { + receivedTransactions.push(transaction); + }, + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + receivedTransactions.push(transaction); + + const producerTransaction = receivedTransactions.find(t => t.transaction === 'enqueue test-job'); + const consumerTransaction = receivedTransactions.find( + t => t.contexts?.trace?.data?.['sentry.origin'] === 'auto.queue.bullmq.consumer', + ); + + expect(producerTransaction).toBeDefined(); + const producerSpan = producerTransaction!.spans?.find(s => s.origin === 'auto.queue.bullmq.producer'); + expect(producerSpan).toBeDefined(); + expect(producerSpan!.op).toBe('queue.submit'); + expect(producerSpan!.status).toBe('ok'); + expect(producerSpan!.data?.['messaging.system']).toBe('bullmq'); + + expect(consumerTransaction).toBeDefined(); + expect(consumerTransaction!.contexts?.trace).toEqual( + expect.objectContaining({ + op: 'queue.task', + status: 'ok', + data: expect.objectContaining({ + 'messaging.system': 'bullmq', + 'sentry.op': 'queue.task', + 'sentry.origin': 'auto.queue.bullmq.consumer', + 'sentry.previous_trace': expect.stringContaining( + producerTransaction!.contexts!.trace!.trace_id as string, + ), + }), + }), + ); + }, + }) + .start() + .completed(); + }); + + test('emits completion counter and duration histogram for processed jobs', { timeout: 90_000 }, async () => { + await createRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .ignore('transaction') + .expect({ + trace_metric: (metrics: SerializedMetricContainer) => { + const items = metrics.items || []; + + expect(items).toHaveLength(2); + expect(items).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + name: 'bullmq.jobs.completed', + type: 'counter', + value: expect.any(Number), + }), + expect.objectContaining({ + name: 'bullmq.job.duration', + type: 'distribution', + unit: 'ms', + value: expect.any(Number), + }), + ]), + ); + }, + }) + .start() + .completed(); + }); + }); +}); diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 26c1a9ab3916..c3ecc2d35212 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -24,6 +24,7 @@ export { tediousIntegration } from './integrations/tracing/tedious'; export { genericPoolIntegration } from './integrations/tracing/genericPool'; export { dataloaderIntegration } from './integrations/tracing/dataloader'; export { amqplibIntegration } from './integrations/tracing/amqplib'; +export { BullMQTelemetry } from './integrations/tracing/bullmq'; export { vercelAIIntegration } from './integrations/tracing/vercelai'; export { openAIIntegration } from './integrations/tracing/openai'; export { anthropicAIIntegration } from './integrations/tracing/anthropic-ai'; diff --git a/packages/node/src/integrations/tracing/bullmq/contextManager.ts b/packages/node/src/integrations/tracing/bullmq/contextManager.ts new file mode 100644 index 000000000000..de3effa120dd --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/contextManager.ts @@ -0,0 +1,51 @@ +import type { Span } from '@opentelemetry/api'; +import { + extractTraceparentData, + getActiveSpan, + getCurrentScope, + spanToTraceHeader, + withActiveSpan, + withIsolationScope, +} from '@sentry/core'; +import type { ContextManager, SentryContext } from './types'; + +export class SentryBullMQContextManager implements ContextManager { + public active(): SentryContext { + return { + span: getActiveSpan(), + scope: getCurrentScope(), + }; + } + + public with unknown>(context: SentryContext, fn: A): ReturnType { + if (context.span) { + return withIsolationScope(() => { + return withActiveSpan(context.span as Span, fn) as ReturnType; + }); + } + return withIsolationScope(fn) as ReturnType; + } + + public getMetadata(context: SentryContext): string { + if (context.span) { + return spanToTraceHeader(context.span); + } + return ''; + } + + public fromMetadata(activeContext: SentryContext, metadata: string): SentryContext { + const traceparent = extractTraceparentData(metadata); + if (!traceparent?.traceId || !traceparent?.parentSpanId) { + return activeContext; + } + + return { + ...activeContext, + producerSpanContext: { + traceId: traceparent.traceId, + spanId: traceparent.parentSpanId, + sampled: traceparent.parentSampled === true, + }, + }; + } +} diff --git a/packages/node/src/integrations/tracing/bullmq/index.ts b/packages/node/src/integrations/tracing/bullmq/index.ts new file mode 100644 index 000000000000..bb30396e5cdb --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/index.ts @@ -0,0 +1,44 @@ +import { SentryBullMQContextManager } from './contextManager'; +import { SentryBullMQMeter } from './meter'; +import { SentryBullMQTracer } from './tracer'; +import type { ContextManager, Meter, Telemetry, Tracer, SentryContext } from './types'; + +/** + * A Sentry-specific telemetry implementation for BullMQ. + * + * This implements BullMQ's Telemetry interface to provide automatic tracing + * for queue operations with proper Queue Insights attributes. + * + * @example + * ```javascript + * import * as Sentry from '@sentry/node'; + * import { Queue, Worker } from 'bullmq'; + * + * const telemetry = new Sentry.BullMQTelemetry(); + * + * const queue = new Queue('myQueue', { + * connection: { host: '127.0.0.1', port: 6379 }, + * telemetry, + * }); + * + * const worker = new Worker('myQueue', async (job) => { + * // Process job + * }, { + * connection: { host: '127.0.0.1', port: 6379 }, + * telemetry, + * }); + * ``` + * + * @see https://docs.bullmq.io/guide/telemetry + */ +export class BullMQTelemetry implements Telemetry { + public tracer: Tracer; + public contextManager: ContextManager; + public meter: Meter; + + public constructor() { + this.tracer = new SentryBullMQTracer(); + this.contextManager = new SentryBullMQContextManager(); + this.meter = new SentryBullMQMeter(); + } +} diff --git a/packages/node/src/integrations/tracing/bullmq/meter.ts b/packages/node/src/integrations/tracing/bullmq/meter.ts new file mode 100644 index 000000000000..12418310cb82 --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/meter.ts @@ -0,0 +1,82 @@ +import { metrics } from '@sentry/core'; +import type { AttributeValue, Counter, Gauge, Histogram, Meter, MetricOptions } from './types'; + +function toMetricAttributes( + attributes?: Record, +): Record | undefined { + if (!attributes) { + return undefined; + } + const result: Record = {}; + for (const [key, value] of Object.entries(attributes)) { + if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') { + result[key] = value; + } + } + return result; +} + +class SentryBullMQCounter implements Counter { + private _name: string; + private _unit?: string; + + public constructor(name: string, unit?: string) { + this._name = name; + this._unit = unit; + } + + public add(value: number, attributes?: Record): void { + metrics.count(this._name, value, { + unit: this._unit, + attributes: toMetricAttributes(attributes), + }); + } +} + +class SentryBullMQHistogram implements Histogram { + private _name: string; + private _unit?: string; + + public constructor(name: string, unit?: string) { + this._name = name; + this._unit = unit; + } + + public record(value: number, attributes?: Record): void { + metrics.distribution(this._name, value, { + unit: this._unit, + attributes: toMetricAttributes(attributes), + }); + } +} + +class SentryBullMQGauge implements Gauge { + private _name: string; + private _unit?: string; + + public constructor(name: string, unit?: string) { + this._name = name; + this._unit = unit; + } + + public record(value: number, attributes?: Record): void { + metrics.gauge(this._name, value, { + unit: this._unit, + attributes: toMetricAttributes(attributes), + }); + } +} + +export class SentryBullMQMeter implements Meter { + public createCounter(name: string, options?: MetricOptions): Counter { + return new SentryBullMQCounter(name, options?.unit); + } + + public createHistogram(name: string, options?: MetricOptions): Histogram { + return new SentryBullMQHistogram(name, options?.unit); + } + + public createGauge(name: string, options?: MetricOptions): Gauge { + return new SentryBullMQGauge(name, options?.unit); + } +} diff --git a/packages/node/src/integrations/tracing/bullmq/span.ts b/packages/node/src/integrations/tracing/bullmq/span.ts new file mode 100644 index 000000000000..d41b7dee7cb3 --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/span.ts @@ -0,0 +1,72 @@ +import type { Attributes, AttributeValue as OtelAttributeValue, Span } from '@opentelemetry/api'; +import type { Scope } from '@sentry/core'; +import { captureException } from '@sentry/core'; +import type { AttributeValue, TelemetrySpan } from './types'; + +function toOtelAttributeValue(value: AttributeValue): OtelAttributeValue { + return value as OtelAttributeValue; +} + +function toOtelAttributes(attributes: Record): Attributes { + return attributes as Attributes; +} + +export class SentryBullMQSpan implements TelemetrySpan { + private _span: Span; + private _scope: Scope; + + public constructor(span: Span, scope: Scope) { + this._span = span; + this._scope = scope; + } + + public setAttribute(key: string, value: AttributeValue): void { + this._span.setAttribute(key, toOtelAttributeValue(value)); + } + + public setAttributes(attributes: Record): void { + this._span.setAttributes(toOtelAttributes(attributes)); + } + + public addEvent(name: string, attributes?: Record): void { + this._span.addEvent(name, attributes ? toOtelAttributes(attributes) : undefined); + + if (name === 'job failed') { + const reason = attributes?.['bullmq.job.failed.reason']; + captureException(new Error(String(reason || 'Unknown error')), { + mechanism: { + handled: false, + type: 'auto.queue.bullmq', + }, + }); + } + } + + public recordException(exception: Error | string | { code?: number; message?: string; name?: string }): void { + const error = + exception instanceof Error + ? exception + : new Error(typeof exception === 'string' ? exception : exception.message || 'Unknown error'); + + this._span.recordException(error); + + captureException(error, { + mechanism: { + handled: false, + type: 'auto.queue.bullmq', + }, + }); + } + + public setSpanOnContext(context: unknown): unknown { + return { + ...(context as object), + span: this._span, + scope: this._scope, + }; + } + + public end(): void { + this._span.end(); + } +} diff --git a/packages/node/src/integrations/tracing/bullmq/tracer.ts b/packages/node/src/integrations/tracing/bullmq/tracer.ts new file mode 100644 index 000000000000..6e8d67b19946 --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/tracer.ts @@ -0,0 +1,101 @@ +import type { SpanAttributes } from '@sentry/core'; +import { + getCurrentScope, + SEMANTIC_ATTRIBUTE_SENTRY_OP, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + SEMANTIC_LINK_ATTRIBUTE_LINK_TYPE, + startInactiveSpan, +} from '@sentry/core'; +import { SentryBullMQSpan } from './span'; +import type { AttributeValue, SpanOptions, TelemetrySpan, Tracer, SentryContext } from './types'; + +const MESSAGING_SYSTEM = 'bullmq'; + +// BullMQ span names follow OTel messaging semconv: "{operation} {destination}" +// e.g. "add myQueue", "addBulk myQueue", "process myQueue", "addFlow myQueue" +const PRODUCER_OPERATIONS = new Set(['add', 'addbulk', 'addflow', 'addbulkflows']); +const CONSUMER_OPERATIONS = new Set(['process']); + +function getOperation(name: string): string { + return name.split(' ')[0]!.toLowerCase(); +} + +function getOpFromSpanName(name: string): string { + const operation = getOperation(name); + + if (CONSUMER_OPERATIONS.has(operation)) { + return 'queue.task'; + } + + if (PRODUCER_OPERATIONS.has(operation)) { + return 'queue.submit'; + } + + return 'queue'; +} + +function getOriginFromSpanName(name: string): string { + const operation = getOperation(name); + + if (CONSUMER_OPERATIONS.has(operation)) { + return 'auto.queue.bullmq.consumer'; + } + + if (PRODUCER_OPERATIONS.has(operation)) { + return 'auto.queue.bullmq.producer'; + } + + return 'auto.queue.bullmq'; +} + +function toSentryAttributes(attributes: Record): SpanAttributes { + return attributes as SpanAttributes; +} + +export class SentryBullMQTracer implements Tracer { + public startSpan(name: string, options?: SpanOptions, context?: SentryContext): TelemetrySpan { + const op = getOpFromSpanName(name); + const origin = getOriginFromSpanName(name); + + const attributes: SpanAttributes = { + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: origin, + 'messaging.system': MESSAGING_SYSTEM, + }; + + if (options?.attributes) { + Object.assign(attributes, toSentryAttributes(options.attributes)); + } + + const span = startInactiveSpan({ + name, + attributes, + forceTransaction: op === 'queue.task', + }); + + if (context?.producerSpanContext) { + const producerSpanCtx = { + traceId: context.producerSpanContext.traceId, + spanId: context.producerSpanContext.spanId, + traceFlags: context.producerSpanContext.sampled ? 1 : 0, + }; + + span.addLink({ + context: producerSpanCtx, + attributes: { + [SEMANTIC_LINK_ATTRIBUTE_LINK_TYPE]: 'previous_trace', + }, + }); + + // TODO(v11): Remove this once EAP can store span links. We currently only set this attribute so that we + // can obtain the previous trace information from the EAP store. Long-term, EAP will handle + // span links and then we should remove this again. + span.setAttribute( + 'sentry.previous_trace', + `${producerSpanCtx.traceId}-${producerSpanCtx.spanId}-${producerSpanCtx.traceFlags}`, + ); + } + + return new SentryBullMQSpan(span, getCurrentScope()); + } +} diff --git a/packages/node/src/integrations/tracing/bullmq/types.ts b/packages/node/src/integrations/tracing/bullmq/types.ts new file mode 100644 index 000000000000..6fb0d04a8cd0 --- /dev/null +++ b/packages/node/src/integrations/tracing/bullmq/types.ts @@ -0,0 +1,79 @@ +import type { Span } from '@opentelemetry/api'; +import type { Scope } from '@sentry/core'; + +/** + * Vendored from bullmq@5.79.1 — src/interfaces/telemetry.ts + * https://github.com/taskforcesh/bullmq/blob/master/src/interfaces/telemetry.ts + * + * Minimal subset of BullMQ's Telemetry interface so we don't depend on bullmq at runtime. + * + * SPDX-License-Identifier: MIT + * Copyright (c) 2018 BullForce Labs AB and contributors + */ + +export type AttributeValue = string | number | boolean | Array; + +export interface SpanOptions { + kind?: number; + attributes?: Record; + parent?: unknown; +} + +export interface TelemetrySpan { + setAttribute(key: string, value: AttributeValue): void; + setAttributes(attributes: Record): void; + addEvent(name: string, attributes?: Record): void; + recordException(exception: Error | string | { code?: number; message?: string; name?: string }): void; + setSpanOnContext(context: unknown): unknown; + end(): void; +} + +export interface Tracer { + startSpan(name: string, options?: SpanOptions, context?: Context): TelemetrySpan; +} + +export interface ContextManager { + active(): Context; + with unknown>(context: Context, fn: A): ReturnType; + getMetadata(context: Context): string; + fromMetadata(activeContext: Context, metadata: string): Context; +} + +export interface MetricOptions { + description?: string; + unit?: string; +} + +export interface Counter { + add(value: number, attributes?: Record): void; +} + +export interface Histogram { + record(value: number, attributes?: Record): void; +} + +export interface Gauge { + record(value: number, attributes?: Record): void; +} + +export interface Meter { + createCounter(name: string, options?: MetricOptions): Counter; + createHistogram(name: string, options?: MetricOptions): Histogram; + createGauge?(name: string, options?: MetricOptions): Gauge; +} + +export interface Telemetry { + tracer: Tracer; + contextManager: ContextManager; + meter?: Meter; +} + +export interface SentryContext { + span: Span | undefined; + scope: Scope; + producerSpanContext?: { + traceId: string; + spanId: string; + sampled: boolean; + }; +} diff --git a/packages/node/test/integrations/tracing/bullmq.test.ts b/packages/node/test/integrations/tracing/bullmq.test.ts new file mode 100644 index 000000000000..2d7e5f94e362 --- /dev/null +++ b/packages/node/test/integrations/tracing/bullmq.test.ts @@ -0,0 +1,484 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import * as SentryCore from '@sentry/core'; +import { BullMQTelemetry } from '../../../src/integrations/tracing/bullmq'; + +vi.mock('@sentry/core', async importOriginal => { + const actual = await importOriginal(); + return { + ...actual, + startInactiveSpan: vi.fn(() => mockOtelSpan()), + getActiveSpan: vi.fn(() => undefined), + getCurrentScope: vi.fn(() => mockScope()), + withActiveSpan: vi.fn((_span, fn) => fn()), + withIsolationScope: vi.fn(fn => fn()), + spanToTraceHeader: vi.fn(() => '00-traceid-spanid-01'), + captureException: vi.fn(), + metrics: { + count: vi.fn(), + distribution: vi.fn(), + gauge: vi.fn(), + }, + }; +}); + +function mockOtelSpan() { + return { + setAttribute: vi.fn(), + setAttributes: vi.fn(), + addEvent: vi.fn(), + addLink: vi.fn(), + recordException: vi.fn(), + end: vi.fn(), + spanContext: () => ({ spanId: 'abc123', traceId: 'def456', traceFlags: 1 }), + }; +} + +function mockScope() { + return { + setTag: vi.fn(), + setContext: vi.fn(), + }; +} + +beforeEach(() => { + vi.restoreAllMocks(); +}); + +describe('BullMQTelemetry', () => { + it('exposes tracer, contextManager, and meter', () => { + const telemetry = new BullMQTelemetry(); + + expect(telemetry.tracer).toBeDefined(); + expect(telemetry.contextManager).toBeDefined(); + expect(telemetry.meter).toBeDefined(); + }); +}); + +describe('SentryBullMQTracer', () => { + describe('startSpan', () => { + it.each([ + { name: 'add myQueue', expectedOp: 'queue.submit', expectedOrigin: 'auto.queue.bullmq.producer' }, + { name: 'addBulk myQueue', expectedOp: 'queue.submit', expectedOrigin: 'auto.queue.bullmq.producer' }, + { name: 'addFlow myQueue', expectedOp: 'queue.submit', expectedOrigin: 'auto.queue.bullmq.producer' }, + { name: 'addBulkFlows myQueue', expectedOp: 'queue.submit', expectedOrigin: 'auto.queue.bullmq.producer' }, + { name: 'process myQueue', expectedOp: 'queue.task', expectedOrigin: 'auto.queue.bullmq.consumer' }, + { name: 'pause myQueue', expectedOp: 'queue', expectedOrigin: 'auto.queue.bullmq' }, + { name: 'close myQueue', expectedOp: 'queue', expectedOrigin: 'auto.queue.bullmq' }, + { name: 'drain myQueue', expectedOp: 'queue', expectedOrigin: 'auto.queue.bullmq' }, + { name: 'getNextJob myQueue', expectedOp: 'queue', expectedOrigin: 'auto.queue.bullmq' }, + ])('maps "$name" to op=$expectedOp and origin=$expectedOrigin', ({ name, expectedOp, expectedOrigin }) => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan(name); + + expect(SentryCore.startInactiveSpan).toHaveBeenCalledWith({ + name, + attributes: { + [SentryCore.SEMANTIC_ATTRIBUTE_SENTRY_OP]: expectedOp, + [SentryCore.SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: expectedOrigin, + 'messaging.system': 'bullmq', + }, + forceTransaction: expectedOp === 'queue.task', + }); + }); + + it('merges user-provided attributes from SpanOptions', () => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan('add emails', { + attributes: { 'messaging.destination.name': 'emails' }, + }); + + expect(SentryCore.startInactiveSpan).toHaveBeenCalledWith( + expect.objectContaining({ + attributes: expect.objectContaining({ + 'messaging.destination.name': 'emails', + }), + }), + ); + }); + + it('forces transaction for queue.task spans', () => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan('process notifications'); + + expect(SentryCore.startInactiveSpan).toHaveBeenCalledWith(expect.objectContaining({ forceTransaction: true })); + }); + + it('does not force transaction for queue.submit spans', () => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan('add notifications'); + + expect(SentryCore.startInactiveSpan).toHaveBeenCalledWith(expect.objectContaining({ forceTransaction: false })); + }); + + it('adds span link to producer when context has producerSpanContext', () => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan('process myQueue', undefined, { + span: undefined, + scope: mockScope() as any, + producerSpanContext: { + traceId: 'aabbccddaabbccddaabbccddaabbccdd', + spanId: '1122334455667788', + sampled: true, + }, + }); + + const span = vi.mocked(SentryCore.startInactiveSpan).mock.results[0]!.value; + expect(span.addLink).toHaveBeenCalledWith({ + context: { + traceId: 'aabbccddaabbccddaabbccddaabbccdd', + spanId: '1122334455667788', + traceFlags: 1, + }, + attributes: { + 'sentry.link.type': 'previous_trace', + }, + }); + expect(span.setAttribute).toHaveBeenCalledWith( + 'sentry.previous_trace', + 'aabbccddaabbccddaabbccddaabbccdd-1122334455667788-1', + ); + }); + + it('does not add span link when context has no producerSpanContext', () => { + const telemetry = new BullMQTelemetry(); + + telemetry.tracer.startSpan('process myQueue', undefined, { + span: undefined, + scope: mockScope() as any, + }); + + const span = vi.mocked(SentryCore.startInactiveSpan).mock.results[0]!.value; + expect(span.addLink).not.toHaveBeenCalled(); + }); + }); +}); + +describe('SentryBullMQSpan', () => { + function createSpan() { + const telemetry = new BullMQTelemetry(); + const span = telemetry.tracer.startSpan('Queue.add test-queue'); + const otelSpan = vi.mocked(SentryCore.startInactiveSpan).mock.results[0]!.value; + return { span, otelSpan }; + } + + it('delegates setAttribute to the underlying OTel span', () => { + const { span, otelSpan } = createSpan(); + + span.setAttribute('messaging.destination.name', 'emails'); + + expect(otelSpan.setAttribute).toHaveBeenCalledWith('messaging.destination.name', 'emails'); + }); + + it('delegates setAttributes to the underlying OTel span', () => { + const { span, otelSpan } = createSpan(); + + span.setAttributes({ 'messaging.destination.name': 'emails', 'messaging.message.id': '42' }); + + expect(otelSpan.setAttributes).toHaveBeenCalledWith({ + 'messaging.destination.name': 'emails', + 'messaging.message.id': '42', + }); + }); + + it('delegates addEvent to the underlying OTel span', () => { + const { span, otelSpan } = createSpan(); + + span.addEvent('job.retry', { 'bullmq.job.attempt': 3 }); + + expect(otelSpan.addEvent).toHaveBeenCalledWith('job.retry', { 'bullmq.job.attempt': 3 }); + }); + + it('delegates addEvent without attributes', () => { + const { span, otelSpan } = createSpan(); + + span.addEvent('job.completed'); + + expect(otelSpan.addEvent).toHaveBeenCalledWith('job.completed', undefined); + }); + + describe('recordException', () => { + it('records Error instances on the OTel span and captures via Sentry', () => { + const { span, otelSpan } = createSpan(); + const error = new Error('Redis connection refused'); + + span.recordException(error); + + expect(otelSpan.recordException).toHaveBeenCalledWith(error); + expect(SentryCore.captureException).toHaveBeenCalledWith(error, { + mechanism: { handled: false, type: 'auto.queue.bullmq' }, + }); + }); + + it('wraps non-Error exception objects in an Error and captures via Sentry', () => { + const { span, otelSpan } = createSpan(); + + span.recordException({ code: 503, message: 'Service unavailable' }); + + const wrappedError = new Error('Service unavailable'); + expect(otelSpan.recordException).toHaveBeenCalledWith(wrappedError); + expect(SentryCore.captureException).toHaveBeenCalledWith(wrappedError, { + mechanism: { handled: false, type: 'auto.queue.bullmq' }, + }); + }); + + it('wraps string exceptions in an Error and captures via Sentry', () => { + const { span, otelSpan } = createSpan(); + + (span as any).recordException('Connection timed out'); + + const wrappedError = new Error('Connection timed out'); + expect(otelSpan.recordException).toHaveBeenCalledWith(wrappedError); + expect(SentryCore.captureException).toHaveBeenCalledWith(wrappedError, { + mechanism: { handled: false, type: 'auto.queue.bullmq' }, + }); + }); + + it('uses fallback message when non-Error exception has no message', () => { + const { span, otelSpan } = createSpan(); + + span.recordException({ code: 500 }); + + const wrappedError = new Error('Unknown error'); + expect(otelSpan.recordException).toHaveBeenCalledWith(wrappedError); + expect(SentryCore.captureException).toHaveBeenCalledWith(wrappedError, { + mechanism: { handled: false, type: 'auto.queue.bullmq' }, + }); + }); + }); + + describe('addEvent', () => { + it('captures exception when event name is "job failed"', () => { + const { span } = createSpan(); + + span.addEvent('job failed', { 'bullmq.job.failed.reason': 'Connection refused' }); + + expect(SentryCore.captureException).toHaveBeenCalledWith(new Error('Connection refused'), { + mechanism: { handled: false, type: 'auto.queue.bullmq' }, + }); + }); + + it('does not capture exception for other event names', () => { + const { span } = createSpan(); + + span.addEvent('job completed', { 'bullmq.job.result': '{"ok":true}' }); + + expect(SentryCore.captureException).not.toHaveBeenCalled(); + }); + }); + + it('returns context with span and scope from setSpanOnContext', () => { + const { span, otelSpan } = createSpan(); + const inputContext = { existingKey: 'value' }; + + const result = span.setSpanOnContext(inputContext) as Record; + + expect(result.existingKey).toBe('value'); + expect(result.span).toBe(otelSpan); + expect(result.scope).toBeDefined(); + }); + + it('delegates end to the underlying OTel span', () => { + const { span, otelSpan } = createSpan(); + + span.end(); + + expect(otelSpan.end).toHaveBeenCalledOnce(); + }); +}); + +describe('SentryBullMQContextManager', () => { + it('returns current active span and scope from active()', () => { + const telemetry = new BullMQTelemetry(); + + const context = telemetry.contextManager.active(); + + expect(SentryCore.getActiveSpan).toHaveBeenCalledOnce(); + expect(SentryCore.getCurrentScope).toHaveBeenCalled(); + expect(context).toHaveProperty('span'); + expect(context).toHaveProperty('scope'); + }); + + describe('with', () => { + it('calls withActiveSpan inside withIsolationScope when context has a span', () => { + const telemetry = new BullMQTelemetry(); + const fakeSpan = mockOtelSpan(); + const context = { span: fakeSpan, scope: mockScope() }; + const fn = vi.fn(() => 'result'); + + const result = telemetry.contextManager.with(context as any, fn); + + expect(result).toBe('result'); + expect(SentryCore.withIsolationScope).toHaveBeenCalledOnce(); + expect(SentryCore.withActiveSpan).toHaveBeenCalledWith(fakeSpan, fn); + }); + + it('calls only withIsolationScope when context has no span', () => { + const telemetry = new BullMQTelemetry(); + const context = { span: undefined, scope: mockScope() }; + const fn = vi.fn(() => 'result'); + + telemetry.contextManager.with(context as any, fn); + + expect(SentryCore.withIsolationScope).toHaveBeenCalledOnce(); + expect(SentryCore.withActiveSpan).not.toHaveBeenCalled(); + }); + }); + + describe('getMetadata', () => { + it('returns sentry-trace header when context has a span', () => { + const telemetry = new BullMQTelemetry(); + const fakeSpan = mockOtelSpan(); + const context = { span: fakeSpan, scope: mockScope() }; + + const metadata = telemetry.contextManager.getMetadata(context as any); + + expect(SentryCore.spanToTraceHeader).toHaveBeenCalledWith(fakeSpan); + expect(metadata).toBe('00-traceid-spanid-01'); + }); + + it('returns empty string when context has no span', () => { + const telemetry = new BullMQTelemetry(); + const context = { span: undefined, scope: mockScope() }; + + const metadata = telemetry.contextManager.getMetadata(context as any); + + expect(metadata).toBe(''); + expect(SentryCore.spanToTraceHeader).not.toHaveBeenCalled(); + }); + }); + + describe('fromMetadata', () => { + it('parses sentry-trace header and attaches producerSpanContext', () => { + const telemetry = new BullMQTelemetry(); + const fakeSpan = mockOtelSpan(); + const context = { span: fakeSpan, scope: mockScope() }; + + const result = telemetry.contextManager.fromMetadata( + context as any, + 'aabbccddaabbccddaabbccddaabbccdd-1122334455667788-1', + ); + + expect(result.span).toBe(fakeSpan); + expect(result.scope).toBe(context.scope); + expect(result.producerSpanContext).toEqual({ + traceId: 'aabbccddaabbccddaabbccddaabbccdd', + spanId: '1122334455667788', + sampled: true, + }); + }); + + it('returns the active context unchanged when metadata is empty', () => { + const telemetry = new BullMQTelemetry(); + const context = { span: undefined, scope: mockScope() }; + + const result = telemetry.contextManager.fromMetadata(context as any, ''); + + expect(result).toBe(context); + }); + + it('returns the active context unchanged when metadata is invalid', () => { + const telemetry = new BullMQTelemetry(); + const context = { span: undefined, scope: mockScope() }; + + const result = telemetry.contextManager.fromMetadata(context as any, 'not-a-valid-header'); + + expect(result).toBe(context); + }); + }); +}); + +describe('SentryBullMQMeter', () => { + describe('counter', () => { + it('delegates add to metrics.count with name and unit', () => { + const telemetry = new BullMQTelemetry(); + const counter = telemetry.meter!.createCounter('bullmq.jobs.completed', { unit: '1' }); + + counter.add(5, { 'queue.name': 'emails' }); + + expect(SentryCore.metrics.count).toHaveBeenCalledWith('bullmq.jobs.completed', 5, { + unit: '1', + attributes: { 'queue.name': 'emails' }, + }); + }); + + it('passes undefined attributes when none provided', () => { + const telemetry = new BullMQTelemetry(); + const counter = telemetry.meter!.createCounter('bullmq.jobs.failed'); + + counter.add(1); + + expect(SentryCore.metrics.count).toHaveBeenCalledWith('bullmq.jobs.failed', 1, { + unit: undefined, + attributes: undefined, + }); + }); + }); + + describe('histogram', () => { + it('delegates record to metrics.distribution', () => { + const telemetry = new BullMQTelemetry(); + const histogram = telemetry.meter!.createHistogram('bullmq.job.duration', { unit: 'ms' }); + + histogram.record(142.5, { 'queue.name': 'notifications' }); + + expect(SentryCore.metrics.distribution).toHaveBeenCalledWith('bullmq.job.duration', 142.5, { + unit: 'ms', + attributes: { 'queue.name': 'notifications' }, + }); + }); + }); + + describe('gauge', () => { + it('delegates record to metrics.gauge', () => { + const telemetry = new BullMQTelemetry(); + const gauge = telemetry.meter!.createGauge!('bullmq.queue.size', { unit: '1' }); + + gauge.record(37, { 'queue.name': 'reports' }); + + expect(SentryCore.metrics.gauge).toHaveBeenCalledWith('bullmq.queue.size', 37, { + unit: '1', + attributes: { 'queue.name': 'reports' }, + }); + }); + }); + + describe('attribute filtering', () => { + it('filters out array attribute values', () => { + const telemetry = new BullMQTelemetry(); + const counter = telemetry.meter!.createCounter('bullmq.jobs.completed'); + + counter.add(1, { + 'queue.name': 'emails', + tags: ['urgent', 'retry'], + priority: 5, + enabled: true, + }); + + expect(SentryCore.metrics.count).toHaveBeenCalledWith('bullmq.jobs.completed', 1, { + unit: undefined, + attributes: { + 'queue.name': 'emails', + priority: 5, + enabled: true, + }, + }); + }); + + it('returns empty object when all attributes are arrays', () => { + const telemetry = new BullMQTelemetry(); + const counter = telemetry.meter!.createCounter('bullmq.jobs.completed'); + + counter.add(1, { tags: ['a', 'b'] }); + + expect(SentryCore.metrics.count).toHaveBeenCalledWith('bullmq.jobs.completed', 1, { + unit: undefined, + attributes: {}, + }); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index 13c6097ac913..6a48c67aac4e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5251,35 +5251,35 @@ dependencies: sparse-bitfield "^3.0.3" -"@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz#9edec61b22c3082018a79f6d1c30289ddf3d9d11" - integrity sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw== +"@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.4.tgz#22619f76a6b10ba78c8b74025b0d9754cad69cc7" + integrity sha512-LCkGo6JDfaBhgST7UpPWgNgLINpcpabaHfyz5OBx75nUYxBsaEPxjnyNjWpeb/xBup/682QnBfRBy2/LvPutZQ== -"@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz#33677a275204898ad8acbf62734fc4dc0b6a4855" - integrity sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw== +"@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.4.tgz#c2fc0573afe08b0cf213e66eef76842b121d1577" + integrity sha512-zExlW9zUJKZH/tOtVMttwjKa4Xm/3KcNjnE3dPN92uCktwavMxpgCA3MoJK/DOnTWsQgo224OaST27/mPNAf+w== -"@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz#19edf7cdc2e7063ee328403c1d895a86dd28f4bb" - integrity sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg== +"@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.4.tgz#4e3822f5522e18ed92611b894dc5db1bc882f39d" + integrity sha512-dgX0P/9wGPJeHFBG+ZmhgE6bmtMt7NP5CRBGyyktpopdk/mW4POnrpQsSLtKI1dwpc+pPLuXHDh6vvskyQE/sw== -"@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz#94fb0543ba2e28766c3fc439cabbe0440ae70159" - integrity sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw== +"@msgpackr-extract/msgpackr-extract-linux-arm@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.4.tgz#27ec4bc7eb6c311c982a50f1a6e1e1414638a6f8" + integrity sha512-Tg3yX65f5GbtXLkrYEHE5oibZG9epyYWas7FogTTEJeDEF9JlXJzKgXaNhT3UXlTOeA+AfZpYZYZ0uPj7Cfquw== -"@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz#4a0609ab5fe44d07c9c60a11e4484d3c38bbd6e3" - integrity sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg== +"@msgpackr-extract/msgpackr-extract-linux-x64@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.4.tgz#37317d833c7d01d086f6155fa59c478adb6839f6" + integrity sha512-8TNXMEjJc3QEy7R/x1INhgiU+XakDAFUzBhaz7+Rbrs8NH5UQeHQxxmzsSBJGyV6I1jW79undiQm8tOI+D+8FQ== -"@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3": - version "3.0.3" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz#0aa5502d547b57abfc4ac492de68e2006e417242" - integrity sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ== +"@msgpackr-extract/msgpackr-extract-win32-x64@3.0.4": + version "3.0.4" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.4.tgz#a1c79dcc9ae5f8c02aea8c2f144e5af6a822e5e8" + integrity sha512-CmCXPQrkbwExx3j946/PtHWHbYJiCRBRDl4BlkRQcJB/YOwQxJRTpoo7aTsortjgoJ1x7opzTSxn7C+ASSLVjQ== "@napi-rs/wasm-runtime@0.2.4": version "0.2.4" @@ -12450,6 +12450,18 @@ builtins@^5.0.0: dependencies: semver "^7.0.0" +bullmq@^5.79.1: + version "5.79.1" + resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-5.79.1.tgz#12d33ad99a9addfb4bb96a9ca74363ef5b11b689" + integrity sha512-cteoHRr1FGOTUgzFrnMyBNGtQhNeVR8Ej6nImNSHQDJi4tj6GMD0p9ZG65ZsTnvR9RVf18dhRxWu4kFl634QGA== + dependencies: + cron-parser "4.9.0" + ioredis "5.10.1" + msgpackr "2.0.2" + node-abort-controller "3.1.1" + semver "7.8.1" + tslib "2.8.1" + bun-types@^1.2.9: version "1.2.9" resolved "https://registry.yarnpkg.com/bun-types/-/bun-types-1.2.9.tgz#e0208ba62f534eb64284c1f347f73bde7105c0f0" @@ -13585,7 +13597,7 @@ critters@0.0.16: postcss "^8.3.7" pretty-bytes "^5.3.0" -cron-parser@^4.2.0: +cron-parser@4.9.0, cron-parser@^4.2.0: version "4.9.0" resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5" integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q== @@ -21960,19 +21972,26 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== -msgpackr-extract@^3.0.2: - version "3.0.3" - resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz#e9d87023de39ce714872f9e9504e3c1996d61012" - integrity sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA== +msgpackr-extract@^3.0.2, msgpackr-extract@^3.0.4: + version "3.0.4" + resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-3.0.4.tgz#d252698947f7a1a62478d22bfb789ba48dd4c1ac" + integrity sha512-4kmO/MdyUIkLIvTPr8VHLil4AtoKIoniWPIEk5+CDy0xnWC84azhSFmuJ7PxZdsYtiP5kEeQsORAVIeMgxT+Hw== dependencies: node-gyp-build-optional-packages "5.2.2" optionalDependencies: - "@msgpackr-extract/msgpackr-extract-darwin-arm64" "3.0.3" - "@msgpackr-extract/msgpackr-extract-darwin-x64" "3.0.3" - "@msgpackr-extract/msgpackr-extract-linux-arm" "3.0.3" - "@msgpackr-extract/msgpackr-extract-linux-arm64" "3.0.3" - "@msgpackr-extract/msgpackr-extract-linux-x64" "3.0.3" - "@msgpackr-extract/msgpackr-extract-win32-x64" "3.0.3" + "@msgpackr-extract/msgpackr-extract-darwin-arm64" "3.0.4" + "@msgpackr-extract/msgpackr-extract-darwin-x64" "3.0.4" + "@msgpackr-extract/msgpackr-extract-linux-arm" "3.0.4" + "@msgpackr-extract/msgpackr-extract-linux-arm64" "3.0.4" + "@msgpackr-extract/msgpackr-extract-linux-x64" "3.0.4" + "@msgpackr-extract/msgpackr-extract-win32-x64" "3.0.4" + +msgpackr@2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-2.0.2.tgz#2b9838ba796d4c9760878a04d4fa69391178b5ac" + integrity sha512-c5hYOXFbP79Slh6Dzd2wzk+jnV7mX1UxfMYtilnY1NmalXPqG8DGb5cYCMBrW4AsH3zekBBZd4QrKz9NhtvYLQ== + optionalDependencies: + msgpackr-extract "^3.0.4" msgpackr@^1.11.9: version "1.11.9" @@ -22379,6 +22398,11 @@ node-abi@^3.3.0, node-abi@^3.73.0, node-abi@^3.89.0: dependencies: semver "^7.3.5" +node-abort-controller@3.1.1: + version "3.1.1" + resolved "https://registry.yarnpkg.com/node-abort-controller/-/node-abort-controller-3.1.1.tgz#a94377e964a9a37ac3976d848cb5c765833b8548" + integrity sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ== + node-addon-api@^3.0.0: version "3.2.1" resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-3.2.1.tgz#81325e0a2117789c0128dab65e7e38f07ceba161" @@ -26677,6 +26701,11 @@ semver@7.7.4: resolved "https://registry.yarnpkg.com/semver/-/semver-7.7.4.tgz#28464e36060e991fa7a11d0279d2d3f3b57a7e8a" integrity sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA== +semver@7.8.1, semver@^7.0.0, semver@^7.1.1, semver@^7.3.2, semver@^7.3.4, semver@^7.3.5, semver@^7.3.7, semver@^7.3.8, semver@^7.5.0, semver@^7.5.3, semver@^7.5.4, semver@^7.6.0, semver@^7.6.2, semver@^7.6.3, semver@^7.7.2, semver@^7.7.3, semver@^7.7.4, semver@^7.8.0: + version "7.8.1" + resolved "https://registry.yarnpkg.com/semver/-/semver-7.8.1.tgz#bf4970b5e70fda0686363cc18bfe8805d5ed957e" + integrity sha512-rkVq3IXh+4FDGch+KwzX3aV9W3kO54GyEgpvBzSyctDA6Xtd7RJQV1xmXbeQp5v7+VzLOfVqiutSE6GICgPFvg== + semver@^5.3.0, semver@^5.4.1, semver@^5.5.0, semver@^5.6.0, semver@^5.7.1: version "5.7.2" resolved "https://registry.yarnpkg.com/semver/-/semver-5.7.2.tgz#48d55db737c3287cd4835e17fa13feace1c41ef8" @@ -26687,11 +26716,6 @@ semver@^6.0.0, semver@^6.1.0, semver@^6.1.1, semver@^6.1.2, semver@^6.3.0, semve resolved "https://registry.yarnpkg.com/semver/-/semver-6.3.1.tgz#556d2ef8689146e46dcea4bfdd095f3434dffcb4" integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== -semver@^7.0.0, semver@^7.1.1, semver@^7.3.2, semver@^7.3.4, semver@^7.3.5, semver@^7.3.7, semver@^7.3.8, semver@^7.5.0, semver@^7.5.3, semver@^7.5.4, semver@^7.6.0, semver@^7.6.2, semver@^7.6.3, semver@^7.7.2, semver@^7.7.3, semver@^7.7.4, semver@^7.8.0: - version "7.8.0" - resolved "https://registry.yarnpkg.com/semver/-/semver-7.8.0.tgz#ed0661039fcbcda2ce71f01fa6adbefaa77040df" - integrity sha512-AcM7dV/5ul4EekoQ29Agm5vri8JNqRyj39o0qpX6vDF2GZrtutZl5RwgD1XnZjiTAfncsJhMI48QQH3sN87YNA== - send@^1.1.0, send@^1.2.0: version "1.2.1" resolved "https://registry.yarnpkg.com/send/-/send-1.2.1.tgz#9eab743b874f3550f40a26867bf286ad60d3f3ed"