diff --git a/Cargo.lock b/Cargo.lock index d506ff1..8ccf6c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1276,6 +1276,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.164" @@ -1713,6 +1719,27 @@ dependencies = [ "syn", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "psm" version = "0.1.24" @@ -3230,6 +3257,7 @@ dependencies = [ "libc", "log", "lru", + "prometheus", "quote", "reqwest", "resvg", diff --git a/Cargo.toml b/Cargo.toml index 39a6e3e..6a8bac6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,10 +37,12 @@ env-local = ["axum/http1", "axum/http2", "image/ico", "lossy-webp", "tower-http", + "metrics", "svg-text", "resvg/system-fonts", "resvg/raster-images", "fontdb/fontconfig" ] reuse-port = [] -cf-worker = ["dep:worker", "dep:worker-macros"] +metrics = ["prometheus"] +cf-worker = ["dep:worker", "dep:worker-macros", "dep:wasm-bindgen"] cf-worker-paid = ["cf-worker", "resvg/raster-images", "resvg/text", "image/ico", "panic-console-error"] panic-console-error = ["dep:console_error_panic_hook"] apparmor = ["dep:siphasher", "dep:libc"] @@ -52,6 +54,7 @@ governor = ["dep:governor"] axum-server = ["dep:axum-server", "tower-http"] lossy-webp = ["dep:webp"] tower-http = ["dep:tower-http"] +prometheus = ["dep:prometheus"] [dependencies] worker = { version="0.4.2", features=['http', 'axum'], optional = true } @@ -74,7 +77,7 @@ governor = { version = "0.7.0", features = ["dashmap"], optional = true } resvg = { version = "0.44.0", default-features = false, features = ["gif", "image-webp"] } thiserror = "2.0" serde_json = "1" -wasm-bindgen = { version = "0.2" } +wasm-bindgen = { version = "0.2", optional = true } libc = { version = "0.2.162", optional = true } axum-server = { version = "0.7.1", optional = true } fontdb = { version = "0.23", optional = true } @@ -83,6 +86,7 @@ url = { version = "2", optional = true } tower-http = { version = "0.6.2", features = ["catch-panic", "timeout"], optional = true } dashmap = "6.1.0" lru = "0.12.5" +prometheus = { version = "0.13.4", optional = true } [patch.crates-io] # licensing and webp dependencies diff --git a/src/main.rs b/src/main.rs index cbbb820..4ba418a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,16 @@ -use std::{ffi::c_int, net::SocketAddr, os::fd::AsRawFd, time::Duration}; +use std::{convert::Infallible, ffi::c_int, net::SocketAddr, os::fd::AsRawFd, time::Duration}; +use axum::{ + http::HeaderMap, + middleware::{self, Next}, +}; use clap::Parser; +use futures::TryFutureExt; +use prometheus::core::{GenericCounterVec, GenericGauge}; +#[cfg(feature = "metrics")] +use prometheus::{core::AtomicU64, Opts}; use tokio::sync::mpsc; +use tower_service::Service; use yumechi_no_kuni_proxy_worker::{ config::{Config, SandboxConfig}, router, @@ -63,7 +72,8 @@ fn main() { #[cfg(feature = "apparmor")] let mut serve_profile = None; - let router = match config.sandbox { + #[allow(unused_mut)] + let mut router = match config.sandbox { #[cfg(feature = "apparmor")] SandboxConfig::AppArmor(ref prof) => { serve_profile = Some(prof.serve.clone()); @@ -78,6 +88,85 @@ fn main() { _ => panic!("Unsupported sandbox configuration, did you forget to enable the feature?"), }; + #[cfg(feature = "metrics")] + { + let reg = prometheus::default_registry(); + let counter_requests_served = GenericCounterVec::::new( + Opts::new( + "misskey_media_proxy_requests_served", + "Number of requests served", + ), + &["status", "postprocess"], + ) + .expect("Failed to create prometheus counter"); + + reg.register(Box::new(counter_requests_served.clone())) + .expect("Failed to register prometheus counter"); + + router = router + .layer(middleware::from_fn(move |req, mut next: Next| { + let counter_requests_served = counter_requests_served.clone(); + next.call(req) + .map_ok(move |res| { + let status = match res.status().as_u16() { + 200 => "200", + 201 => "201", + 202 => "202", + 204 => "204", + 400 => "400", + 401 => "401", + 403 => "403", + 404 => "404", + 429 => "429", + 500 => "500", + 502 => "502", + 503 => "503", + 504 => "504", + 100..=199 => "1xx", + 200..=299 => "2xx", + 300..=399 => "3xx", + 400..=499 => "4xx", + 500..=599 => "5xx", + _ => "oob", + }; + + let post_process = res + .headers() + .get("x-proxy-post-process") + .map(|v| v.to_str().unwrap_or("unknown")) + .unwrap_or("unknown"); + + counter_requests_served + .with_label_values(&[&status, post_process]) + .inc(); + res + }) + .map_err(|e: Infallible| e) + })) + .route( + "/metrics", + axum::routing::get(|| async { + use prometheus::Encoder; + let reg = prometheus::default_registry(); + + let enc = prometheus::TextEncoder::new(); + + let metric_families = reg.gather(); + + let mut buffer = Vec::new(); + + enc.encode(&metric_families, &mut buffer) + .expect("Failed to encode metrics"); + + let mut hdr = HeaderMap::new(); + + hdr.insert("Content-Type", enc.format_type().try_into().unwrap()); + + (hdr, buffer) + }), + ); + } + let ms = router.into_make_service_with_connect_info::(); let listener = std::net::TcpListener::bind(&listen).expect("Failed to bind listener"); @@ -151,6 +240,50 @@ fn main() { return; } + { + let reg = prometheus::default_registry(); + + let counter_alive_tasks = GenericGauge::::with_opts(Opts::new( + "misskey_media_proxy_tokio_alive_tasks", + "Number of alive tasks", + )) + .expect("Failed to create prometheus gauge"); + + let counter_num_worker_threads = GenericGauge::::with_opts(Opts::new( + "misskey_media_proxy_tokio_num_worker_threads", + "Number of worker threads", + )) + .expect("Failed to create prometheus gauge"); + + let counter_global_queue_depth = GenericGauge::::with_opts(Opts::new( + "misskey_media_proxy_tokio_global_queue_depth", + "Global queue depth", + )) + .expect("Failed to create prometheus gauge"); + + reg.register(Box::new(counter_alive_tasks.clone())) + .expect("Failed to register prometheus counter"); + + reg.register(Box::new(counter_num_worker_threads.clone())) + .expect("Failed to register prometheus counter"); + + reg.register(Box::new(counter_global_queue_depth.clone())) + .expect("Failed to register prometheus counter"); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + interval.tick().await; + let metrics = tokio::runtime::Handle::current().metrics(); + + counter_alive_tasks.set(metrics.num_alive_tasks() as _); + counter_num_worker_threads.set(metrics.num_workers() as _); + counter_global_queue_depth.set(metrics.global_queue_depth() as _); + } + }); + } + #[cfg(feature = "axum-server")] { let handle = axum_server::Handle::new();