reliability and basic documentation

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
ゆめ 2024-10-17 12:19:13 -05:00
parent 5bd9dd0024
commit 5dd87bbccb
No known key found for this signature in database
10 changed files with 185 additions and 13 deletions

64
Cargo.lock generated
View file

@ -119,6 +119,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.4.0"
@ -516,6 +527,7 @@ name = "fedivet"
version = "0.1.0"
dependencies = [
"async-trait",
"atty",
"axum",
"axum-server",
"chrono",
@ -531,6 +543,7 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"tower-http",
"url",
]
@ -740,6 +753,15 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.9"
@ -1066,7 +1088,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.9",
"libc",
"wasi",
"windows-sys 0.52.0",
@ -1778,6 +1800,24 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
dependencies = [
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
@ -1984,6 +2024,28 @@ dependencies = [
"rustix",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"

View file

@ -4,23 +4,34 @@ version = "0.1.0"
edition = "2021"
[features]
unstable = ["data-source"]
data-source = ["dep:lru"]
bin = ["dep:clap", "dep:env_logger", "dep:atty"]
tls = ["axum-server/tls-rustls", "axum-server/rustls-pemfile", "axum-server/tokio-rustls"]
[dependencies]
async-trait = "0.1.83"
atty = { version = "0.2.14", optional = true }
axum = "0.7.7"
axum-server = { version = "0.7.1" }
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.20", features = ["derive"] }
clap = { version = "4.5.20", features = ["derive"], optional = true }
dashmap = "6.1.0"
env_logger = "0.11.5"
env_logger = { version = "0.11.5", optional = true }
flate2 = "1.0.34"
futures = "0.3.31"
log = "0.4.22"
lru = "0.12.5"
lru = { version = "0.12.5", optional = true }
reqwest = { version = "0.12.8", features = ["stream"] }
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "net", "sync", "fs", "signal", "time"] }
tower-http = { version = "0.6.1", features = ["catch-panic"] }
url = { version = "2.5.2", features = ["serde"] }
[[bin]]
name = "fedivet"
path = "src/main.rs"
required-features = ["bin"]

View file

@ -59,7 +59,7 @@ pub enum AuditError {
SerializeError(#[from] serde_json::Error),
}
pub struct AuditState {
pub(crate) struct AuditState {
options: AuditOptions,
cur_file: RwLock<HashMap<String, Mutex<GzEncoder<File>>>>,
vacuum_counter: AtomicU32,
@ -68,6 +68,7 @@ pub struct AuditState {
impl AuditState {
pub fn new(options: AuditOptions) -> Self {
if !options.output.exists() {
#[allow(clippy::expect_used)]
std::fs::create_dir_all(&options.output).expect("Failed to create audit log directory");
}
Self {
@ -171,7 +172,10 @@ impl AuditState {
let read = self.cur_file.read().await;
let file = read.get(name).unwrap();
#[allow(clippy::expect_used)]
let file = read
.get(name)
.expect("The created file handle is missing, this should never happen");
serde_json::to_writer(file.lock().await.deref_mut(), &item)?;
write!(file.lock().await.deref_mut(), "\n\n")?;
@ -181,6 +185,7 @@ impl AuditState {
}
}
/// Audit evaluator
pub struct Audit<E: IntoResponse + 'static, I: HasAppState<E>> {
inner: I,
state: Arc<AuditState>,
@ -188,6 +193,7 @@ pub struct Audit<E: IntoResponse + 'static, I: HasAppState<E>> {
}
impl<E: IntoResponse + 'static, I: HasAppState<E>> Audit<E, I> {
/// Create a new audit evaluator
pub fn new(inner: I, options: AuditOptions) -> Self {
Self {
inner,
@ -210,6 +216,7 @@ impl<E: IntoResponse + 'static, I: HasAppState<E>> Clone for Audit<E, I> {
delegate!(state Audit::<E, I>.inner);
#[derive(Debug, Serialize)]
/// An audit item
pub struct AuditItem<'r, E: IntoResponse + Serialize> {
info: &'r crate::APRequestInfo<'r>,
ctx: &'r Option<serde_json::Value>,

View file

@ -10,12 +10,14 @@ use crate::{
};
#[derive(Clone)]
/// Extracts metadata from the request
pub struct Meta<E: IntoResponse + 'static, I: HasAppState<E>> {
inner: I,
_marker: std::marker::PhantomData<E>,
}
impl<E: IntoResponse + 'static, I: HasAppState<E>> Meta<E, I> {
/// Create a new meta extractor
pub fn new(inner: I) -> Self {
Self {
inner,
@ -24,6 +26,7 @@ impl<E: IntoResponse + 'static, I: HasAppState<E>> Meta<E, I> {
}
}
/// Extracts the host from the request
pub fn extract_host(act: &APRequestInfo) -> Option<String> {
act.activity
.as_ref()
@ -33,6 +36,7 @@ pub fn extract_host(act: &APRequestInfo) -> Option<String> {
.and_then(|s| Url::parse(s).ok()?.host_str().map(|s| s.to_string()))
}
/// Extracts the attributed_to field from the request
pub fn extract_attributed_to(act: &APRequestInfo) -> Option<String> {
match &act.resolved_obj {
Some(cow) => match cow.as_ref() {
@ -43,6 +47,7 @@ pub fn extract_attributed_to(act: &APRequestInfo) -> Option<String> {
}
}
/// Extracts the metadata from the request
pub fn extract_meta(act: &APRequestInfo) -> Option<MetaItem> {
Some(MetaItem {
instance_host: extract_host(act),
@ -53,6 +58,7 @@ pub fn extract_meta(act: &APRequestInfo) -> Option<MetaItem> {
delegate!(state Meta::<E, I>.inner);
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Metadata extracted from the request
pub struct MetaItem {
instance_host: Option<String>,
attributed_to: Option<String>,

View file

@ -1,2 +1,5 @@
/// Audit module
pub mod audit;
/// Meta module
pub mod meta;

View file

@ -2,9 +2,11 @@ use axum::response::IntoResponse;
use reqwest::StatusCode;
use serde::Serialize;
/// Chain evaluation modules
pub mod chain;
#[macro_export]
/// Delegate a crate trait implementation to a field of a struct
macro_rules! delegate {
(state $str:ident::<E, I $(,$t:ty),*>.$field:ident) => {
use $crate::{BaseAppState, ClientCache};
@ -22,6 +24,7 @@ macro_rules! delegate {
use crate::{model::error::MisskeyError, APRequestInfo};
#[allow(missing_docs)]
pub const ERROR_DENIED: MisskeyError = MisskeyError::new_const(
StatusCode::FORBIDDEN,
"ACTIVITY_REJECTED",
@ -29,6 +32,7 @@ pub const ERROR_DENIED: MisskeyError = MisskeyError::new_const(
"This server cannot accept this activity.",
);
#[allow(missing_docs)]
pub const ERR_BAD_REQUEST: MisskeyError = MisskeyError::new_const(
StatusCode::BAD_REQUEST,
"UNPARSABLE_REQUEST",
@ -36,6 +40,7 @@ pub const ERR_BAD_REQUEST: MisskeyError = MisskeyError::new_const(
"The request is not HTTP compliant.",
);
#[allow(missing_docs)]
pub const ERR_INTERNAL_SERVER_ERROR: MisskeyError = MisskeyError::new_const(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_SERVER_ERROR",
@ -43,6 +48,7 @@ pub const ERR_INTERNAL_SERVER_ERROR: MisskeyError = MisskeyError::new_const(
"An internal server error occurred.",
);
#[allow(missing_docs)]
pub const ERR_SERVICE_TEMPORARILY_UNAVAILABLE: MisskeyError = MisskeyError::new_const(
StatusCode::SERVICE_UNAVAILABLE,
"SERVICE_TEMPORARILY_UNAVAILABLE",
@ -50,6 +56,7 @@ pub const ERR_SERVICE_TEMPORARILY_UNAVAILABLE: MisskeyError = MisskeyError::new_
"The service is temporarily unavailable.",
);
#[allow(missing_docs)]
pub const ERR_PAYLOAD_TOO_LARGE: MisskeyError = MisskeyError::new_const(
StatusCode::PAYLOAD_TOO_LARGE,
"PAYLOAD_TOO_LARGE",
@ -58,16 +65,23 @@ pub const ERR_PAYLOAD_TOO_LARGE: MisskeyError = MisskeyError::new_const(
);
#[derive(Debug, Serialize)]
/// Represents the disposition of an evaluator
pub enum Disposition<E: IntoResponse> {
/// Allow the request to continue and disregard the inner evaluator
Allow,
/// Allow the request to continue and pass the inner evaluator's context
Next,
/// Intercept the request and return an response
Intercept(E),
}
#[async_trait::async_trait]
/// An evaluator trait
pub trait Evaluator<E: IntoResponse> {
/// The name of the evaluator
fn name() -> &'static str;
/// Evaluate the request, optionally updating the context
async fn evaluate<'r>(
&self,
ctx: Option<serde_json::Value>,

View file

@ -24,7 +24,7 @@ use evaluate::{
ERR_SERVICE_TEMPORARILY_UNAVAILABLE,
};
use futures::TryStreamExt;
use model::ap::{Activity, AnyObject, MaybeRelayed, ResolveInto};
use model::ap::{AnyObject, MaybeRelayed, ResolveInto};
use model::{ap, error::MisskeyError};
use network::stream::LimitedStream;
use network::{new_backend_client, Either};
@ -39,6 +39,8 @@ pub mod model;
pub(crate) mod network;
/// Server implementation
pub mod serve;
#[cfg(feature = "data-source")]
/// Data sources
pub mod source;
@ -153,6 +155,7 @@ impl<E: IntoResponse + Send + Sync> Evaluator<E> for Arc<BaseAppState<E>> {
impl<E: IntoResponse> BaseAppState<E> {
/// Create a new application state.
pub fn new(backend: reqwest::Url) -> Self {
#[allow(clippy::expect_used)]
Self {
backend,
ctx_template: None,
@ -313,7 +316,7 @@ impl<
})
.await
}
Some(MaybeRelayed::Object(o)) => Some(Cow::Borrowed(&o)),
Some(MaybeRelayed::Object(o)) => Some(Cow::Borrowed(o)),
None => None,
},
Err(_) => None,

View file

@ -32,6 +32,14 @@ async fn build_state<E: IntoResponse + Clone + Serialize + Send + Sync + 'static
.audited(AuditOptions::new(PathBuf::from("inbox_audit")))
}
fn has_stderr() -> bool {
atty::is(atty::Stream::Stderr)
}
fn is_docker() -> bool {
std::fs::metadata("/.dockerenv").is_ok()
}
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
@ -49,11 +57,22 @@ async fn main() {
)
.await;
let panic_log_str = std::env::var("PANIC_LOG").ok();
serve::run(
state.clone(),
serve::start(
state,
&args.listen,
match panic_log_str.as_ref() {
Some(s) => Some(s),
None => {
if has_stderr() && !is_docker() {
None
} else {
Some("panic.log")
}
}
},
args.tls_cert.as_deref(),
args.tls_key.as_deref(),
)

View file

@ -1,6 +1,6 @@
// https://www.w3.org/TR/activitystreams-core/
use std::{borrow::Cow, collections::HashMap};
use std::{borrow::Cow, collections::HashMap, future::Future};
use futures::TryStreamExt;
use reqwest::Url;
@ -17,6 +17,7 @@ pub enum Either<A, B> {
B(B),
}
/// Marker trait for ActivityStreams objects.
pub trait IsAPObject: Serialize + DeserializeOwned {}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -82,7 +83,7 @@ pub struct NoteObject {
pub attributed_to: Option<String>,
pub sensitive: Option<bool>,
pub content: Option<String>,
pub content: String,
#[serde(rename = "contentMap")]
pub content_map: Option<HashMap<String, String>>,
@ -109,6 +110,7 @@ pub struct Attachment {
#[serde(untagged)]
/// Represents any parsable ActivityStreams object.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)] // false positive
pub enum AnyObject {
NoteObject(NoteObject),
Object(Object),
@ -118,19 +120,31 @@ impl IsAPObject for AnyObject {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
#[allow(missing_docs)]
/// Represents an object that may be relayed.
pub enum MaybeRelayed<O> {
Relayed(Url),
Object(O),
}
/// A trait for resolving a reference into a concrete object.
pub trait ResolveInto<E: From<reqwest::Error>, O> {
async fn resolve_ref<'a>(&self, client: &reqwest::Client) -> Result<Cow<'a, O>, E>
/// Resolve a reference by reference and return a Cow.
fn resolve_ref<'a>(
&self,
client: &reqwest::Client,
) -> impl Future<Output = Result<Cow<'a, O>, E>> + Send
where
O: Clone + 'a;
async fn resolve(self, client: &reqwest::Client) -> Result<O, E>;
/// Resolve a reference by value and return an owned object.
fn resolve(self, client: &reqwest::Client) -> impl Future<Output = Result<O, E>> + Send
where
O: Clone;
}
#[derive(Debug, thiserror::Error)]
/// Represents an error that may occur during resolution.
#[allow(missing_docs)]
pub enum ResolveError {
#[error("Payload too large")]
BodySizeExceeded,

View file

@ -1,4 +1,9 @@
use std::net::SocketAddr;
use std::{
backtrace::Backtrace,
io::Write,
net::SocketAddr,
sync::{Arc, Mutex},
};
use axum::{
response::IntoResponse,
@ -7,6 +12,7 @@ use axum::{
};
use axum_server::Handle;
use tokio::task::JoinHandle;
use tower_http::catch_panic::CatchPanicLayer;
use crate::{evaluate::Evaluator, HasAppState, ProxyApp};
@ -30,6 +36,7 @@ pub async fn start<
>(
state: S,
listen: &str,
panic_log: Option<&str>,
tls_cert: Option<&str>,
tls_key: Option<&str>,
) -> (JoinHandle<Result<(), std::io::Error>>, Handle) {
@ -48,8 +55,33 @@ pub async fn start<
)
.fallback(any(ProxyApp::<S, E>::pass_through));
let panic_log = Arc::new(Mutex::new(panic_log.map(|f| {
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(f)
.expect("Failed to open panic log")
})));
let ms = app
.with_state(state)
.layer(CatchPanicLayer::custom(
move |payload| -> axum::response::Response {
// we can't effectively continue as locks can be poisoned
let bt = Backtrace::capture();
log::error!("Panic: {:?}", payload);
log::error!("Backtrace: {}", bt);
if let Some(panic_log) = panic_log.lock().unwrap().as_mut() {
writeln!(panic_log, "{:?}\nBacktrace: {}", payload, bt)
.expect("Failed to write panic log");
panic_log.sync_all().expect("Failed to sync panic log");
}
std::process::exit(5);
},
))
.into_make_service_with_connect_info::<SocketAddr>();
let handle = Handle::new();
@ -99,6 +131,7 @@ pub async fn run<
let mut gc_ticker = tokio::time::interval(std::time::Duration::from_secs(300));
gc_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
#[allow(clippy::expect_used)]
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Failed to register SIGTERM handler");