add metrics endpoint

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
ゆめ 2024-11-20 05:44:08 -06:00
parent 5df98f8f05
commit 02f4c6c10b
No known key found for this signature in database
3 changed files with 169 additions and 4 deletions

28
Cargo.lock generated
View file

@ -1276,6 +1276,12 @@ dependencies = [
"smallvec", "smallvec",
] ]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.164" version = "0.2.164"
@ -1713,6 +1719,27 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]] [[package]]
name = "psm" name = "psm"
version = "0.1.24" version = "0.1.24"
@ -3230,6 +3257,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"lru", "lru",
"prometheus",
"quote", "quote",
"reqwest", "reqwest",
"resvg", "resvg",

View file

@ -37,10 +37,12 @@ env-local = ["axum/http1", "axum/http2",
"image/ico", "image/ico",
"lossy-webp", "lossy-webp",
"tower-http", "tower-http",
"metrics",
"svg-text", "resvg/system-fonts", "resvg/raster-images", "fontdb/fontconfig" "svg-text", "resvg/system-fonts", "resvg/raster-images", "fontdb/fontconfig"
] ]
reuse-port = [] reuse-port = []
cf-worker = ["dep:worker", "dep:worker-macros"] metrics = ["prometheus"]
cf-worker = ["dep:worker", "dep:worker-macros", "dep:wasm-bindgen"]
cf-worker-paid = ["cf-worker", "resvg/raster-images", "resvg/text", "image/ico", "panic-console-error"] cf-worker-paid = ["cf-worker", "resvg/raster-images", "resvg/text", "image/ico", "panic-console-error"]
panic-console-error = ["dep:console_error_panic_hook"] panic-console-error = ["dep:console_error_panic_hook"]
apparmor = ["dep:siphasher", "dep:libc"] apparmor = ["dep:siphasher", "dep:libc"]
@ -52,6 +54,7 @@ governor = ["dep:governor"]
axum-server = ["dep:axum-server", "tower-http"] axum-server = ["dep:axum-server", "tower-http"]
lossy-webp = ["dep:webp"] lossy-webp = ["dep:webp"]
tower-http = ["dep:tower-http"] tower-http = ["dep:tower-http"]
prometheus = ["dep:prometheus"]
[dependencies] [dependencies]
worker = { version="0.4.2", features=['http', 'axum'], optional = true } worker = { version="0.4.2", features=['http', 'axum'], optional = true }
@ -74,7 +77,7 @@ governor = { version = "0.7.0", features = ["dashmap"], optional = true }
resvg = { version = "0.44.0", default-features = false, features = ["gif", "image-webp"] } resvg = { version = "0.44.0", default-features = false, features = ["gif", "image-webp"] }
thiserror = "2.0" thiserror = "2.0"
serde_json = "1" serde_json = "1"
wasm-bindgen = { version = "0.2" } wasm-bindgen = { version = "0.2", optional = true }
libc = { version = "0.2.162", optional = true } libc = { version = "0.2.162", optional = true }
axum-server = { version = "0.7.1", optional = true } axum-server = { version = "0.7.1", optional = true }
fontdb = { version = "0.23", optional = true } fontdb = { version = "0.23", optional = true }
@ -83,6 +86,7 @@ url = { version = "2", optional = true }
tower-http = { version = "0.6.2", features = ["catch-panic", "timeout"], optional = true } tower-http = { version = "0.6.2", features = ["catch-panic", "timeout"], optional = true }
dashmap = "6.1.0" dashmap = "6.1.0"
lru = "0.12.5" lru = "0.12.5"
prometheus = { version = "0.13.4", optional = true }
[patch.crates-io] [patch.crates-io]
# licensing and webp dependencies # licensing and webp dependencies

View file

@ -1,7 +1,16 @@
use std::{ffi::c_int, net::SocketAddr, os::fd::AsRawFd, time::Duration}; use std::{convert::Infallible, ffi::c_int, net::SocketAddr, os::fd::AsRawFd, time::Duration};
use axum::{
http::HeaderMap,
middleware::{self, Next},
};
use clap::Parser; use clap::Parser;
use futures::TryFutureExt;
use prometheus::core::{GenericCounterVec, GenericGauge};
#[cfg(feature = "metrics")]
use prometheus::{core::AtomicU64, Opts};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tower_service::Service;
use yumechi_no_kuni_proxy_worker::{ use yumechi_no_kuni_proxy_worker::{
config::{Config, SandboxConfig}, config::{Config, SandboxConfig},
router, router,
@ -63,7 +72,8 @@ fn main() {
#[cfg(feature = "apparmor")] #[cfg(feature = "apparmor")]
let mut serve_profile = None; let mut serve_profile = None;
let router = match config.sandbox { #[allow(unused_mut)]
let mut router = match config.sandbox {
#[cfg(feature = "apparmor")] #[cfg(feature = "apparmor")]
SandboxConfig::AppArmor(ref prof) => { SandboxConfig::AppArmor(ref prof) => {
serve_profile = Some(prof.serve.clone()); serve_profile = Some(prof.serve.clone());
@ -78,6 +88,85 @@ fn main() {
_ => panic!("Unsupported sandbox configuration, did you forget to enable the feature?"), _ => panic!("Unsupported sandbox configuration, did you forget to enable the feature?"),
}; };
#[cfg(feature = "metrics")]
{
let reg = prometheus::default_registry();
let counter_requests_served = GenericCounterVec::<AtomicU64>::new(
Opts::new(
"misskey_media_proxy_requests_served",
"Number of requests served",
),
&["status", "postprocess"],
)
.expect("Failed to create prometheus counter");
reg.register(Box::new(counter_requests_served.clone()))
.expect("Failed to register prometheus counter");
router = router
.layer(middleware::from_fn(move |req, mut next: Next| {
let counter_requests_served = counter_requests_served.clone();
next.call(req)
.map_ok(move |res| {
let status = match res.status().as_u16() {
200 => "200",
201 => "201",
202 => "202",
204 => "204",
400 => "400",
401 => "401",
403 => "403",
404 => "404",
429 => "429",
500 => "500",
502 => "502",
503 => "503",
504 => "504",
100..=199 => "1xx",
200..=299 => "2xx",
300..=399 => "3xx",
400..=499 => "4xx",
500..=599 => "5xx",
_ => "oob",
};
let post_process = res
.headers()
.get("x-proxy-post-process")
.map(|v| v.to_str().unwrap_or("unknown"))
.unwrap_or("unknown");
counter_requests_served
.with_label_values(&[&status, post_process])
.inc();
res
})
.map_err(|e: Infallible| e)
}))
.route(
"/metrics",
axum::routing::get(|| async {
use prometheus::Encoder;
let reg = prometheus::default_registry();
let enc = prometheus::TextEncoder::new();
let metric_families = reg.gather();
let mut buffer = Vec::new();
enc.encode(&metric_families, &mut buffer)
.expect("Failed to encode metrics");
let mut hdr = HeaderMap::new();
hdr.insert("Content-Type", enc.format_type().try_into().unwrap());
(hdr, buffer)
}),
);
}
let ms = router.into_make_service_with_connect_info::<SocketAddr>(); let ms = router.into_make_service_with_connect_info::<SocketAddr>();
let listener = std::net::TcpListener::bind(&listen).expect("Failed to bind listener"); let listener = std::net::TcpListener::bind(&listen).expect("Failed to bind listener");
@ -151,6 +240,50 @@ fn main() {
return; return;
} }
{
let reg = prometheus::default_registry();
let counter_alive_tasks = GenericGauge::<AtomicU64>::with_opts(Opts::new(
"misskey_media_proxy_tokio_alive_tasks",
"Number of alive tasks",
))
.expect("Failed to create prometheus gauge");
let counter_num_worker_threads = GenericGauge::<AtomicU64>::with_opts(Opts::new(
"misskey_media_proxy_tokio_num_worker_threads",
"Number of worker threads",
))
.expect("Failed to create prometheus gauge");
let counter_global_queue_depth = GenericGauge::<AtomicU64>::with_opts(Opts::new(
"misskey_media_proxy_tokio_global_queue_depth",
"Global queue depth",
))
.expect("Failed to create prometheus gauge");
reg.register(Box::new(counter_alive_tasks.clone()))
.expect("Failed to register prometheus counter");
reg.register(Box::new(counter_num_worker_threads.clone()))
.expect("Failed to register prometheus counter");
reg.register(Box::new(counter_global_queue_depth.clone()))
.expect("Failed to register prometheus counter");
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let metrics = tokio::runtime::Handle::current().metrics();
counter_alive_tasks.set(metrics.num_alive_tasks() as _);
counter_num_worker_threads.set(metrics.num_workers() as _);
counter_global_queue_depth.set(metrics.global_queue_depth() as _);
}
});
}
#[cfg(feature = "axum-server")] #[cfg(feature = "axum-server")]
{ {
let handle = axum_server::Handle::new(); let handle = axum_server::Handle::new();