Add timing info

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
ゆめ 2024-11-13 15:12:55 -06:00
parent 5ffbf3bcf8
commit a3eb9f7fb1
No known key found for this signature in database
12 changed files with 599 additions and 203 deletions

1
Cargo.lock generated
View file

@ -3156,6 +3156,7 @@ dependencies = [
"tokio", "tokio",
"toml", "toml",
"tower-service", "tower-service",
"wasm-bindgen",
"worker", "worker",
"worker-macros", "worker-macros",
] ]

View file

@ -26,13 +26,13 @@ panic = "unwind"
[features] [features]
default = [] default = []
env-local = ["axum/tokio", "axum/http1", "axum/http2", "reqwest", "tokio", "env_logger", "governor", "clap", "toml", "image/rayon"] env-local = ["axum/http1", "axum/http2", "reqwest", "tokio", "env_logger", "governor", "clap", "toml", "image/rayon"]
cf-worker = ["dep:worker", "dep:worker-macros"] cf-worker = ["dep:worker", "dep:worker-macros"]
panic-console-error = ["dep:console_error_panic_hook"] panic-console-error = ["dep:console_error_panic_hook"]
apparmor = ["dep:rand_core", "dep:siphasher"] apparmor = ["dep:rand_core", "dep:siphasher"]
reqwest = ["dep:reqwest"] reqwest = ["dep:reqwest"]
svg-text = ["resvg/text"] svg-text = ["resvg/text"]
tokio = ["dep:tokio"] tokio = ["dep:tokio", "axum/tokio"]
env_logger = ["dep:env_logger"] env_logger = ["dep:env_logger"]
governor = ["dep:governor"] governor = ["dep:governor"]
@ -44,7 +44,7 @@ tower-service = "0.3"
console_error_panic_hook = { version = "0.1.1", optional = true } console_error_panic_hook = { version = "0.1.1", optional = true }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
futures = { version = "0.3.31", default-features = false, features = ["std"] } futures = { version = "0.3.31", default-features = false, features = ["std"] }
image = { version = "0.25.5", default-features = false, features = ["avif", "bmp", "gif", "ico", "jpeg", "png", "webp"] } image = { version = "0.25.5", default-features = false, features = ["avif", "bmp", "gif", "jpeg", "png", "webp"] }
reqwest = { version = "0.12.9", features = ["brotli", "gzip", "stream", "zstd"], optional = true } reqwest = { version = "0.12.9", features = ["brotli", "gzip", "stream", "zstd"], optional = true }
rand_core = { version = "0.6.4", features = ["getrandom"], optional = true } rand_core = { version = "0.6.4", features = ["getrandom"], optional = true }
siphasher = { version = "1.0.1", optional = true } siphasher = { version = "1.0.1", optional = true }
@ -56,6 +56,8 @@ env_logger = { version = "0.11", optional = true }
governor = { version = "0.7.0", features = ["dashmap"], optional = true } governor = { version = "0.7.0", features = ["dashmap"], optional = true }
resvg = { version = "0.44.0", default-features = false, features = ["gif", "image-webp"] } resvg = { version = "0.44.0", default-features = false, features = ["gif", "image-webp"] }
thiserror = "2.0" thiserror = "2.0"
serde_json = "1"
wasm-bindgen = { version = "0.2" }
[build-dependencies] [build-dependencies]
chumsky = "0.9.3" chumsky = "0.9.3"

View file

@ -14,9 +14,9 @@ Work in progress! Currently to do:
- [X] HTTPs only mode and X-Forwarded-Proto reflection - [X] HTTPs only mode and X-Forwarded-Proto reflection
- [X] Cache-Control header - [X] Cache-Control header
- [X] Rate-limiting on local deployment (untested) - [X] Rate-limiting on local deployment (untested)
- [ ] Read config from Cloudflare - [X] Read config from Cloudflare
- [ ] Handle all possible panics - [ ] Handle all possible panics reported by Clippy
- [ ] Sandboxing the image rendering
## Demo ## Demo

164
src/config.rs Normal file
View file

@ -0,0 +1,164 @@
#[cfg(feature = "governor")]
use std::num::NonZero;
#[cfg(feature = "cf-worker")]
#[allow(unsafe_code)]
mod json {
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = JSON)]
pub fn stringify(value: &JsValue) -> String;
}
}
/// Application configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Config {
/// The listen address
pub listen: Option<String>,
/// Send Cache-Control headers
pub enable_cache: bool,
/// Index page configuration
pub index_redirect: IndexConfig,
/// Whether to only serve media with a known safe signature
pub allow_unknown: bool,
/// Fetch configuration
pub fetch: FetchConfig,
/// Post-processing configuration
pub post_process: PostProcessConfig,
#[cfg(feature = "governor")]
/// Governor configuration
pub rate_limit: RateLimitConfig,
}
/// Governor configuration
#[cfg(feature = "governor")]
#[derive(Debug, Clone, serde::Deserialize)]
pub struct RateLimitConfig {
/// The rate limit replenish interval in milliseconds
pub replenish_every: u64,
/// The rate limit burst size
pub burst: NonZero<u32>,
}
#[cfg(feature = "cf-worker")]
#[derive(Debug, thiserror::Error)]
/// Configuration error
pub enum CfConfigError {
#[error("Failed to convert env")]
/// Failed to convert the environment to a JS object
EnvConversion,
#[error("Failed to parse JSON")]
/// Failed to parse JSON
JsonParse(#[from] serde_json::Error),
}
#[cfg(feature = "cf-worker")]
impl Config {
/// Load the configuration from the Cloudflare Worker environment
pub fn load_from_cf_env(env: worker::Env) -> Result<Self, CfConfigError> {
use wasm_bindgen::JsCast;
let obj = env.dyn_into().map_err(|_| CfConfigError::EnvConversion)?;
let json = json::stringify(&obj);
serde_json::from_str(&json).map_err(CfConfigError::JsonParse)
}
}
impl Default for Config {
fn default() -> Self {
Config {
listen: Some("127.0.0.1:3000".to_string()),
enable_cache: false,
fetch: FetchConfig {
allow_http: false,
via: concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string(),
user_agent: concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))
.to_string(),
},
index_redirect: IndexConfig::Message(format!(
"Welcome to {}",
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")),
)),
allow_unknown: false,
post_process: PostProcessConfig {
enable_redirects: false,
normalization: NormalizationPolicy::Opportunistic,
allow_svg_passthrough: false,
},
#[cfg(feature = "governor")]
rate_limit: RateLimitConfig {
replenish_every: 2000,
burst: NonZero::new(32).unwrap(),
},
}
}
}
/// Fetch configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct FetchConfig {
/// Whether to allow HTTP requests
pub allow_http: bool,
/// The via string to use when fetching media
pub via: String,
/// The user agent to use when fetching media
pub user_agent: String,
}
/// Post-processing configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct PostProcessConfig {
/// Opportunistically redirect to the original URL if the media is not modified
///
/// Potentially leaks the user's IP address and other metadata
pub enable_redirects: bool,
/// Whether to normalize media files when the request specifically asks for a format
pub normalization: NormalizationPolicy,
/// Whether to allow SVG passthrough
///
/// This opens up the possibility of SVG-based attacks
pub allow_svg_passthrough: bool,
}
/// Normalization policy
#[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NormalizationPolicy {
/// Ignore the requested format and return the original
Never,
/// Don't convert the media if we don't have to
///
///
/// This is the default for Cloudflare Workers
Lazy,
/// Only return the requested format if other conversions are necessary
///
/// This is the default for local environments
Opportunistic,
/// Always make an attempt to return the requested format
Aggressive,
}
impl Default for NormalizationPolicy {
fn default() -> Self {
Self::Opportunistic
}
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(untagged)]
/// Index page configuration
pub enum IndexConfig {
/// Redirect to a URL
#[allow(missing_docs)]
Redirect { permanent: bool, url: String },
/// Display a message
Message(String),
}

View file

@ -1 +0,0 @@

View file

@ -12,6 +12,7 @@ pub const DEFAULT_MAX_REDIRECTS: usize = 6;
/// Some context about the request for writing response and logging /// Some context about the request for writing response and logging
#[allow(missing_docs)] #[allow(missing_docs)]
pub struct RequestCtx<'a> { pub struct RequestCtx<'a> {
pub time_to_body: std::time::Duration,
pub url: &'a str, pub url: &'a str,
pub secure: bool, pub secure: bool,
} }
@ -198,7 +199,10 @@ pub mod reqwest {
} }
/// Response from Reqwest /// Response from Reqwest
pub struct ReqwestResponse(::reqwest::Response); pub struct ReqwestResponse {
time_to_body: std::time::Duration,
resp: ::reqwest::Response,
}
impl HTTPResponse for ReqwestResponse { impl HTTPResponse for ReqwestResponse {
type Bytes = Bytes; type Bytes = Bytes;
@ -208,17 +212,18 @@ pub mod reqwest {
fn request(&self) -> RequestCtx<'_> { fn request(&self) -> RequestCtx<'_> {
RequestCtx { RequestCtx {
url: self.0.url().as_str(), time_to_body: self.time_to_body,
secure: self.0.url().scheme().eq_ignore_ascii_case("https"), url: self.resp.url().as_str(),
secure: self.resp.url().scheme().eq_ignore_ascii_case("https"),
} }
} }
fn status(&self) -> u16 { fn status(&self) -> u16 {
self.0.status().as_u16() self.resp.status().as_u16()
} }
fn header_one<'a>(&'a self, name: &str) -> Result<Option<Cow<'a, str>>, ErrorResponse> { fn header_one<'a>(&'a self, name: &str) -> Result<Option<Cow<'a, str>>, ErrorResponse> {
self.0 self.resp
.headers() .headers()
.get(name) .get(name)
.map(|v| v.to_str().map(Cow::Borrowed)) .map(|v| v.to_str().map(Cow::Borrowed))
@ -227,7 +232,7 @@ pub mod reqwest {
} }
fn header_walk<F: FnMut(&str, &str) -> bool>(&self, mut f: F) { fn header_walk<F: FnMut(&str, &str) -> bool>(&self, mut f: F) {
for (name, value) in self.0.headers() { for (name, value) in self.resp.headers() {
if !f(name.as_str(), value.to_str().unwrap_or_default()) { if !f(name.as_str(), value.to_str().unwrap_or_default()) {
break; break;
} }
@ -235,14 +240,14 @@ pub mod reqwest {
} }
fn header_collect(&self, out: &mut HeaderMap) -> Result<(), ErrorResponse> { fn header_collect(&self, out: &mut HeaderMap) -> Result<(), ErrorResponse> {
for (name, value) in self.0.headers() { for (name, value) in self.resp.headers() {
out.insert(name, value.clone()); out.insert(name, value.clone());
} }
Ok(()) Ok(())
} }
fn body(self) -> Self::BodyStream { fn body(self) -> Self::BodyStream {
Box::pin(self.0.bytes_stream().map_err(Into::into)) Box::pin(self.resp.bytes_stream().map_err(Into::into))
} }
} }
@ -289,6 +294,8 @@ pub mod reqwest {
return Err(ErrorResponse::insecure_request()); return Err(ErrorResponse::insecure_request());
} }
let begin = crate::timing::Instant::now();
let resp = self let resp = self
.client .client
.get(url_parsed) .get(url_parsed)
@ -345,7 +352,10 @@ pub mod reqwest {
} }
} }
Ok(ReqwestResponse(resp)) Ok(ReqwestResponse {
time_to_body: begin.elapsed(),
resp,
})
} }
} }
} }
@ -416,6 +426,7 @@ pub mod cf_worker {
/// Response from Cloudflare Workers /// Response from Cloudflare Workers
pub struct CfWorkerResponse { pub struct CfWorkerResponse {
time_to_body: std::time::Duration,
resp: worker::Response, resp: worker::Response,
url: Url, url: Url,
} }
@ -426,6 +437,7 @@ pub mod cf_worker {
fn request(&self) -> RequestCtx<'_> { fn request(&self) -> RequestCtx<'_> {
RequestCtx { RequestCtx {
time_to_body: self.time_to_body,
url: self.url.as_str(), url: self.url.as_str(),
secure: self.url.scheme().eq_ignore_ascii_case("https"), secure: self.url.scheme().eq_ignore_ascii_case("https"),
} }
@ -536,6 +548,8 @@ pub mod cf_worker {
secure &= url_parsed.scheme().eq_ignore_ascii_case("http"); secure &= url_parsed.scheme().eq_ignore_ascii_case("http");
let begin = crate::timing::Instant::now();
let req = Request::new_with_init(url, init)?; let req = Request::new_with_init(url, init)?;
let abc = AbortController::default(); let abc = AbortController::default();
@ -590,6 +604,7 @@ pub mod cf_worker {
} }
Ok(CfWorkerResponse { Ok(CfWorkerResponse {
time_to_body: begin.elapsed(),
resp, resp,
url: url_parsed, url: url_parsed,
}) })

View file

@ -4,9 +4,9 @@
#![warn(missing_docs)] #![warn(missing_docs)]
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
use std::{borrow::Cow, fmt::Display, marker::PhantomData, sync::Arc};
#[cfg(feature = "governor")] #[cfg(feature = "governor")]
use std::{net::SocketAddr, num::NonZero}; use std::net::SocketAddr;
use std::{borrow::Cow, fmt::Display, marker::PhantomData, sync::Arc};
#[cfg(feature = "governor")] #[cfg(feature = "governor")]
use axum::extract::ConnectInfo; use axum::extract::ConnectInfo;
@ -31,8 +31,8 @@ use serde::Deserialize;
#[cfg(feature = "cf-worker")] #[cfg(feature = "cf-worker")]
use worker::{event, Context, Env, HttpRequest, Result as WorkerResult}; use worker::{event, Context, Env, HttpRequest, Result as WorkerResult};
/// Module for delivering the final processed media to the client use config::*;
pub mod deliver;
/// Module for fetching media from upstream /// Module for fetching media from upstream
pub mod fetch; pub mod fetch;
/// Module for post-processing media /// Module for post-processing media
@ -42,6 +42,12 @@ pub mod sandbox;
/// Stream utilities /// Stream utilities
pub mod stream; pub mod stream;
/// Configuration utilities
pub mod config;
/// Cross platform timing utilities
pub mod timing;
const MAX_SIZE: usize = 32 << 20; const MAX_SIZE: usize = 32 << 20;
#[cfg(all(not(feature = "cf-worker"), not(feature = "reqwest")))] #[cfg(all(not(feature = "cf-worker"), not(feature = "reqwest")))]
@ -55,128 +61,29 @@ pub type Upstream = crate::fetch::cf_worker::CfWorkerClient;
/// The upstream client chosen by the build configuration /// The upstream client chosen by the build configuration
pub type Upstream = crate::fetch::reqwest::ReqwestClient; pub type Upstream = crate::fetch::reqwest::ReqwestClient;
/// Application configuration #[cfg(feature = "cf-worker")]
#[derive(Debug, Clone, serde::Deserialize)] #[event(fetch)]
pub struct Config { async fn fetch(
/// The listen address req: HttpRequest,
pub listen: String, env: Env,
_ctx: Context,
) -> WorkerResult<axum::http::Response<axum::body::Body>> {
use fetch::cf_worker::CfWorkerClient;
use tower_service::Service;
/// Send Cache-Control headers let config = match Config::load_from_cf_env(env) {
pub enable_cache: bool, Ok(config) => config,
Err(e) => {
/// Index page configuration log::error!("Failed to load configuration: {}", e);
pub index_redirect: IndexConfig, return Ok(ErrorResponse::worker_config_error(e).into_response());
/// Whether to only serve media with a known safe signature
pub strictly_secure: bool,
/// Fetch configuration
pub fetch: FetchConfig,
/// Post-processing configuration
pub post_process: PostProcessConfig,
#[cfg(feature = "governor")]
/// Governor configuration
pub rate_limit: RateLimitConfig,
} }
};
/// Governor configuration #[cfg(feature = "panic-console-error")]
#[cfg(feature = "governor")] console_error_panic_hook::set_once();
#[derive(Debug, Clone, serde::Deserialize)] Ok(router::<CfWorkerClient, NoSandbox>(config)
pub struct RateLimitConfig { .call(req)
/// The rate limit replenish interval in milliseconds .await?)
pub replenish_every: u64,
/// The rate limit burst size
pub burst: NonZero<u32>,
}
impl Default for Config {
fn default() -> Self {
Config {
listen: "127.0.0.1:3000".to_string(),
enable_cache: false,
fetch: FetchConfig {
allow_http: false,
via: concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")).to_string(),
user_agent: concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))
.to_string(),
},
index_redirect: IndexConfig::Message(format!(
"Welcome to {}",
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")),
)),
strictly_secure: true,
post_process: PostProcessConfig {
enable_redirects: false,
normalization: NormalizationPolicy::Opportunistic,
allow_svg_passthrough: false,
},
#[cfg(feature = "governor")]
rate_limit: RateLimitConfig {
replenish_every: 2000,
burst: NonZero::new(32).unwrap(),
},
}
}
}
/// Fetch configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct FetchConfig {
/// Whether to allow HTTP requests
pub allow_http: bool,
/// The via string to use when fetching media
pub via: String,
/// The user agent to use when fetching media
pub user_agent: String,
}
/// Post-processing configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct PostProcessConfig {
/// Opportunistically redirect to the original URL if the media is not modified
///
/// Potentially leaks the user's IP address and other metadata
pub enable_redirects: bool,
/// Whether to normalize media files when the request specifically asks for a format
pub normalization: NormalizationPolicy,
/// Whether to allow SVG passthrough
///
/// This opens up the possibility of SVG-based attacks
pub allow_svg_passthrough: bool,
}
/// Normalization policy
#[derive(Copy, Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NormalizationPolicy {
/// Only return the requested format and fail if it can't be provided
Enforce,
/// Always make an attempt to return the requested format
Always,
/// Only return the requested format if the conversion does not result in significant changes
///
/// This is the default
Opportunistic,
/// Ignore the requested format and return the original
Never,
}
impl Default for NormalizationPolicy {
fn default() -> Self {
Self::Opportunistic
}
}
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(untagged)]
/// Index page configuration
pub enum IndexConfig {
/// Redirect to a URL
#[allow(missing_docs)]
Redirect { permanent: bool, url: String },
/// Display a message
Message(String),
} }
#[cfg(any(feature = "cf-worker", feature = "reqwest"))] #[cfg(any(feature = "cf-worker", feature = "reqwest"))]
@ -340,23 +247,6 @@ pub async fn rate_limit_middleware(
} }
} }
#[cfg(feature = "cf-worker")]
#[event(fetch)]
async fn fetch(
req: HttpRequest,
_env: Env,
_ctx: Context,
) -> WorkerResult<axum::http::Response<axum::body::Body>> {
use fetch::cf_worker::CfWorkerClient;
use tower_service::Service;
#[cfg(feature = "panic-console-error")]
console_error_panic_hook::set_once();
Ok(router::<CfWorkerClient, NoSandbox>(Config::default())
.call(req)
.await?)
}
/// Query parameters for the proxy endpoint /// Query parameters for the proxy endpoint
#[derive(Debug, Clone, serde::Deserialize)] #[derive(Debug, Clone, serde::Deserialize)]
#[allow(missing_docs)] #[allow(missing_docs)]
@ -403,12 +293,10 @@ pub struct ImageOptions {
} }
impl ImageOptions { impl ImageOptions {
/// Whether post-processing is requested /// Whether resizing is requested
#[must_use] #[must_use]
pub fn requested_postprocess(&self) -> bool { pub fn requested_resize(&self) -> bool {
self.format.is_some() self.avatar.is_some()
|| self.avatar.is_some()
|| self.static_.is_some()
|| self.preview.is_some() || self.preview.is_some()
|| self.badge.is_some() || self.badge.is_some()
|| self.emoji.is_some() || self.emoji.is_some()
@ -467,6 +355,21 @@ impl Display for ErrorResponse {
impl std::error::Error for ErrorResponse {} impl std::error::Error for ErrorResponse {}
impl ErrorResponse { impl ErrorResponse {
/// Worker is configured improperly
#[cfg(feature = "cf-worker")]
#[must_use]
pub fn worker_config_error(err: CfConfigError) -> Self {
match err {
CfConfigError::EnvConversion => Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: Cow::Borrowed("Failed to convert config to JS object"),
},
CfConfigError::JsonParse(e) => Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: format!("Failed to parse config object: {}", e).into(),
},
}
}
/// Method not allowed /// Method not allowed
#[must_use] #[must_use]
pub const fn method_not_allowed() -> Self { pub const fn method_not_allowed() -> Self {
@ -710,6 +613,7 @@ impl<C: UpstreamClient + 'static, S: Sandboxing + 'static> App<C, S> {
let media = Box::pin(MediaResponse::from_upstream_response( let media = Box::pin(MediaResponse::from_upstream_response(
resp, resp,
state.config.allow_unknown,
&state.config.post_process, &state.config.post_process,
options, options,
)) ))

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use clap::Parser; use clap::Parser;
use yumechi_no_kuni_proxy_worker::{router, sandbox::NoSandbox, Config, Upstream}; use yumechi_no_kuni_proxy_worker::{config::Config, router, sandbox::NoSandbox, Upstream};
#[derive(Parser)] #[derive(Parser)]
struct Cli { struct Cli {
@ -19,7 +19,7 @@ async fn main() {
.expect("Failed to read config file"); .expect("Failed to read config file");
let config: Config = toml::from_str(&config_bytes).expect("Failed to parse config file"); let config: Config = toml::from_str(&config_bytes).expect("Failed to parse config file");
let listen = config.listen.clone(); let listen = config.listen.clone().expect("No listen address provided");
let router = router::<Upstream, NoSandbox>(config); let router = router::<Upstream, NoSandbox>(config);

View file

@ -36,7 +36,7 @@ pub fn postprocess_webp_image(
) -> ImageResult<Option<DynamicImage>> { ) -> ImageResult<Option<DynamicImage>> {
let dec = WebPDecoder::new(Cursor::new(data))?; let dec = WebPDecoder::new(Cursor::new(data))?;
if !dec.has_animation() { if !dec.has_animation() && opt.requested_resize() {
return Ok(Some(postprocess_static_image(data, opt)?)); return Ok(Some(postprocess_static_image(data, opt)?));
} }
@ -58,7 +58,7 @@ pub fn postprocess_webp_image(
pub fn postprocess_png_image(data: &[u8], opt: &ImageOptions) -> ImageResult<Option<DynamicImage>> { pub fn postprocess_png_image(data: &[u8], opt: &ImageOptions) -> ImageResult<Option<DynamicImage>> {
let dec = PngDecoder::new(Cursor::new(data))?; let dec = PngDecoder::new(Cursor::new(data))?;
if dec.is_apng()? { if !dec.is_apng()? && opt.requested_resize() {
return Ok(Some(postprocess_static_image(data, opt)?)); return Ok(Some(postprocess_static_image(data, opt)?));
} }

View file

@ -3,6 +3,7 @@ use std::{
pin::Pin, pin::Pin,
}; };
use crate::timing::{IntoResponseExt, WithMaybeTimingInfo, WithTimingInfo};
use axum::{ use axum::{
body::{Body, Bytes}, body::{Body, Bytes},
http::HeaderValue, http::HeaderValue,
@ -15,7 +16,9 @@ use image::{
}; };
use sniff::SniffingStream; use sniff::SniffingStream;
use crate::{fetch::HTTPResponse, ErrorResponse, ImageOptions, PostProcessConfig}; use crate::{
fetch::HTTPResponse, ErrorResponse, ImageOptions, NormalizationPolicy, PostProcessConfig,
};
const SLURP_LIMIT: usize = 32 << 20; const SLURP_LIMIT: usize = 32 << 20;
const MTU_BUFFER_SIZE: usize = 8192; const MTU_BUFFER_SIZE: usize = 8192;
@ -47,6 +50,22 @@ where
PassThru(PassThru<R>), PassThru(PassThru<R>),
} }
#[cfg(feature = "tokio")]
macro_rules! run_blocking {
($($tt:tt)*) => {
tokio::task::block_in_place(|| {
$($tt)*
})
};
}
#[cfg(not(feature = "tokio"))]
macro_rules! run_blocking {
($($tt:tt)*) => {
$($tt)*
}
}
impl<'a, R: HTTPResponse + 'a> MediaResponse<'a, R> impl<'a, R: HTTPResponse + 'a> MediaResponse<'a, R>
where where
<R as HTTPResponse>::BodyStream: Unpin, <R as HTTPResponse>::BodyStream: Unpin,
@ -54,9 +73,17 @@ where
/// Create a new media response from a redirect /// Create a new media response from a redirect
pub async fn from_upstream_response( pub async fn from_upstream_response(
response: R, response: R,
allow_unknown: bool,
config: &PostProcessConfig, config: &PostProcessConfig,
options: ImageOptions, options: ImageOptions,
) -> Result<Self, ErrorResponse> { ) -> Result<WithTimingInfo<WithMaybeTimingInfo<WithTimingInfo<Self>>>, ErrorResponse> {
let begin = crate::timing::Instant::now();
const TIME_TO_FIRST_BYTE_KEY: &str = "fetch-first-byte";
const TIMING_KEY: &str = "post-process";
const SLURP_TIMING_KEY: &str = "slurp-data";
let ttfb = response.request().time_to_body;
let content_length = response let content_length = response
.header_one("content-length") .header_one("content-length")
.ok() .ok()
@ -72,18 +99,22 @@ where
// first if the media type is not something we can handle // first if the media type is not something we can handle
if !is_svg if !is_svg
&& (!options.requested_postprocess() && (claimed_ct.is_some_and(|ct| ct.starts_with("video/") || ct.starts_with("audio/")))
|| claimed_ct
.is_some_and(|ct| ct.starts_with("video/") || ct.starts_with("audio/")))
{ {
if config.enable_redirects if config.enable_redirects
&& options.origin != Some(true) && options.origin != Some(true)
&& content_length.map_or(false, |cl| cl > 1 << 20) && content_length.map_or(false, |cl| cl > 1 << 20)
{ {
return Ok(MediaResponse::Redirect(response.request().url.to_string())); return Ok(MediaResponse::Redirect(response.request().url.to_string())
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, response.request().time_to_body)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed()));
} }
return MediaResponse::probe_then_through(response).await.map(|r| {
return MediaResponse::probe_then_through(response).await; r.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed())
});
} }
let is_https = response.request().secure; let is_https = response.request().secure;
@ -127,14 +158,17 @@ where
buf.extend_from_slice(bytes.as_ref()); buf.extend_from_slice(bytes.as_ref());
} }
let img = image_processing::postprocess_svg_image(&buf, &options) let img = run_blocking!(image_processing::postprocess_svg_image(&buf, &options))
.map_err(|e| ErrorResponse::postprocess_failed(e.to_string().into()))?; .map_err(|e| ErrorResponse::postprocess_failed(e.to_string().into()))?;
Ok(MediaResponse::ProcessedStaticImage(StaticImage { Ok(MediaResponse::ProcessedStaticImage(StaticImage {
data: img, data: img,
format: ImageFormat::WebP, format: ImageFormat::WebP,
is_https, is_https,
})) })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed()))
} }
Some(rs) => { Some(rs) => {
if rs.maybe_unsafe { if rs.maybe_unsafe {
@ -142,10 +176,45 @@ where
} }
match rs.sniffed_mime { match rs.sniffed_mime {
Some(mime) => { Some(mime) if config.normalization > NormalizationPolicy::Never => {
if mime.starts_with("image/") { if mime.starts_with("image/") {
// slurp it up if config.normalization <= NormalizationPolicy::Opportunistic
&& !options.requested_resize()
&& (config.normalization <= NormalizationPolicy::Lazy
|| match (mime, options.format.as_deref()) {
(_, None) => true,
("image/png" | "image/apng", Some(f)) => {
f.eq_ignore_ascii_case("png")
|| f.eq_ignore_ascii_case("apng")
}
("image/jpeg", Some(f)) => f.eq_ignore_ascii_case("jpeg"),
("image/webp", Some(f)) => f.eq_ignore_ascii_case("webp"),
("image/gif", Some(f)) => f.eq_ignore_ascii_case("gif"),
(_, Some(_)) => false,
})
&& (mime.starts_with("image/png")
|| mime.starts_with("image/apng")
|| mime.starts_with("image/webp")
|| mime.starts_with("image/gif")
|| mime.starts_with("image/jpeg"))
{
return Ok(MediaResponse::PassThru(PassThru {
header_len: header
.position()
.try_into()
.map_err(|_| ErrorResponse::payload_too_large())?,
header: Box::new(header.into_inner()),
remaining_body,
content_type: Some(mime.to_string()),
is_https,
})
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed()));
}
// slurp it up
let slurp_begin = crate::timing::Instant::now();
let header_len = header.position(); let header_len = header.position();
let header = header.into_inner(); let header = header.into_inner();
let mut buf = if let Some(cl) = content_length { let mut buf = if let Some(cl) = content_length {
@ -187,12 +256,11 @@ where
if options.format.is_none() { if options.format.is_none() {
if mime.starts_with("image/png") || mime.starts_with("image/apng") { if mime.starts_with("image/png") || mime.starts_with("image/apng") {
let result = let result = run_blocking!(
image_processing::postprocess_png_image(&buf, &options) image_processing::postprocess_png_image(&buf, &options)
.map_err(|e| {
ErrorResponse::postprocess_failed(
e.to_string().into(),
) )
.map_err(|e| {
ErrorResponse::postprocess_failed(e.to_string().into())
})?; })?;
return match result { return match result {
@ -201,21 +269,32 @@ where
data: img, data: img,
format: output_static_format, format: output_static_format,
is_https, is_https,
})) })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(
SLURP_TIMING_KEY,
Some(slurp_begin.elapsed()),
)
.with_timing_info(TIMING_KEY, begin.elapsed()))
} }
None => Ok(MediaResponse::Buffer { None => Ok(MediaResponse::Buffer {
data: buf, data: buf,
content_type: Some("image/png".into()), content_type: Some("image/png".into()),
}), }
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(
SLURP_TIMING_KEY,
Some(slurp_begin.elapsed()),
)
.with_timing_info(TIMING_KEY, begin.elapsed())),
}; };
} }
if mime.starts_with("image/webp") { if mime.starts_with("image/webp") {
let result = let result = run_blocking!(
image_processing::postprocess_webp_image(&buf, &options) image_processing::postprocess_webp_image(&buf, &options)
.map_err(|e| {
ErrorResponse::postprocess_failed(
e.to_string().into(),
) )
.map_err(|e| {
ErrorResponse::postprocess_failed(e.to_string().into())
})?; })?;
return match result { return match result {
@ -224,26 +303,41 @@ where
data: img, data: img,
format: output_static_format, format: output_static_format,
is_https, is_https,
})) })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(
SLURP_TIMING_KEY,
Some(slurp_begin.elapsed()),
)
.with_timing_info(TIMING_KEY, begin.elapsed()))
} }
None => Ok(MediaResponse::Buffer { None => Ok(MediaResponse::Buffer {
data: buf, data: buf,
content_type: Some("image/webp".into()), content_type: Some("image/webp".into()),
}), }
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(
SLURP_TIMING_KEY,
Some(slurp_begin.elapsed()),
)
.with_timing_info(TIMING_KEY, begin.elapsed())),
}; };
} }
} }
let result = image_processing::postprocess_static_image(&buf, &options) let result = run_blocking!(image_processing::postprocess_static_image(
.map_err(|e| { &buf, &options
ErrorResponse::postprocess_failed(e.to_string().into()) ))
})?; .map_err(|e| ErrorResponse::postprocess_failed(e.to_string().into()))?;
Ok(MediaResponse::ProcessedStaticImage(StaticImage { Ok(MediaResponse::ProcessedStaticImage(StaticImage {
data: result, data: result,
format: output_static_format, format: output_static_format,
is_https, is_https,
})) })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, Some(slurp_begin.elapsed()))
.with_timing_info(TIMING_KEY, begin.elapsed()))
} else { } else {
Ok(MediaResponse::PassThru(PassThru { Ok(MediaResponse::PassThru(PassThru {
header_len: header header_len: header
@ -254,10 +348,13 @@ where
remaining_body, remaining_body,
content_type: Some(mime.to_string()), content_type: Some(mime.to_string()),
is_https, is_https,
})) })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed()))
} }
} }
None => Ok(MediaResponse::PassThru(PassThru { _ => Ok(MediaResponse::PassThru(PassThru {
header_len: header header_len: header
.position() .position()
.try_into() .try_into()
@ -266,9 +363,25 @@ where
remaining_body, remaining_body,
content_type: None, content_type: None,
is_https, is_https,
})), })
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed())),
} }
} }
None if allow_unknown => Ok(MediaResponse::PassThru(PassThru {
header_len: header
.position()
.try_into()
.map_err(|_| ErrorResponse::payload_too_large())?,
header: Box::new(header.into_inner()),
remaining_body,
content_type: None,
is_https,
})
.with_timing_info(TIME_TO_FIRST_BYTE_KEY, ttfb)
.with_opt_timing_info(SLURP_TIMING_KEY, None)
.with_timing_info(TIMING_KEY, begin.elapsed())),
None => Err(ErrorResponse::unsupported_media()), None => Err(ErrorResponse::unsupported_media()),
} }
} }
@ -285,6 +398,8 @@ where
if let Some(ct) = content_type { if let Some(ct) = content_type {
resp.headers_mut() resp.headers_mut()
.insert("content-type", HeaderValue::from_str(&ct).unwrap()); .insert("content-type", HeaderValue::from_str(&ct).unwrap());
resp.headers_mut()
.insert("X-Proxy-Post-Process", HeaderValue::from_static("buffered"));
} }
resp.into_response() resp.into_response()
} }
@ -397,6 +512,10 @@ impl<R: HTTPResponse> IntoResponse for PassThru<R> {
}), }),
))); )));
resp.headers_mut().insert("x-forwarded-proto", proto); resp.headers_mut().insert("x-forwarded-proto", proto);
resp.headers_mut().insert(
"X-Proxy-Post-Process",
HeaderValue::from_static("pass-thru"),
);
if let Ok(ct) = content_type { if let Ok(ct) = content_type {
resp.headers_mut().insert("content-type", ct); resp.headers_mut().insert("content-type", ct);
} }
@ -414,8 +533,12 @@ pub struct StaticImage {
impl IntoResponse for StaticImage { impl IntoResponse for StaticImage {
fn into_response(self) -> axum::response::Response { fn into_response(self) -> axum::response::Response {
const TIMING_KEY: &str = "image-reencode-static";
let encoding_begin = crate::timing::Instant::now();
let mut buf = BufWriter::new(Cursor::new(Vec::new())); let mut buf = BufWriter::new(Cursor::new(Vec::new()));
self.data.write_to(&mut buf, self.format).unwrap(); self.data.write_to(&mut buf, self.format).unwrap();
let mut resp = let mut resp =
axum::http::Response::new(Body::from(buf.into_inner().unwrap().into_inner())); axum::http::Response::new(Body::from(buf.into_inner().unwrap().into_inner()));
@ -437,10 +560,16 @@ impl IntoResponse for StaticImage {
HeaderValue::from_static(if self.is_https { "https" } else { "http" }), HeaderValue::from_static(if self.is_https { "https" } else { "http" }),
); );
resp.headers_mut().insert(
"X-Proxy-Post-Process",
HeaderValue::from_static("img-static"),
);
resp.headers_mut() resp.headers_mut()
.insert("content-type", HeaderValue::from_static(mime)); .insert("content-type", HeaderValue::from_static(mime));
resp.into_response() resp.with_timing_info(TIMING_KEY, encoding_begin.elapsed())
.into_response()
} }
} }
@ -453,6 +582,9 @@ pub struct AnimatedImage<'a> {
impl<'a> IntoResponse for AnimatedImage<'a> { impl<'a> IntoResponse for AnimatedImage<'a> {
fn into_response(self) -> axum::response::Response { fn into_response(self) -> axum::response::Response {
const TIMING_KEY: &str = "image-reencode-animated";
let encoding_begin = crate::timing::Instant::now();
let mut buf = BufWriter::new(Cursor::new(Vec::new())); let mut buf = BufWriter::new(Cursor::new(Vec::new()));
let mut gif = GifEncoder::new(&mut buf); let mut gif = GifEncoder::new(&mut buf);
gif.set_repeat(Repeat::Infinite).unwrap(); gif.set_repeat(Repeat::Infinite).unwrap();
@ -472,9 +604,15 @@ impl<'a> IntoResponse for AnimatedImage<'a> {
HeaderValue::from_static(if self.is_https { "https" } else { "http" }), HeaderValue::from_static(if self.is_https { "https" } else { "http" }),
); );
resp.headers_mut().insert(
"X-Proxy-Post-Process",
HeaderValue::from_static("img-animated"),
);
resp.headers_mut() resp.headers_mut()
.insert("content-type", HeaderValue::from_static("image/gif")); .insert("content-type", HeaderValue::from_static("image/gif"));
resp.into_response() resp.with_timing_info(TIMING_KEY, encoding_begin.elapsed())
.into_response()
} }
} }

158
src/timing.rs Normal file
View file

@ -0,0 +1,158 @@
use axum::{
http::{HeaderName, HeaderValue},
response::IntoResponse,
};
/// A cross-platform `Instant` implementation for measuring time
#[cfg(not(target_arch = "wasm32"))]
pub struct Instant(std::time::Instant);
impl Instant {
/// Create a new `Instant` from the current time
#[cfg(not(target_arch = "wasm32"))]
pub fn now() -> Self {
Self(std::time::Instant::now())
}
/// Get the elapsed time since the instant was created
#[cfg(not(target_arch = "wasm32"))]
pub fn elapsed(&self) -> std::time::Duration {
self.0.elapsed()
}
}
#[cfg(all(target_arch = "wasm32", not(feature = "cf-worker")))]
mod js_performance {
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = performance)]
pub fn now() -> f64;
}
}
/// A cross-platform `Instant` implementation for measuring time
#[cfg(target_arch = "wasm32")]
pub struct Instant(f64);
#[cfg(all(target_arch = "wasm32", not(feature = "cf-worker")))]
impl Instant {
/// Create a new `Instant` from the current time
pub fn now() -> Self {
Self(js_performance::now())
}
/// Get the elapsed time since the instant was created
pub fn elapsed(&self) -> std::time::Duration {
std::time::Duration::from_secs_f64((js_performance::now() - self.0) / 1000.0)
}
}
#[cfg(all(target_arch = "wasm32", feature = "cf-worker"))]
impl Instant {
/// Create a new `Instant` from the current time
pub fn now() -> Self {
Self(worker::Date::now().as_millis() as f64)
}
/// Get the elapsed time since the instant was created
pub fn elapsed(&self) -> std::time::Duration {
std::time::Duration::from_secs_f64(
(worker::Date::now().as_millis() as f64 - self.0) / 1000.0,
)
}
}
/// A response that includes timing information
pub struct WithTimingInfo<T> {
pub(crate) key: &'static str,
pub(crate) inner: T,
pub(crate) dur: std::time::Duration,
}
impl<T> WithTimingInfo<T> {
pub(crate) fn new(key: &'static str, inner: T, dur: std::time::Duration) -> Self {
Self { key, inner, dur }
}
/// Get the duration of the response
pub fn duration(&self) -> std::time::Duration {
self.dur
}
}
impl<T: IntoResponse> IntoResponse for WithTimingInfo<T> {
fn into_response(self) -> axum::response::Response {
let mut res = self.inner.into_response();
res.headers_mut().insert(
HeaderName::from_lowercase(format!("x-timing-{}-ms", self.key).as_bytes()).unwrap(),
if cfg!(feature = "cf-worker") && self.dur.is_zero() {
HeaderValue::from_static("N/A") // Cloudflare redacts some timing information
} else {
self.dur.as_millis().to_string().parse().unwrap()
},
);
res
}
}
/// A response that includes timing information, if available
pub struct WithMaybeTimingInfo<T> {
pub(crate) key: &'static str,
pub(crate) inner: T,
pub(crate) dur: Option<std::time::Duration>,
}
impl<T> WithMaybeTimingInfo<T> {
pub(crate) fn new(key: &'static str, inner: T, dur: Option<std::time::Duration>) -> Self {
Self { key, inner, dur }
}
/// Get the duration of the response
pub fn duration(&self) -> Option<std::time::Duration> {
self.dur
}
}
impl<T: IntoResponse> IntoResponse for WithMaybeTimingInfo<T> {
fn into_response(self) -> axum::response::Response {
let mut res = self.inner.into_response();
if let Some(dur) = self.dur {
res.headers_mut().insert(
HeaderName::from_lowercase(format!("x-timing-{}-ms", self.key).as_bytes()).unwrap(),
if cfg!(feature = "cf-worker") && dur.is_zero() {
HeaderValue::from_static("N/A") // Cloudflare redacts some timing information
} else {
dur.as_millis().to_string().parse().unwrap()
},
);
}
res
}
}
pub(crate) trait IntoResponseExt: Sized {
fn with_opt_timing_info(
self,
key: &'static str,
dur: Option<std::time::Duration>,
) -> WithMaybeTimingInfo<Self>;
fn with_timing_info(self, key: &'static str, dur: std::time::Duration) -> WithTimingInfo<Self>;
}
impl<T: IntoResponse> IntoResponseExt for T {
fn with_opt_timing_info(
self,
key: &'static str,
dur: Option<std::time::Duration>,
) -> WithMaybeTimingInfo<Self> {
WithMaybeTimingInfo::new(key, self, dur)
}
fn with_timing_info(self, key: &'static str, dur: std::time::Duration) -> WithTimingInfo<Self> {
WithTimingInfo::new(key, self, dur)
}
}

View file

@ -8,3 +8,18 @@ command = "cargo install -q worker-build && worker-build --release --features cf
[observability] [observability]
enabled = true enabled = true
head_sampling_rate = 1 head_sampling_rate = 1
[vars]
enable_cache = true
index_redirect = { permanent = false, url = "https://mi.yumechi.jp/" }
allow_unknown = false
[vars.fetch]
allow_http = false
via = "yumechi-no-kuni-proxy-worker"
user_agent = "Yumechi-no-Kuni-Proxy-Worker"
[vars.post_process]
enable_redirects = false
normalization = "lazy"
allow_svg_passthrough = true