Merge pull request 'prometheus - stage 2 workers' (#22) from develop into master
Some checks failed
Publish Docker image / Build (push) Successful in 4m35s
Lint / pnpm_install (push) Successful in 1m38s
Test (production install and build) / production (22.11.0) (push) Successful in 1m1s
Test (backend) / unit (22.11.0) (push) Successful in 7m45s
Test (backend) / e2e (22.11.0) (push) Successful in 11m38s
Lint / lint (backend) (push) Successful in 2m32s
Lint / lint (frontend-embed) (push) Successful in 2m38s
Lint / lint (frontend) (push) Successful in 2m52s
Lint / lint (misskey-bubble-game) (push) Failing after 1m42s
Lint / lint (frontend-shared) (push) Successful in 2m49s
Lint / lint (misskey-js) (push) Successful in 2m27s
Lint / lint (misskey-reversi) (push) Successful in 2m23s
Lint / lint (sw) (push) Successful in 2m23s
Lint / typecheck (backend) (push) Successful in 2m9s
Lint / typecheck (misskey-js) (push) Successful in 1m32s
Lint / typecheck (sw) (push) Successful in 1m37s
Some checks failed
Publish Docker image / Build (push) Successful in 4m35s
Lint / pnpm_install (push) Successful in 1m38s
Test (production install and build) / production (22.11.0) (push) Successful in 1m1s
Test (backend) / unit (22.11.0) (push) Successful in 7m45s
Test (backend) / e2e (22.11.0) (push) Successful in 11m38s
Lint / lint (backend) (push) Successful in 2m32s
Lint / lint (frontend-embed) (push) Successful in 2m38s
Lint / lint (frontend) (push) Successful in 2m52s
Lint / lint (misskey-bubble-game) (push) Failing after 1m42s
Lint / lint (frontend-shared) (push) Successful in 2m49s
Lint / lint (misskey-js) (push) Successful in 2m27s
Lint / lint (misskey-reversi) (push) Successful in 2m23s
Lint / lint (sw) (push) Successful in 2m23s
Lint / typecheck (backend) (push) Successful in 2m9s
Lint / typecheck (misskey-js) (push) Successful in 1m32s
Lint / typecheck (sw) (push) Successful in 1m37s
Reviewed-on: #22
This commit is contained in:
commit
d7080be847
11 changed files with 84 additions and 30 deletions
|
@ -32,7 +32,7 @@
|
||||||
"check:connect": "cd packages/backend && pnpm check:connect",
|
"check:connect": "cd packages/backend && pnpm check:connect",
|
||||||
"migrateandstart": "pnpm migrate && pnpm start",
|
"migrateandstart": "pnpm migrate && pnpm start",
|
||||||
"watch": "pnpm dev",
|
"watch": "pnpm dev",
|
||||||
"dev": "node scripts/dev.mjs",
|
"dev": "cross-env RUN_MODE=dev node scripts/dev.mjs",
|
||||||
"lint": "pnpm -r lint",
|
"lint": "pnpm -r lint",
|
||||||
"cy:open": "pnpm cypress open --browser --e2e --config-file=cypress.config.ts",
|
"cy:open": "pnpm cypress open --browser --e2e --config-file=cypress.config.ts",
|
||||||
"cy:run": "pnpm cypress run",
|
"cy:run": "pnpm cypress run",
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { NestFactory } from '@nestjs/core';
|
import { NestFactory } from '@nestjs/core';
|
||||||
|
import * as prom from 'prom-client';
|
||||||
import { ChartManagementService } from '@/core/chart/ChartManagementService.js';
|
import { ChartManagementService } from '@/core/chart/ChartManagementService.js';
|
||||||
import { QueueProcessorService } from '@/queue/QueueProcessorService.js';
|
import { QueueProcessorService } from '@/queue/QueueProcessorService.js';
|
||||||
import { NestLogger } from '@/NestLogger.js';
|
import { NestLogger } from '@/NestLogger.js';
|
||||||
|
@ -12,8 +13,9 @@ import { QueueStatsService } from '@/daemons/QueueStatsService.js';
|
||||||
import { ServerStatsService } from '@/daemons/ServerStatsService.js';
|
import { ServerStatsService } from '@/daemons/ServerStatsService.js';
|
||||||
import { ServerService } from '@/server/ServerService.js';
|
import { ServerService } from '@/server/ServerService.js';
|
||||||
import { MainModule } from '@/MainModule.js';
|
import { MainModule } from '@/MainModule.js';
|
||||||
|
import { MetricsService } from '@/server/api/MetricsService.js';
|
||||||
|
|
||||||
export async function server() {
|
export async function server(workerRegistry?: prom.AggregatorRegistry<prom.PrometheusContentType>) {
|
||||||
const app = await NestFactory.createApplicationContext(MainModule, {
|
const app = await NestFactory.createApplicationContext(MainModule, {
|
||||||
logger: new NestLogger(),
|
logger: new NestLogger(),
|
||||||
});
|
});
|
||||||
|
@ -22,6 +24,9 @@ export async function server() {
|
||||||
await serverService.launch();
|
await serverService.launch();
|
||||||
|
|
||||||
if (process.env.NODE_ENV !== 'test') {
|
if (process.env.NODE_ENV !== 'test') {
|
||||||
|
if (workerRegistry) {
|
||||||
|
app.get(MetricsService).setWorkerRegistry(workerRegistry);
|
||||||
|
}
|
||||||
app.get(ChartManagementService).start();
|
app.get(ChartManagementService).start();
|
||||||
app.get(QueueStatsService).start();
|
app.get(QueueStatsService).start();
|
||||||
app.get(ServerStatsService).start();
|
app.get(ServerStatsService).start();
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import cluster from 'node:cluster';
|
import cluster from 'node:cluster';
|
||||||
|
import * as prom from 'prom-client';
|
||||||
import { EventEmitter } from 'node:events';
|
import { EventEmitter } from 'node:events';
|
||||||
import chalk from 'chalk';
|
import chalk from 'chalk';
|
||||||
import Xev from 'xev';
|
import Xev from 'xev';
|
||||||
|
@ -17,6 +18,15 @@ import { masterMain } from './master.js';
|
||||||
import { workerMain } from './worker.js';
|
import { workerMain } from './worker.js';
|
||||||
import { readyRef } from './ready.js';
|
import { readyRef } from './ready.js';
|
||||||
|
|
||||||
|
const workerRegistry = new prom.AggregatorRegistry<prom.PrometheusContentType>();
|
||||||
|
|
||||||
|
prom.collectDefaultMetrics({
|
||||||
|
labels: {
|
||||||
|
cluster_type: `${cluster.isPrimary ? 'master' : 'worker'}`,
|
||||||
|
worker_id: cluster.worker?.id.toString() || 'none'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
import 'reflect-metadata';
|
import 'reflect-metadata';
|
||||||
|
|
||||||
process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`;
|
process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`;
|
||||||
|
@ -69,7 +79,7 @@ process.on('exit', code => {
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
||||||
if (cluster.isPrimary || envOption.disableClustering) {
|
if (cluster.isPrimary || envOption.disableClustering) {
|
||||||
await masterMain();
|
await masterMain(workerRegistry);
|
||||||
|
|
||||||
if (cluster.isPrimary) {
|
if (cluster.isPrimary) {
|
||||||
ev.mount();
|
ev.mount();
|
||||||
|
|
|
@ -11,7 +11,6 @@ import * as prom from 'prom-client';
|
||||||
import cluster from 'node:cluster';
|
import cluster from 'node:cluster';
|
||||||
import chalk from 'chalk';
|
import chalk from 'chalk';
|
||||||
import chalkTemplate from 'chalk-template';
|
import chalkTemplate from 'chalk-template';
|
||||||
import { collectDefaultMetrics } from 'prom-client';
|
|
||||||
import * as Sentry from '@sentry/node';
|
import * as Sentry from '@sentry/node';
|
||||||
import { nodeProfilingIntegration } from '@sentry/profiling-node';
|
import { nodeProfilingIntegration } from '@sentry/profiling-node';
|
||||||
import Logger from '@/logger.js';
|
import Logger from '@/logger.js';
|
||||||
|
@ -75,7 +74,7 @@ function greet() {
|
||||||
/**
|
/**
|
||||||
* Init master process
|
* Init master process
|
||||||
*/
|
*/
|
||||||
export async function masterMain() {
|
export async function masterMain(workerRegistry?: prom.AggregatorRegistry<prom.PrometheusContentType>) {
|
||||||
let config!: Config;
|
let config!: Config;
|
||||||
|
|
||||||
// initialize app
|
// initialize app
|
||||||
|
@ -86,12 +85,6 @@ export async function masterMain() {
|
||||||
showNodejsVersion();
|
showNodejsVersion();
|
||||||
config = loadConfigBoot();
|
config = loadConfigBoot();
|
||||||
|
|
||||||
collectDefaultMetrics({
|
|
||||||
labels: {
|
|
||||||
cluster_type: 'master',
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//await connectDb();
|
//await connectDb();
|
||||||
if (config.pidFile) fs.writeFileSync(config.pidFile, process.pid.toString());
|
if (config.pidFile) fs.writeFileSync(config.pidFile, process.pid.toString());
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
@ -123,11 +116,11 @@ export async function masterMain() {
|
||||||
|
|
||||||
if (envOption.disableClustering) {
|
if (envOption.disableClustering) {
|
||||||
if (envOption.onlyServer) {
|
if (envOption.onlyServer) {
|
||||||
await server();
|
await server(workerRegistry);
|
||||||
} else if (envOption.onlyQueue) {
|
} else if (envOption.onlyQueue) {
|
||||||
await jobQueue();
|
await jobQueue();
|
||||||
} else {
|
} else {
|
||||||
await server();
|
await server(workerRegistry);
|
||||||
await jobQueue();
|
await jobQueue();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -136,7 +129,7 @@ export async function masterMain() {
|
||||||
} else if (envOption.onlyQueue) {
|
} else if (envOption.onlyQueue) {
|
||||||
// nop
|
// nop
|
||||||
} else {
|
} else {
|
||||||
await server();
|
await server(workerRegistry);
|
||||||
}
|
}
|
||||||
|
|
||||||
await spawnWorkers(config.clusterLimit);
|
await spawnWorkers(config.clusterLimit);
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import cluster from 'node:cluster';
|
import cluster from 'node:cluster';
|
||||||
import { collectDefaultMetrics } from 'prom-client';
|
import { collectDefaultMetrics, AggregatorRegistry, RegistryContentType } from 'prom-client';
|
||||||
import * as Sentry from '@sentry/node';
|
import * as Sentry from '@sentry/node';
|
||||||
import { nodeProfilingIntegration } from '@sentry/profiling-node';
|
import { nodeProfilingIntegration } from '@sentry/profiling-node';
|
||||||
import { envOption } from '@/env.js';
|
import { envOption } from '@/env.js';
|
||||||
|
@ -17,12 +17,6 @@ import { jobQueue, server } from './common.js';
|
||||||
export async function workerMain() {
|
export async function workerMain() {
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
collectDefaultMetrics({
|
|
||||||
labels: {
|
|
||||||
cluster_type: 'worker',
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (config.sentryForBackend) {
|
if (config.sentryForBackend) {
|
||||||
Sentry.init({
|
Sentry.init({
|
||||||
integrations: [
|
integrations: [
|
||||||
|
|
|
@ -18,7 +18,7 @@ import {
|
||||||
SystemWebhookDeliverJobData,
|
SystemWebhookDeliverJobData,
|
||||||
} from '../queue/types.js';
|
} from '../queue/types.js';
|
||||||
import type { Provider } from '@nestjs/common';
|
import type { Provider } from '@nestjs/common';
|
||||||
import { mActiveJobs, mDelayedJobs, mJobReceivedCounter, mWaitingJobs } from '@/queue/metrics.js';
|
import { mActiveJobs, mDelayedJobs, mJobBlockedCounter, mWaitingJobs } from '@/queue/metrics.js';
|
||||||
|
|
||||||
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
|
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
|
||||||
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
|
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
|
||||||
|
@ -39,7 +39,7 @@ function withMetrics<T>(queue: Bull.Queue<T>): Bull.Queue<T> {
|
||||||
}, 2000);
|
}, 2000);
|
||||||
|
|
||||||
queue.on('waiting', () => {
|
queue.on('waiting', () => {
|
||||||
mJobReceivedCounter?.inc({ queue: queue.name });
|
mJobBlockedCounter?.inc({ queue: queue.name });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
import { metricCounter, metricGauge } from '@/server/api/MetricsService.js';
|
import { metricCounter, metricGauge } from '@/server/api/MetricsService.js';
|
||||||
|
|
||||||
export const mJobReceivedCounter = metricCounter({
|
export const mJobBlockedCounter = metricCounter({
|
||||||
name: 'misskey_queue_jobs_received_total',
|
name: 'misskey_queue_jobs_blocked_total',
|
||||||
help: 'Total number of jobs received by queue',
|
help: 'Total number of jobs waiting for a worker',
|
||||||
labelNames: ['queue'],
|
labelNames: ['queue'],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -30,9 +30,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
|
||||||
import { MiNote } from '@/models/Note.js';
|
import { MiNote } from '@/models/Note.js';
|
||||||
import { MiMeta } from '@/models/Meta.js';
|
import { MiMeta } from '@/models/Meta.js';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
|
import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js';
|
||||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||||
import type { InboxJobData } from '../types.js';
|
import type { InboxJobData } from '../types.js';
|
||||||
import { metricCounter, metricHistogram } from '@/server/api/MetricsService.js';
|
|
||||||
|
|
||||||
type UpdateInstanceJob = {
|
type UpdateInstanceJob = {
|
||||||
latestRequestReceivedAt: Date,
|
latestRequestReceivedAt: Date,
|
||||||
|
|
|
@ -237,7 +237,9 @@ export class ServerService implements OnApplicationShutdown {
|
||||||
}, duration / 1000);
|
}, duration / 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logPath === '/metrics' || logPath === '/healthz') {
|
const addSlash = logPath + (logPath.endsWith('/') ? '' : '/');
|
||||||
|
|
||||||
|
if (addSlash.startsWith('/metrics/') || addSlash.startsWith('/healthz/')) {
|
||||||
done();
|
done();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,11 +36,17 @@ export function metricHistogram<K extends string>(conf: prom.HistogramConfigurat
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MetricsService {
|
export class MetricsService {
|
||||||
|
private workerRegistry: prom.AggregatorRegistry<prom.PrometheusContentType> | null = null;
|
||||||
constructor(
|
constructor(
|
||||||
@Inject(DI.config)
|
@Inject(DI.config)
|
||||||
private config: Config
|
private config: Config,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public setWorkerRegistry(workerRegistry: prom.AggregatorRegistry<prom.PrometheusContentType>) {
|
||||||
|
this.workerRegistry = workerRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public createServer(fastify: FastifyInstance, options: FastifyPluginOptions, done: (err?: Error) => void) {
|
public createServer(fastify: FastifyInstance, options: FastifyPluginOptions, done: (err?: Error) => void) {
|
||||||
if (this.config.prometheusMetrics?.enable) {
|
if (this.config.prometheusMetrics?.enable) {
|
||||||
|
@ -69,6 +75,36 @@ export class MetricsService {
|
||||||
reply.code(500);
|
reply.code(500);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
fastify.get('/metrics/cluster', async (request, reply) => {
|
||||||
|
if (token) {
|
||||||
|
const bearer = request.headers.authorization;
|
||||||
|
|
||||||
|
if (!bearer) {
|
||||||
|
reply.code(401);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [type, t] = bearer.split(' ');
|
||||||
|
|
||||||
|
if (type !== 'Bearer' || t !== token) {
|
||||||
|
reply.code(403);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.workerRegistry) {
|
||||||
|
reply.code(404);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
reply.header('Content-Type', this.workerRegistry.contentType);
|
||||||
|
reply.send(await this.workerRegistry.clusterMetrics());
|
||||||
|
} catch (err) {
|
||||||
|
reply.code(500);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
done();
|
done();
|
||||||
|
|
|
@ -11,7 +11,21 @@ prometheus.scrape "yumechinokuni" {
|
||||||
__scheme__ = "https",
|
__scheme__ = "https",
|
||||||
environment = "test",
|
environment = "test",
|
||||||
hostname = "test0.mi.yumechi.jp",
|
hostname = "test0.mi.yumechi.jp",
|
||||||
}
|
},
|
||||||
|
{
|
||||||
|
__metrics_path__ = "/metrics/cluster",
|
||||||
|
__address__ = "mi.yumechi.jp:443",
|
||||||
|
__scheme__ = "https",
|
||||||
|
environment = "prod",
|
||||||
|
hostname = "mi.yumechi.jp",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
__metrics_path__ = "/metrics/cluster",
|
||||||
|
__address__ = "test0.mi.yumechi.jp:443",
|
||||||
|
__scheme__ = "https",
|
||||||
|
environment = "test",
|
||||||
|
hostname = "test0.mi.yumechi.jp",
|
||||||
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
forward_to = [prometheus.remote_write.mihari.receiver]
|
forward_to = [prometheus.remote_write.mihari.receiver]
|
||||||
|
|
Loading…
Reference in a new issue