Compare commits

...

3 commits

Author SHA1 Message Date
b6dea3fd06
Confine User inbox as well
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
2024-10-16 19:30:01 -05:00
f1c0741089
Add shorthand for meta()
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
2024-10-16 19:13:15 -05:00
1af251420e
gzip compress audit file
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
2024-10-16 18:34:45 -05:00
5 changed files with 67 additions and 8 deletions

View file

@ -1,7 +1,10 @@
use axum::response::IntoResponse;
use flate2::write::GzEncoder;
use flate2::{Compression, GzBuilder};
use serde::Serialize;
use std::collections::HashSet;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::sync::atomic::AtomicU32;
use std::{collections::HashMap, fmt::Debug, ops::DerefMut, path::PathBuf, sync::Arc};
use tokio::sync::{Mutex, RwLock};
@ -57,7 +60,7 @@ pub enum AuditError {
pub struct AuditState {
options: AuditOptions,
cur_file: Option<RwLock<HashMap<String, Mutex<File>>>>,
cur_file: Option<RwLock<HashMap<String, Mutex<GzEncoder<File>>>>>,
vacuum_counter: AtomicU32,
}
@ -81,7 +84,7 @@ impl AuditState {
.filter_map(|f| f.ok())
.filter(|f| {
f.file_type().map(|t| t.is_file()).unwrap_or(false)
&& f.file_name().to_string_lossy().ends_with(".json")
&& f.file_name().to_string_lossy().ends_with(".json.gz")
})
.filter(|f| !in_use_files.contains(&f.file_name().to_string_lossy().to_string()));
@ -108,7 +111,7 @@ impl AuditState {
let mut write = self.cur_file.as_ref().unwrap().write().await;
let full_name = format!("{}_{}.json", name, time_str);
let full_name = format!("{}_{}.json.gz", name, time_str);
let file = OpenOptions::new()
.create(true)
@ -116,7 +119,9 @@ impl AuditState {
.append(true)
.open(&self.options.output.join(&full_name))?;
write.insert(name.to_string(), Mutex::new(file));
let gz = GzEncoder::new(file, Compression::default());
write.insert(name.to_string(), Mutex::new(gz));
Ok(())
}
@ -143,7 +148,7 @@ impl AuditState {
serde_json::to_writer(f.deref_mut(), &item)?;
let meta = f.metadata()?;
let meta = f.get_ref().metadata()?;
if let Some(size) = self.options.rotate_size {
if meta.len() >= size {
@ -161,6 +166,10 @@ impl AuditState {
let file = File::create(&self.options.output.join(name))?;
let file = GzBuilder::new()
.filename(name)
.write(file, Compression::default());
write.insert(name.to_string(), Mutex::new(file));
// this is deliberately out of order to make sure we don't create endless files if serialization fails

View file

@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::{
delegate,
evaluate::{Disposition, Evaluator},
model::ap::{AnyObject, NoteObject},
APRequestInfo, HasAppState,
};
@ -14,6 +15,15 @@ pub struct Meta<E: IntoResponse + 'static, I: HasAppState<E>> {
_marker: std::marker::PhantomData<E>,
}
impl<E: IntoResponse + 'static, I: HasAppState<E>> Meta<E, I> {
pub fn new(inner: I) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
}
pub fn extract_host(act: &APRequestInfo) -> Option<String> {
act.activity
.as_ref()
@ -50,3 +60,26 @@ pub struct MetaItem {
instance_host: Option<String>,
attributed_to: Option<String>,
}
#[async_trait::async_trait]
impl<
E: IntoResponse + Serialize + Send + Sync + 'static,
I: HasAppState<E> + Evaluator<E> + Sync,
> Evaluator<E> for Meta<E, I>
{
fn name() -> &'static str {
"ExtractMeta"
}
async fn evaluate<'r>(
&self,
ctx: Option<serde_json::Value>,
info: &APRequestInfo<'r>,
) -> (Disposition<E>, Option<serde_json::Value>) {
let ctx = ctx.map(|mut c| {
c["meta"] = serde_json::to_value(extract_meta(info)).unwrap();
c
});
self.inner.evaluate(ctx, info).await
}
}

View file

@ -17,6 +17,7 @@ use axum::{
};
use client::ClientCache;
use evaluate::chain::audit::{Audit, AuditOptions};
use evaluate::chain::meta::Meta;
use evaluate::{
Disposition, Evaluator, ERR_BAD_REQUEST, ERR_INTERNAL_SERVER_ERROR, ERR_PAYLOAD_TOO_LARGE,
ERR_SERVICE_TEMPORARILY_UNAVAILABLE,
@ -103,6 +104,15 @@ pub trait HasAppState<E: IntoResponse + 'static>: Clone {
{
Audit::new(self, opts)
}
/// Wrap the evaluator in a meta chain.
fn extract_meta(self) -> Meta<E, Self>
where
Self: Sized + Send + Sync + Evaluator<E>,
E: Send + Sync,
{
Meta::new(self.clone())
}
}
/// Application state.

View file

@ -29,6 +29,7 @@ async fn build_state<E: IntoResponse + Clone + Serialize + Send + Sync + 'static
_args: &Args,
) -> impl HasAppState<E> + Evaluator<E> {
base.audited(AuditOptions::new(PathBuf::from("inbox_audit")))
.extract_meta()
}
#[tokio::main]
@ -41,9 +42,9 @@ async fn main() {
let args = Args::parse();
let state = build_state::<MisskeyError>(
Arc::new(BaseAppState::new(
args.backend.parse().expect("Invalid backend URL"),
)),
Arc::new(
BaseAppState::new(args.backend.parse().expect("Invalid backend URL")).with_empty_ctx(),
),
&args,
)
.await;

View file

@ -40,6 +40,12 @@ pub async fn start<
.put(ProxyApp::<S, E>::inbox_handler)
.patch(ProxyApp::<S, E>::inbox_handler),
)
.route(
"/users/:id/inbox",
post(ProxyApp::<S, E>::inbox_handler)
.put(ProxyApp::<S, E>::inbox_handler)
.patch(ProxyApp::<S, E>::inbox_handler),
)
.fallback(any(ProxyApp::<S, E>::pass_through));
let ms = app