diff --git a/.changeset/admin-metrics-collector-foundation.md b/.changeset/admin-metrics-collector-foundation.md new file mode 100644 index 00000000..058d2054 --- /dev/null +++ b/.changeset/admin-metrics-collector-foundation.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +feat: add in-memory admin metrics collector foundation diff --git a/src/factories/admin-metrics-collector-factory.ts b/src/factories/admin-metrics-collector-factory.ts new file mode 100644 index 00000000..873b2b85 --- /dev/null +++ b/src/factories/admin-metrics-collector-factory.ts @@ -0,0 +1,11 @@ +import { AdminMetricsCollector } from '../utils/admin-metrics' + +let collector: AdminMetricsCollector | undefined + +export const getAdminMetricsCollector = (): AdminMetricsCollector => { + if (!collector) { + collector = new AdminMetricsCollector() + } + + return collector +} diff --git a/src/factories/worker-factory.ts b/src/factories/worker-factory.ts index 5d9fe549..13ea8cb9 100644 --- a/src/factories/worker-factory.ts +++ b/src/factories/worker-factory.ts @@ -5,6 +5,7 @@ import { WebSocketServer } from 'ws' import { getMasterDbClient, getReadReplicaDbClient } from '../database/client' import { AppWorker } from '../app/worker' +import { getAdminMetricsCollector } from './admin-metrics-collector-factory' import { createLogger } from './logger-factory' import { createSettings } from '../factories/settings-factory' import { createWebApp } from './web-app-factory' @@ -17,6 +18,8 @@ import { WebSocketServerAdapter } from '../adapters/web-socket-server-adapter' const logger = createLogger('worker-factory') export const workerFactory = (): AppWorker => { + getAdminMetricsCollector() + const dbClient = getMasterDbClient() const readReplicaDbClient = getReadReplicaDbClient() const eventRepository = new EventRepository(dbClient, readReplicaDbClient) diff --git a/src/utils/admin-metrics.ts b/src/utils/admin-metrics.ts new file mode 100644 index 00000000..c9406432 --- /dev/null +++ b/src/utils/admin-metrics.ts @@ -0,0 +1,79 @@ +import os from 'os' + +export interface AdminMetricsSnapshot { + timestamp: number + eventsPerSecond: number + acceptedEvents: number + rejectedEvents: number + activeConnections: number + cpuLoadPercent: number + memoryUsedMb: number +} + +export class AdminMetricsCollector { + private acceptedEvents = 0 + private rejectedEvents = 0 + private readonly activeConnections = new Set() + private readonly eventTimestamps: number[] = [] + private previousCpuUsage = process.cpuUsage() + private previousCpuTimeMs = Date.now() + + public recordAcceptedEvent(timestamp: number = Date.now()): void { + this.acceptedEvents += 1 + this.eventTimestamps.push(timestamp) + this.pruneEventWindow(timestamp) + } + + public recordRejectedEvent(timestamp: number = Date.now()): void { + this.rejectedEvents += 1 + this.eventTimestamps.push(timestamp) + this.pruneEventWindow(timestamp) + } + + public openConnection(connectionId: string): void { + this.activeConnections.add(connectionId) + } + + public closeConnection(connectionId: string): void { + this.activeConnections.delete(connectionId) + } + + public getSnapshot(timestamp: number = Date.now()): AdminMetricsSnapshot { + this.pruneEventWindow(timestamp) + + return { + timestamp, + eventsPerSecond: this.eventTimestamps.length, + acceptedEvents: this.acceptedEvents, + rejectedEvents: this.rejectedEvents, + activeConnections: this.activeConnections.size, + cpuLoadPercent: this.getCpuLoadPercent(timestamp), + memoryUsedMb: Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100, + } + } + + private pruneEventWindow(timestamp: number): void { + const minTimestamp = timestamp - 1000 + + while (this.eventTimestamps.length > 0 && this.eventTimestamps[0] <= minTimestamp) { + this.eventTimestamps.shift() + } + } + + private getCpuLoadPercent(timestamp: number): number { + const elapsedMs = timestamp - this.previousCpuTimeMs + if (elapsedMs <= 0) { + return 0 + } + + const cpuDiff = process.cpuUsage(this.previousCpuUsage) + this.previousCpuUsage = process.cpuUsage() + this.previousCpuTimeMs = timestamp + + const cpuTotalMs = (cpuDiff.user + cpuDiff.system) / 1000 + const cores = Math.max(os.cpus().length, 1) + const loadPercent = (cpuTotalMs / (elapsedMs * cores)) * 100 + + return Math.round(Math.max(0, Math.min(loadPercent, 100)) * 100) / 100 + } +} diff --git a/test/unit/utils/admin-metrics.spec.ts b/test/unit/utils/admin-metrics.spec.ts new file mode 100644 index 00000000..5f6be6b2 --- /dev/null +++ b/test/unit/utils/admin-metrics.spec.ts @@ -0,0 +1,65 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { AdminMetricsCollector } from '../../../src/utils/admin-metrics' + +describe('AdminMetricsCollector', () => { + let collector: AdminMetricsCollector + let clock: Sinon.SinonFakeTimers + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + clock = sandbox.useFakeTimers(10000) + collector = new AdminMetricsCollector() + }) + + afterEach(() => { + clock.restore() + sandbox.restore() + }) + + it('returns zeroed snapshot by default', () => { + const snapshot = collector.getSnapshot() + + expect(snapshot.eventsPerSecond).to.equal(0) + expect(snapshot.acceptedEvents).to.equal(0) + expect(snapshot.rejectedEvents).to.equal(0) + expect(snapshot.activeConnections).to.equal(0) + expect(snapshot.cpuLoadPercent).to.be.greaterThanOrEqual(0) + expect(snapshot.memoryUsedMb).to.be.greaterThan(0) + }) + + it('tracks accepted and rejected event counters', () => { + collector.recordAcceptedEvent() + collector.recordAcceptedEvent() + collector.recordRejectedEvent() + + const snapshot = collector.getSnapshot() + + expect(snapshot.acceptedEvents).to.equal(2) + expect(snapshot.rejectedEvents).to.equal(1) + }) + + it('tracks events per second in a rolling 1-second window', () => { + collector.recordAcceptedEvent(10000) + collector.recordRejectedEvent(10500) + collector.recordRejectedEvent(10999) + + expect(collector.getSnapshot(11000).eventsPerSecond).to.equal(2) + expect(collector.getSnapshot(11500).eventsPerSecond).to.equal(1) + expect(collector.getSnapshot(12000).eventsPerSecond).to.equal(0) + }) + + it('tracks active websocket connections by id', () => { + collector.openConnection('conn-1') + collector.openConnection('conn-2') + collector.openConnection('conn-1') + + expect(collector.getSnapshot().activeConnections).to.equal(2) + + collector.closeConnection('conn-2') + + expect(collector.getSnapshot().activeConnections).to.equal(1) + }) +})