yumechi-no-kuni/yume-mods/nyuukyou/src/main.rs
eternal-flame-AD ccf81b9398
All checks were successful
Test (production install and build) / production (22.11.0) (push) Successful in 1m27s
Publish Docker image / Build (push) Successful in 4m45s
filter out duplicate events
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
2024-11-19 01:18:31 -06:00

142 lines
4.2 KiB
Rust

use std::collections::HashMap;
use std::hash::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Instant;
use axum::http::StatusCode;
use clap::Parser;
use fedivet::evaluate::chain::audit::AuditOptions;
use fedivet::evaluate::Disposition;
use fedivet::evaluate::Evaluator;
use fedivet::model::error::MisskeyError;
use fedivet::serve;
use fedivet::BaseAppState;
use fedivet::HasAppState;
use rand::random;
// Command-line options of the proxy binary, parsed by clap's derive API.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments
// on a derived struct and its fields into `--help` text, which would change
// the program's output.
#[derive(Parser)]
pub struct Args {
// Value handed to `serve::start` as the listen argument (`-l` / `--listen`).
// Presumably a `host:port` socket address, judging by the default.
#[clap(short, long, default_value = "127.0.0.1:3001")]
pub listen: String,
// Backend URL (`-b` / `--backend`); `main` parses it and passes the result to
// `BaseAppState::new`, panicking with "Invalid backend URL" if parsing fails.
#[clap(short, long, default_value = "http://web:3000")]
pub backend: String,
// Optional TLS certificate argument (`--tls-cert`), forwarded to
// `serve::start`. Presumably a file path — TODO confirm against fedivet.
#[clap(long)]
pub tls_cert: Option<String>,
// Optional TLS key argument (`--tls-key`), forwarded to `serve::start`.
// Presumably a file path — TODO confirm against fedivet.
#[clap(long)]
pub tls_key: Option<String>,
}
/// Bookkeeping record stored per activity key in the duplicate-detection table
/// built by `build_state`.
struct LastSeenEntry {
/// Time stamp of the entry; the periodic cleanup task drops entries whose
/// `ts` is five minutes old or more.
ts: Instant,
/// Occurrence counter; the request filter intercepts an activity once the
/// stored value is greater than 5.
count: u64,
}
/// Error used by the duplicate filter when it intercepts a request
/// (`Disposition::Intercept(ERR_DEDUP)` in `build_state`).
///
/// NOTE(review): it is built with `StatusCode::OK` rather than an error
/// status — presumably so that the sending server treats the delivery as
/// accepted and stops retrying; confirm this is intentional.
const ERR_DEDUP: MisskeyError = MisskeyError::new_const(
StatusCode::OK,
"TOO_MANY_DUPLICATES",
"02fe81b0-c1ef-4d4e-bd5c-288ae379bff7",
"Too many duplicates, further duplicates will be ignored.",
);
/// Computes the 64-bit key under which an activity is tracked in the
/// duplicate table, by hashing its type followed by its id.
///
/// Both the request filter and the after-response hook go through this
/// helper so the two sides can never disagree on how a key is derived.
fn dedup_key<T: Hash + ?Sized, I: Hash + ?Sized>(ty: &T, id: &I) -> u64 {
    let mut hasher = DefaultHasher::new();
    ty.hash(&mut hasher);
    id.hash(&mut hasher);
    hasher.finish()
}

/// Builds the application state / evaluator chain served by `main`.
///
/// The chain is `base.extract_meta()` wrapped in a duplicate filter and an
/// audit layer writing under `/store/log/audit/incoming`.
///
/// Duplicate filtering works on a shared table keyed by [`dedup_key`]:
///
/// * before a request is evaluated, the filter looks up how often the same
///   activity (type + id) has been recorded and intercepts it with
///   `ERR_DEDUP` once that count is greater than `MAX_REPEATS`;
/// * after a response, the hook records the activity: a new key is inserted
///   with a count of 1, an existing key has its count incremented and its
///   time stamp refreshed;
/// * a background task sweeps the table once per hour and drops entries that
///   were last recorded `ENTRY_TTL_SECS` or more seconds ago;
/// * if the estimated table size exceeds `TABLE_BYTE_BUDGET`, each entry is
///   dropped with probability 1/2 to shed load.
///
/// Requests whose activity failed to parse (`Err`) are passed through
/// untouched and are never recorded.
///
/// `_args` is currently unused.
///
/// # Panics
///
/// The closures and the sweep task panic if the table lock is poisoned.
/// `tokio::spawn` panics when called outside a Tokio runtime.
// The body contains no `.await`; the function stays `async` because `main`
// awaits it.
#[allow(clippy::unused_async)]
async fn build_state(
    base: Arc<BaseAppState<MisskeyError>>,
    _args: &Args,
) -> impl HasAppState<MisskeyError> + Evaluator<MisskeyError> {
    /// An activity is intercepted once its recorded count exceeds this value.
    const MAX_REPEATS: u64 = 5;
    /// Entries at least this old (in seconds) are removed by the sweep task.
    const ENTRY_TTL_SECS: u64 = 60 * 5;
    /// Period of the sweep task, in seconds.
    const SWEEP_INTERVAL_SECS: u64 = 60 * 60;
    /// Upper bound on the estimated in-memory size of the table, in bytes.
    const TABLE_BYTE_BUDGET: usize = 64 << 20;
    const LOCK_POISONED: &str = "duplicate table lock poisoned";

    let filter_table = Arc::new(RwLock::new(HashMap::<u64, LastSeenEntry>::new()));
    let sweep_table = Arc::clone(&filter_table);
    let record_table = Arc::clone(&filter_table);

    tokio::spawn(async move {
        let mut ticker =
            tokio::time::interval(tokio::time::Duration::from_secs(SWEEP_INTERVAL_SECS));
        loop {
            ticker.tick().await;
            // The write guard is a temporary of this statement, so it is
            // released before the next `.await`.
            sweep_table
                .write()
                .expect(LOCK_POISONED)
                .retain(|_, entry| entry.ts.elapsed().as_secs() < ENTRY_TTL_SECS);
        }
    });

    base.extract_meta()
        .filter(
            move |event| match event.activity.as_ref() {
                Err(_) => Disposition::Next,
                Ok(activity) => {
                    // Hash before taking the lock to keep the critical section short.
                    let key = dedup_key(&activity.meta_obj.ty_, &activity.meta_obj.id);
                    let seen = filter_table
                        .read()
                        .expect(LOCK_POISONED)
                        .get(&key)
                        .map_or(0, |entry| entry.count);
                    if seen > MAX_REPEATS {
                        Disposition::Intercept(ERR_DEDUP)
                    } else {
                        Disposition::Next
                    }
                }
            },
            move |_resp, info| match info.activity.as_ref() {
                Err(_) => {}
                Ok(activity) => {
                    let key = dedup_key(&activity.meta_obj.ty_, &activity.meta_obj.id);
                    let now = Instant::now();
                    let mut table = record_table.write().expect(LOCK_POISONED);
                    // Count every occurrence. Inserting only (without
                    // `and_modify`) would leave the counter stuck at 1, so the
                    // filter above could never trigger.
                    table
                        .entry(key)
                        .and_modify(|entry| {
                            entry.ts = now;
                            entry.count = entry.count.saturating_add(1);
                        })
                        .or_insert(LastSeenEntry { ts: now, count: 1 });
                    if table.len() * std::mem::size_of::<(u64, LastSeenEntry)>()
                        > TABLE_BYTE_BUDGET
                    {
                        // Over budget: randomly evict about half of the entries.
                        table.retain(|_, _| random::<bool>());
                    }
                }
            },
        )
        .audited(AuditOptions::new(PathBuf::from(
            "/store/log/audit/incoming",
        )))
}
/// Program entry point: initialises logging, parses the command line, builds
/// the evaluator state and runs the server.
///
/// # Panics
///
/// Panics with "Invalid backend URL" if `--backend` cannot be parsed.
#[tokio::main]
async fn main() {
    // Default the log filter to `info` when `RUST_LOG` is not set. This is
    // done through env_logger's own default instead of `std::env::set_var`:
    // the Tokio runtime's worker threads already exist at this point, and
    // mutating the process environment while other threads may read it is
    // not safe.
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let args = Args::parse();

    let base = Arc::new(
        BaseAppState::new(args.backend.parse().expect("Invalid backend URL")).with_empty_ctx(),
    );
    let state = build_state(base, &args).await;

    // Clone first, then hand the original to `serve::start`, matching the
    // argument evaluation order `serve::run(state.clone(), serve::start(state, ..))`.
    let run_state = state.clone();
    let started = serve::start(
        state,
        &args.listen,
        Some("/store/log/panic.log"),
        args.tls_cert.as_deref(),
        args.tls_key.as_deref(),
    )
    .await;

    serve::run(run_state, started).await;
}