Compare commits
3 commits
c67e647b11
...
b6dea3fd06
Author | SHA1 | Date | |
---|---|---|---|
b6dea3fd06 | |||
f1c0741089 | |||
1af251420e |
5 changed files with 67 additions and 8 deletions
|
@ -1,7 +1,10 @@
|
|||
use axum::response::IntoResponse;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::{Compression, GzBuilder};
|
||||
use serde::Serialize;
|
||||
use std::collections::HashSet;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{BufReader, BufWriter};
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::{collections::HashMap, fmt::Debug, ops::DerefMut, path::PathBuf, sync::Arc};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
@ -57,7 +60,7 @@ pub enum AuditError {
|
|||
|
||||
pub struct AuditState {
|
||||
options: AuditOptions,
|
||||
cur_file: Option<RwLock<HashMap<String, Mutex<File>>>>,
|
||||
cur_file: Option<RwLock<HashMap<String, Mutex<GzEncoder<File>>>>>,
|
||||
vacuum_counter: AtomicU32,
|
||||
}
|
||||
|
||||
|
@ -81,7 +84,7 @@ impl AuditState {
|
|||
.filter_map(|f| f.ok())
|
||||
.filter(|f| {
|
||||
f.file_type().map(|t| t.is_file()).unwrap_or(false)
|
||||
&& f.file_name().to_string_lossy().ends_with(".json")
|
||||
&& f.file_name().to_string_lossy().ends_with(".json.gz")
|
||||
})
|
||||
.filter(|f| !in_use_files.contains(&f.file_name().to_string_lossy().to_string()));
|
||||
|
||||
|
@ -108,7 +111,7 @@ impl AuditState {
|
|||
|
||||
let mut write = self.cur_file.as_ref().unwrap().write().await;
|
||||
|
||||
let full_name = format!("{}_{}.json", name, time_str);
|
||||
let full_name = format!("{}_{}.json.gz", name, time_str);
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
|
@ -116,7 +119,9 @@ impl AuditState {
|
|||
.append(true)
|
||||
.open(&self.options.output.join(&full_name))?;
|
||||
|
||||
write.insert(name.to_string(), Mutex::new(file));
|
||||
let gz = GzEncoder::new(file, Compression::default());
|
||||
|
||||
write.insert(name.to_string(), Mutex::new(gz));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -143,7 +148,7 @@ impl AuditState {
|
|||
|
||||
serde_json::to_writer(f.deref_mut(), &item)?;
|
||||
|
||||
let meta = f.metadata()?;
|
||||
let meta = f.get_ref().metadata()?;
|
||||
|
||||
if let Some(size) = self.options.rotate_size {
|
||||
if meta.len() >= size {
|
||||
|
@ -161,6 +166,10 @@ impl AuditState {
|
|||
|
||||
let file = File::create(&self.options.output.join(name))?;
|
||||
|
||||
let file = GzBuilder::new()
|
||||
.filename(name)
|
||||
.write(file, Compression::default());
|
||||
|
||||
write.insert(name.to_string(), Mutex::new(file));
|
||||
|
||||
// this is deliberately out of order to make sure we don't create endless files if serialization fails
|
||||
|
|
|
@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use crate::{
|
||||
delegate,
|
||||
evaluate::{Disposition, Evaluator},
|
||||
model::ap::{AnyObject, NoteObject},
|
||||
APRequestInfo, HasAppState,
|
||||
};
|
||||
|
@ -14,6 +15,15 @@ pub struct Meta<E: IntoResponse + 'static, I: HasAppState<E>> {
|
|||
_marker: std::marker::PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<E: IntoResponse + 'static, I: HasAppState<E>> Meta<E, I> {
|
||||
pub fn new(inner: I) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extract_host(act: &APRequestInfo) -> Option<String> {
|
||||
act.activity
|
||||
.as_ref()
|
||||
|
@ -50,3 +60,26 @@ pub struct MetaItem {
|
|||
instance_host: Option<String>,
|
||||
attributed_to: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<
|
||||
E: IntoResponse + Serialize + Send + Sync + 'static,
|
||||
I: HasAppState<E> + Evaluator<E> + Sync,
|
||||
> Evaluator<E> for Meta<E, I>
|
||||
{
|
||||
fn name() -> &'static str {
|
||||
"ExtractMeta"
|
||||
}
|
||||
async fn evaluate<'r>(
|
||||
&self,
|
||||
ctx: Option<serde_json::Value>,
|
||||
info: &APRequestInfo<'r>,
|
||||
) -> (Disposition<E>, Option<serde_json::Value>) {
|
||||
let ctx = ctx.map(|mut c| {
|
||||
c["meta"] = serde_json::to_value(extract_meta(info)).unwrap();
|
||||
c
|
||||
});
|
||||
|
||||
self.inner.evaluate(ctx, info).await
|
||||
}
|
||||
}
|
||||
|
|
10
src/lib.rs
10
src/lib.rs
|
@ -17,6 +17,7 @@ use axum::{
|
|||
};
|
||||
use client::ClientCache;
|
||||
use evaluate::chain::audit::{Audit, AuditOptions};
|
||||
use evaluate::chain::meta::Meta;
|
||||
use evaluate::{
|
||||
Disposition, Evaluator, ERR_BAD_REQUEST, ERR_INTERNAL_SERVER_ERROR, ERR_PAYLOAD_TOO_LARGE,
|
||||
ERR_SERVICE_TEMPORARILY_UNAVAILABLE,
|
||||
|
@ -103,6 +104,15 @@ pub trait HasAppState<E: IntoResponse + 'static>: Clone {
|
|||
{
|
||||
Audit::new(self, opts)
|
||||
}
|
||||
|
||||
/// Wrap the evaluator in a meta chain.
|
||||
fn extract_meta(self) -> Meta<E, Self>
|
||||
where
|
||||
Self: Sized + Send + Sync + Evaluator<E>,
|
||||
E: Send + Sync,
|
||||
{
|
||||
Meta::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Application state.
|
||||
|
|
|
@ -29,6 +29,7 @@ async fn build_state<E: IntoResponse + Clone + Serialize + Send + Sync + 'static
|
|||
_args: &Args,
|
||||
) -> impl HasAppState<E> + Evaluator<E> {
|
||||
base.audited(AuditOptions::new(PathBuf::from("inbox_audit")))
|
||||
.extract_meta()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -41,9 +42,9 @@ async fn main() {
|
|||
let args = Args::parse();
|
||||
|
||||
let state = build_state::<MisskeyError>(
|
||||
Arc::new(BaseAppState::new(
|
||||
args.backend.parse().expect("Invalid backend URL"),
|
||||
)),
|
||||
Arc::new(
|
||||
BaseAppState::new(args.backend.parse().expect("Invalid backend URL")).with_empty_ctx(),
|
||||
),
|
||||
&args,
|
||||
)
|
||||
.await;
|
||||
|
|
|
@ -40,6 +40,12 @@ pub async fn start<
|
|||
.put(ProxyApp::<S, E>::inbox_handler)
|
||||
.patch(ProxyApp::<S, E>::inbox_handler),
|
||||
)
|
||||
.route(
|
||||
"/users/:id/inbox",
|
||||
post(ProxyApp::<S, E>::inbox_handler)
|
||||
.put(ProxyApp::<S, E>::inbox_handler)
|
||||
.patch(ProxyApp::<S, E>::inbox_handler),
|
||||
)
|
||||
.fallback(any(ProxyApp::<S, E>::pass_through));
|
||||
|
||||
let ms = app
|
||||
|
|
Loading…
Add table
Reference in a new issue