From 935efb5bf2fbec7431ee29a0e0b548293a287ba2 Mon Sep 17 00:00:00 2001 From: eternal-flame-AD Date: Sun, 17 Nov 2024 18:47:56 -0600 Subject: [PATCH] prometheus - stage 2 workers Signed-off-by: eternal-flame-AD --- package.json | 2 +- packages/backend/src/boot/common.ts | 7 +++- packages/backend/src/boot/entry.ts | 12 +++++- packages/backend/src/boot/master.ts | 15 ++------ packages/backend/src/boot/worker.ts | 8 +--- packages/backend/src/core/QueueModule.ts | 4 +- packages/backend/src/queue/metrics.ts | 6 +-- .../queue/processors/InboxProcessorService.ts | 2 +- packages/backend/src/server/ServerService.ts | 4 +- .../backend/src/server/api/MetricsService.ts | 38 ++++++++++++++++++- yume-mods/etc/config.alloy | 16 +++++++- 11 files changed, 84 insertions(+), 30 deletions(-) diff --git a/package.json b/package.json index d9936c275a..135d42e262 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "check:connect": "cd packages/backend && pnpm check:connect", "migrateandstart": "pnpm migrate && pnpm start", "watch": "pnpm dev", - "dev": "node scripts/dev.mjs", + "dev": "cross-env RUN_MODE=dev node scripts/dev.mjs", "lint": "pnpm -r lint", "cy:open": "pnpm cypress open --browser --e2e --config-file=cypress.config.ts", "cy:run": "pnpm cypress run", diff --git a/packages/backend/src/boot/common.ts b/packages/backend/src/boot/common.ts index 268c07582d..420701fae4 100644 --- a/packages/backend/src/boot/common.ts +++ b/packages/backend/src/boot/common.ts @@ -4,6 +4,7 @@ */ import { NestFactory } from '@nestjs/core'; +import * as prom from 'prom-client'; import { ChartManagementService } from '@/core/chart/ChartManagementService.js'; import { QueueProcessorService } from '@/queue/QueueProcessorService.js'; import { NestLogger } from '@/NestLogger.js'; @@ -12,8 +13,9 @@ import { QueueStatsService } from '@/daemons/QueueStatsService.js'; import { ServerStatsService } from '@/daemons/ServerStatsService.js'; import { ServerService } from '@/server/ServerService.js'; import { MainModule } from '@/MainModule.js'; +import { MetricsService } from '@/server/api/MetricsService.js'; -export async function server() { +export async function server(workerRegistry?: prom.AggregatorRegistry) { const app = await NestFactory.createApplicationContext(MainModule, { logger: new NestLogger(), }); @@ -22,6 +24,9 @@ export async function server() { await serverService.launch(); if (process.env.NODE_ENV !== 'test') { + if (workerRegistry) { + app.get(MetricsService).setWorkerRegistry(workerRegistry); + } app.get(ChartManagementService).start(); app.get(QueueStatsService).start(); app.get(ServerStatsService).start(); diff --git a/packages/backend/src/boot/entry.ts b/packages/backend/src/boot/entry.ts index 25375c3015..67570eb9cd 100644 --- a/packages/backend/src/boot/entry.ts +++ b/packages/backend/src/boot/entry.ts @@ -8,6 +8,7 @@ */ import cluster from 'node:cluster'; +import * as prom from 'prom-client'; import { EventEmitter } from 'node:events'; import chalk from 'chalk'; import Xev from 'xev'; @@ -17,6 +18,15 @@ import { masterMain } from './master.js'; import { workerMain } from './worker.js'; import { readyRef } from './ready.js'; +const workerRegistry = new prom.AggregatorRegistry(); + +prom.collectDefaultMetrics({ + labels: { + cluster_type: `${cluster.isPrimary ? 'master' : 'worker'}`, + worker_id: cluster.worker?.id.toString() || 'none' + } +}); + import 'reflect-metadata'; process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`; @@ -69,7 +79,7 @@ process.on('exit', code => { //#endregion if (cluster.isPrimary || envOption.disableClustering) { - await masterMain(); + await masterMain(workerRegistry); if (cluster.isPrimary) { ev.mount(); diff --git a/packages/backend/src/boot/master.ts b/packages/backend/src/boot/master.ts index c3a0dc6b98..ddda2afdf9 100644 --- a/packages/backend/src/boot/master.ts +++ b/packages/backend/src/boot/master.ts @@ -11,7 +11,6 @@ import * as prom from 'prom-client'; import cluster from 'node:cluster'; import chalk from 'chalk'; import chalkTemplate from 'chalk-template'; -import { collectDefaultMetrics } from 'prom-client'; import * as Sentry from '@sentry/node'; import { nodeProfilingIntegration } from '@sentry/profiling-node'; import Logger from '@/logger.js'; @@ -75,7 +74,7 @@ function greet() { /** * Init master process */ -export async function masterMain() { +export async function masterMain(workerRegistry?: prom.AggregatorRegistry) { let config!: Config; // initialize app @@ -86,12 +85,6 @@ export async function masterMain() { showNodejsVersion(); config = loadConfigBoot(); - collectDefaultMetrics({ - labels: { - cluster_type: 'master', - } - }); - //await connectDb(); if (config.pidFile) fs.writeFileSync(config.pidFile, process.pid.toString()); } catch (e) { @@ -123,11 +116,11 @@ export async function masterMain() { if (envOption.disableClustering) { if (envOption.onlyServer) { - await server(); + await server(workerRegistry); } else if (envOption.onlyQueue) { await jobQueue(); } else { - await server(); + await server(workerRegistry); await jobQueue(); } } else { @@ -136,7 +129,7 @@ export async function masterMain() { } else if (envOption.onlyQueue) { // nop } else { - await server(); + await server(workerRegistry); } await spawnWorkers(config.clusterLimit); diff --git a/packages/backend/src/boot/worker.ts b/packages/backend/src/boot/worker.ts index a4791cf062..e767c953e0 100644 --- a/packages/backend/src/boot/worker.ts +++ b/packages/backend/src/boot/worker.ts @@ -4,7 +4,7 @@ */ import cluster from 'node:cluster'; -import { collectDefaultMetrics } from 'prom-client'; +import { collectDefaultMetrics, AggregatorRegistry, RegistryContentType } from 'prom-client'; import * as Sentry from '@sentry/node'; import { nodeProfilingIntegration } from '@sentry/profiling-node'; import { envOption } from '@/env.js'; @@ -17,12 +17,6 @@ import { jobQueue, server } from './common.js'; export async function workerMain() { const config = loadConfig(); - collectDefaultMetrics({ - labels: { - cluster_type: 'worker', - } - }); - if (config.sentryForBackend) { Sentry.init({ integrations: [ diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index c3c9256e0e..f27fa8f3f4 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -18,7 +18,7 @@ import { SystemWebhookDeliverJobData, } from '../queue/types.js'; import type { Provider } from '@nestjs/common'; -import { mActiveJobs, mDelayedJobs, mJobReceivedCounter, mWaitingJobs } from '@/queue/metrics.js'; +import { mActiveJobs, mDelayedJobs, mJobBlockedCounter, mWaitingJobs } from '@/queue/metrics.js'; export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; @@ -39,7 +39,7 @@ function withMetrics(queue: Bull.Queue): Bull.Queue { }, 2000); queue.on('waiting', () => { - mJobReceivedCounter?.inc({ queue: queue.name }); + mJobBlockedCounter?.inc({ queue: queue.name }); }); } diff --git a/packages/backend/src/queue/metrics.ts b/packages/backend/src/queue/metrics.ts index 86c56daebc..2fa20a1754 100644 --- a/packages/backend/src/queue/metrics.ts +++ b/packages/backend/src/queue/metrics.ts @@ -1,8 +1,8 @@ import { metricCounter, metricGauge } from '@/server/api/MetricsService.js'; -export const mJobReceivedCounter = metricCounter({ - name: 'misskey_queue_jobs_received_total', - help: 'Total number of jobs received by queue', +export const mJobBlockedCounter = metricCounter({ + name: 'misskey_queue_jobs_blocked_total', + help: 'Total number of jobs waiting for a worker', labelNames: ['queue'], }); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 215cc66837..4cae799219 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -30,9 +30,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; -import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js'; type UpdateInstanceJob = { latestRequestReceivedAt: Date, diff --git a/packages/backend/src/server/ServerService.ts b/packages/backend/src/server/ServerService.ts index dc48929112..e49ea9432b 100644 --- a/packages/backend/src/server/ServerService.ts +++ b/packages/backend/src/server/ServerService.ts @@ -237,7 +237,9 @@ export class ServerService implements OnApplicationShutdown { }, duration / 1000); } - if (logPath === '/metrics' || logPath === '/healthz') { + const addSlash = logPath + (logPath.endsWith('/') ? '' : '/'); + + if (addSlash.startsWith('/metrics/') || addSlash.startsWith('/healthz/')) { done(); return; } diff --git a/packages/backend/src/server/api/MetricsService.ts b/packages/backend/src/server/api/MetricsService.ts index 2837fdbb19..56facb8625 100644 --- a/packages/backend/src/server/api/MetricsService.ts +++ b/packages/backend/src/server/api/MetricsService.ts @@ -36,11 +36,17 @@ export function metricHistogram(conf: prom.HistogramConfigurat @Injectable() export class MetricsService { + private workerRegistry: prom.AggregatorRegistry | null = null; constructor( @Inject(DI.config) - private config: Config + private config: Config, ) {} + @bindThis + public setWorkerRegistry(workerRegistry: prom.AggregatorRegistry) { + this.workerRegistry = workerRegistry; + } + @bindThis public createServer(fastify: FastifyInstance, options: FastifyPluginOptions, done: (err?: Error) => void) { if (this.config.prometheusMetrics?.enable) { @@ -69,6 +75,36 @@ export class MetricsService { reply.code(500); } }); + + fastify.get('/metrics/cluster', async (request, reply) => { + if (token) { + const bearer = request.headers.authorization; + + if (!bearer) { + reply.code(401); + return; + } + + const [type, t] = bearer.split(' '); + + if (type !== 'Bearer' || t !== token) { + reply.code(403); + return; + } + } + + if (!this.workerRegistry) { + reply.code(404); + return; + } + + try { + reply.header('Content-Type', this.workerRegistry.contentType); + reply.send(await this.workerRegistry.clusterMetrics()); + } catch (err) { + reply.code(500); + } + }); } done(); diff --git a/yume-mods/etc/config.alloy b/yume-mods/etc/config.alloy index 5594f09dd6..f9c6382253 100644 --- a/yume-mods/etc/config.alloy +++ b/yume-mods/etc/config.alloy @@ -11,7 +11,21 @@ prometheus.scrape "yumechinokuni" { __scheme__ = "https", environment = "test", hostname = "test0.mi.yumechi.jp", - } + }, + { + __metrics_path__ = "/metrics/cluster", + __address__ = "mi.yumechi.jp:443", + __scheme__ = "https", + environment = "prod", + hostname = "mi.yumechi.jp", + }, + { + __metrics_path__ = "/metrics/cluster", + __address__ = "test0.mi.yumechi.jp:443", + __scheme__ = "https", + environment = "test", + hostname = "test0.mi.yumechi.jp", + }, ] forward_to = [prometheus.remote_write.mihari.receiver]