prometheus - stage 2 workers #22

Merged
yume merged 1 commit from develop into master 2024-11-17 19:07:23 -06:00
11 changed files with 84 additions and 30 deletions
Showing only changes of commit 935efb5bf2 - Show all commits

View file

@ -32,7 +32,7 @@
"check:connect": "cd packages/backend && pnpm check:connect", "check:connect": "cd packages/backend && pnpm check:connect",
"migrateandstart": "pnpm migrate && pnpm start", "migrateandstart": "pnpm migrate && pnpm start",
"watch": "pnpm dev", "watch": "pnpm dev",
"dev": "node scripts/dev.mjs", "dev": "cross-env RUN_MODE=dev node scripts/dev.mjs",
"lint": "pnpm -r lint", "lint": "pnpm -r lint",
"cy:open": "pnpm cypress open --browser --e2e --config-file=cypress.config.ts", "cy:open": "pnpm cypress open --browser --e2e --config-file=cypress.config.ts",
"cy:run": "pnpm cypress run", "cy:run": "pnpm cypress run",

View file

@ -4,6 +4,7 @@
*/ */
import { NestFactory } from '@nestjs/core'; import { NestFactory } from '@nestjs/core';
import * as prom from 'prom-client';
import { ChartManagementService } from '@/core/chart/ChartManagementService.js'; import { ChartManagementService } from '@/core/chart/ChartManagementService.js';
import { QueueProcessorService } from '@/queue/QueueProcessorService.js'; import { QueueProcessorService } from '@/queue/QueueProcessorService.js';
import { NestLogger } from '@/NestLogger.js'; import { NestLogger } from '@/NestLogger.js';
@ -12,8 +13,9 @@ import { QueueStatsService } from '@/daemons/QueueStatsService.js';
import { ServerStatsService } from '@/daemons/ServerStatsService.js'; import { ServerStatsService } from '@/daemons/ServerStatsService.js';
import { ServerService } from '@/server/ServerService.js'; import { ServerService } from '@/server/ServerService.js';
import { MainModule } from '@/MainModule.js'; import { MainModule } from '@/MainModule.js';
import { MetricsService } from '@/server/api/MetricsService.js';
export async function server() { export async function server(workerRegistry?: prom.AggregatorRegistry<prom.PrometheusContentType>) {
const app = await NestFactory.createApplicationContext(MainModule, { const app = await NestFactory.createApplicationContext(MainModule, {
logger: new NestLogger(), logger: new NestLogger(),
}); });
@ -22,6 +24,9 @@ export async function server() {
await serverService.launch(); await serverService.launch();
if (process.env.NODE_ENV !== 'test') { if (process.env.NODE_ENV !== 'test') {
if (workerRegistry) {
app.get(MetricsService).setWorkerRegistry(workerRegistry);
}
app.get(ChartManagementService).start(); app.get(ChartManagementService).start();
app.get(QueueStatsService).start(); app.get(QueueStatsService).start();
app.get(ServerStatsService).start(); app.get(ServerStatsService).start();

View file

@ -8,6 +8,7 @@
*/ */
import cluster from 'node:cluster'; import cluster from 'node:cluster';
import * as prom from 'prom-client';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import chalk from 'chalk'; import chalk from 'chalk';
import Xev from 'xev'; import Xev from 'xev';
@ -17,6 +18,15 @@ import { masterMain } from './master.js';
import { workerMain } from './worker.js'; import { workerMain } from './worker.js';
import { readyRef } from './ready.js'; import { readyRef } from './ready.js';
const workerRegistry = new prom.AggregatorRegistry<prom.PrometheusContentType>();
prom.collectDefaultMetrics({
labels: {
cluster_type: `${cluster.isPrimary ? 'master' : 'worker'}`,
worker_id: cluster.worker?.id.toString() || 'none'
}
});
import 'reflect-metadata'; import 'reflect-metadata';
process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`; process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`;
@ -69,7 +79,7 @@ process.on('exit', code => {
//#endregion //#endregion
if (cluster.isPrimary || envOption.disableClustering) { if (cluster.isPrimary || envOption.disableClustering) {
await masterMain(); await masterMain(workerRegistry);
if (cluster.isPrimary) { if (cluster.isPrimary) {
ev.mount(); ev.mount();

View file

@ -11,7 +11,6 @@ import * as prom from 'prom-client';
import cluster from 'node:cluster'; import cluster from 'node:cluster';
import chalk from 'chalk'; import chalk from 'chalk';
import chalkTemplate from 'chalk-template'; import chalkTemplate from 'chalk-template';
import { collectDefaultMetrics } from 'prom-client';
import * as Sentry from '@sentry/node'; import * as Sentry from '@sentry/node';
import { nodeProfilingIntegration } from '@sentry/profiling-node'; import { nodeProfilingIntegration } from '@sentry/profiling-node';
import Logger from '@/logger.js'; import Logger from '@/logger.js';
@ -75,7 +74,7 @@ function greet() {
/** /**
* Init master process * Init master process
*/ */
export async function masterMain() { export async function masterMain(workerRegistry?: prom.AggregatorRegistry<prom.PrometheusContentType>) {
let config!: Config; let config!: Config;
// initialize app // initialize app
@ -86,12 +85,6 @@ export async function masterMain() {
showNodejsVersion(); showNodejsVersion();
config = loadConfigBoot(); config = loadConfigBoot();
collectDefaultMetrics({
labels: {
cluster_type: 'master',
}
});
//await connectDb(); //await connectDb();
if (config.pidFile) fs.writeFileSync(config.pidFile, process.pid.toString()); if (config.pidFile) fs.writeFileSync(config.pidFile, process.pid.toString());
} catch (e) { } catch (e) {
@ -123,11 +116,11 @@ export async function masterMain() {
if (envOption.disableClustering) { if (envOption.disableClustering) {
if (envOption.onlyServer) { if (envOption.onlyServer) {
await server(); await server(workerRegistry);
} else if (envOption.onlyQueue) { } else if (envOption.onlyQueue) {
await jobQueue(); await jobQueue();
} else { } else {
await server(); await server(workerRegistry);
await jobQueue(); await jobQueue();
} }
} else { } else {
@ -136,7 +129,7 @@ export async function masterMain() {
} else if (envOption.onlyQueue) { } else if (envOption.onlyQueue) {
// nop // nop
} else { } else {
await server(); await server(workerRegistry);
} }
await spawnWorkers(config.clusterLimit); await spawnWorkers(config.clusterLimit);

View file

@ -4,7 +4,7 @@
*/ */
import cluster from 'node:cluster'; import cluster from 'node:cluster';
import { collectDefaultMetrics } from 'prom-client'; import { collectDefaultMetrics, AggregatorRegistry, RegistryContentType } from 'prom-client';
import * as Sentry from '@sentry/node'; import * as Sentry from '@sentry/node';
import { nodeProfilingIntegration } from '@sentry/profiling-node'; import { nodeProfilingIntegration } from '@sentry/profiling-node';
import { envOption } from '@/env.js'; import { envOption } from '@/env.js';
@ -17,12 +17,6 @@ import { jobQueue, server } from './common.js';
export async function workerMain() { export async function workerMain() {
const config = loadConfig(); const config = loadConfig();
collectDefaultMetrics({
labels: {
cluster_type: 'worker',
}
});
if (config.sentryForBackend) { if (config.sentryForBackend) {
Sentry.init({ Sentry.init({
integrations: [ integrations: [

View file

@ -18,7 +18,7 @@ import {
SystemWebhookDeliverJobData, SystemWebhookDeliverJobData,
} from '../queue/types.js'; } from '../queue/types.js';
import type { Provider } from '@nestjs/common'; import type { Provider } from '@nestjs/common';
import { mActiveJobs, mDelayedJobs, mJobReceivedCounter, mWaitingJobs } from '@/queue/metrics.js'; import { mActiveJobs, mDelayedJobs, mJobBlockedCounter, mWaitingJobs } from '@/queue/metrics.js';
export type SystemQueue = Bull.Queue<Record<string, unknown>>; export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>; export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
@ -39,7 +39,7 @@ function withMetrics<T>(queue: Bull.Queue<T>): Bull.Queue<T> {
}, 2000); }, 2000);
queue.on('waiting', () => { queue.on('waiting', () => {
mJobReceivedCounter?.inc({ queue: queue.name }); mJobBlockedCounter?.inc({ queue: queue.name });
}); });
} }

View file

@ -1,8 +1,8 @@
import { metricCounter, metricGauge } from '@/server/api/MetricsService.js'; import { metricCounter, metricGauge } from '@/server/api/MetricsService.js';
export const mJobReceivedCounter = metricCounter({ export const mJobBlockedCounter = metricCounter({
name: 'misskey_queue_jobs_received_total', name: 'misskey_queue_jobs_blocked_total',
help: 'Total number of jobs received by queue', help: 'Total number of jobs waiting for a worker',
labelNames: ['queue'], labelNames: ['queue'],
}); });

View file

@ -30,9 +30,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js'; import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js'; import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js'; import type { InboxJobData } from '../types.js';
import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js';
type UpdateInstanceJob = { type UpdateInstanceJob = {
latestRequestReceivedAt: Date, latestRequestReceivedAt: Date,

View file

@ -237,7 +237,9 @@ export class ServerService implements OnApplicationShutdown {
}, duration / 1000); }, duration / 1000);
} }
if (logPath === '/metrics' || logPath === '/healthz') { const addSlash = logPath + (logPath.endsWith('/') ? '' : '/');
if (addSlash.startsWith('/metrics/') || addSlash.startsWith('/healthz/')) {
done(); done();
return; return;
} }

View file

@ -36,11 +36,17 @@ export function metricHistogram<K extends string>(conf: prom.HistogramConfigurat
@Injectable() @Injectable()
export class MetricsService { export class MetricsService {
private workerRegistry: prom.AggregatorRegistry<prom.PrometheusContentType> | null = null;
constructor( constructor(
@Inject(DI.config) @Inject(DI.config)
private config: Config private config: Config,
) {} ) {}
@bindThis
public setWorkerRegistry(workerRegistry: prom.AggregatorRegistry<prom.PrometheusContentType>) {
this.workerRegistry = workerRegistry;
}
@bindThis @bindThis
public createServer(fastify: FastifyInstance, options: FastifyPluginOptions, done: (err?: Error) => void) { public createServer(fastify: FastifyInstance, options: FastifyPluginOptions, done: (err?: Error) => void) {
if (this.config.prometheusMetrics?.enable) { if (this.config.prometheusMetrics?.enable) {
@ -69,6 +75,36 @@ export class MetricsService {
reply.code(500); reply.code(500);
} }
}); });
fastify.get('/metrics/cluster', async (request, reply) => {
if (token) {
const bearer = request.headers.authorization;
if (!bearer) {
reply.code(401);
return;
}
const [type, t] = bearer.split(' ');
if (type !== 'Bearer' || t !== token) {
reply.code(403);
return;
}
}
if (!this.workerRegistry) {
reply.code(404);
return;
}
try {
reply.header('Content-Type', this.workerRegistry.contentType);
reply.send(await this.workerRegistry.clusterMetrics());
} catch (err) {
reply.code(500);
}
});
} }
done(); done();

View file

@ -11,7 +11,21 @@ prometheus.scrape "yumechinokuni" {
__scheme__ = "https", __scheme__ = "https",
environment = "test", environment = "test",
hostname = "test0.mi.yumechi.jp", hostname = "test0.mi.yumechi.jp",
} },
{
__metrics_path__ = "/metrics/cluster",
__address__ = "mi.yumechi.jp:443",
__scheme__ = "https",
environment = "prod",
hostname = "mi.yumechi.jp",
},
{
__metrics_path__ = "/metrics/cluster",
__address__ = "test0.mi.yumechi.jp:443",
__scheme__ = "https",
environment = "test",
hostname = "test0.mi.yumechi.jp",
},
] ]
forward_to = [prometheus.remote_write.mihari.receiver] forward_to = [prometheus.remote_write.mihari.receiver]