Implement relay annnounce resolving

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
ゆめ 2024-10-17 10:53:26 -05:00
parent 56c7391cd5
commit 5bd9dd0024
No known key found for this signature in database
5 changed files with 119 additions and 14 deletions

2
Cargo.lock generated
View file

@ -531,6 +531,7 @@ dependencies = [
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"url",
] ]
[[package]] [[package]]
@ -1851,6 +1852,7 @@ dependencies = [
"form_urlencoded", "form_urlencoded",
"idna", "idna",
"percent-encoding", "percent-encoding",
"serde",
] ]
[[package]] [[package]]

View file

@ -23,3 +23,4 @@ serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128" serde_json = "1.0.128"
thiserror = "1.0.64" thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "net", "sync", "fs", "signal", "time"] } tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "net", "sync", "fs", "signal", "time"] }
url = { version = "2.5.2", features = ["serde"] }

View file

@ -34,12 +34,9 @@ pub fn extract_host(act: &APRequestInfo) -> Option<String> {
} }
pub fn extract_attributed_to(act: &APRequestInfo) -> Option<String> { pub fn extract_attributed_to(act: &APRequestInfo) -> Option<String> {
match &act.activity.as_ref().ok()?.object { match &act.resolved_obj {
Some(arr) => match arr.clone().into_iter().next() { Some(cow) => match cow.as_ref() {
Some(AnyObject::NoteObject(NoteObject { AnyObject::NoteObject(NoteObject { attributed_to, .. }) => attributed_to.clone(),
attributed_to: Some(s),
..
})) => Some(s),
_ => None, _ => None,
}, },
_ => None, _ => None,

View file

@ -5,6 +5,7 @@
#![warn(unsafe_code)] #![warn(unsafe_code)]
#![warn(missing_docs)] #![warn(missing_docs)]
use std::borrow::Cow;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt::Display, net::SocketAddr}; use std::{fmt::Display, net::SocketAddr};
@ -23,7 +24,7 @@ use evaluate::{
ERR_SERVICE_TEMPORARILY_UNAVAILABLE, ERR_SERVICE_TEMPORARILY_UNAVAILABLE,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
use model::ap::AnyObject; use model::ap::{Activity, AnyObject, MaybeRelayed, ResolveInto};
use model::{ap, error::MisskeyError}; use model::{ap, error::MisskeyError};
use network::stream::LimitedStream; use network::stream::LimitedStream;
use network::{new_backend_client, Either}; use network::{new_backend_client, Either};
@ -49,7 +50,9 @@ pub struct APRequestInfo<'r> {
pub uri: &'r Uri, pub uri: &'r Uri,
pub header: &'r HeaderMap, pub header: &'r HeaderMap,
pub connect: &'r SocketAddr, pub connect: &'r SocketAddr,
pub activity: &'r Result<ap::Activity<AnyObject>, (serde_json::Value, serde_json::Error)>, pub activity:
&'r Result<ap::Activity<MaybeRelayed<AnyObject>>, (serde_json::Value, serde_json::Error)>,
pub resolved_obj: Option<Cow<'r, AnyObject>>,
} }
impl Serialize for APRequestInfo<'_> { impl Serialize for APRequestInfo<'_> {
@ -184,7 +187,7 @@ impl<
State(app): State<S>, State(app): State<S>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,
OriginalUri(uri): OriginalUri, OriginalUri(uri): OriginalUri,
mut header: HeaderMap, header: HeaderMap,
body: Body, body: Body,
) -> Result<impl IntoResponse, MisskeyError> { ) -> Result<impl IntoResponse, MisskeyError> {
log::debug!( log::debug!(
@ -251,7 +254,7 @@ impl<
State(app): State<S>, State(app): State<S>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,
OriginalUri(uri): OriginalUri, OriginalUri(uri): OriginalUri,
mut header: HeaderMap, header: HeaderMap,
body: Body, body: Body,
) -> Result<impl IntoResponse, Either<MisskeyError, E>> { ) -> Result<impl IntoResponse, Either<MisskeyError, E>> {
log::debug!( log::debug!(
@ -284,7 +287,7 @@ impl<
})?; })?;
let decode = { let decode = {
let activity_decode = serde_json::from_slice::<ap::Activity<AnyObject>>(&body); let activity_decode = serde_json::from_slice(&body);
match activity_decode { match activity_decode {
Ok(activity) => Ok(activity), Ok(activity) => Ok(activity),
@ -301,6 +304,20 @@ impl<
header: &header, header: &header,
connect: &addr, connect: &addr,
activity: &decode, activity: &decode,
resolved_obj: match decode.as_ref() {
Ok(activity) => match &activity.object {
Some(MaybeRelayed::Relayed(r)) => {
app.client_pool_ref()
.with_safe_client(&addr, move |client| {
Box::pin(async move { r.resolve_ref(&client).await.ok() })
})
.await
}
Some(MaybeRelayed::Object(o)) => Some(Cow::Borrowed(&o)),
None => None,
},
Err(_) => None,
},
}; };
let ctx = app.app_state().ctx_template.clone(); let ctx = app.app_state().ctx_template.clone();

View file

@ -1,8 +1,12 @@
// https://www.w3.org/TR/activitystreams-core/ // https://www.w3.org/TR/activitystreams-core/
use std::collections::HashMap; use std::{borrow::Cow, collections::HashMap};
use serde::{Deserialize, Serialize}; use futures::TryStreamExt;
use reqwest::Url;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::network::stream::LimitedStream;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
@ -13,6 +17,8 @@ pub enum Either<A, B> {
B(B), B(B),
} }
pub trait IsAPObject: Serialize + DeserializeOwned {}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
/// Represents a single value or multiple values. /// Represents a single value or multiple values.
@ -53,12 +59,14 @@ pub struct Object {
pub rest: HashMap<String, serde_json::Value>, pub rest: HashMap<String, serde_json::Value>,
} }
impl IsAPObject for Object {}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
/// Represents an ActivityStreams activity. /// Represents an ActivityStreams activity.
#[allow(missing_docs)] #[allow(missing_docs)]
pub struct Activity<O> { pub struct Activity<O> {
pub actor: Option<String>, pub actor: Option<String>,
pub object: Option<FlatArray<O>>, pub object: Option<O>,
#[serde(flatten)] #[serde(flatten)]
pub meta_obj: Object, pub meta_obj: Object,
} }
@ -81,6 +89,8 @@ pub struct NoteObject {
pub attachment: Option<FlatArray<Attachment>>, pub attachment: Option<FlatArray<Attachment>>,
} }
impl IsAPObject for NoteObject {}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
/// Represents an ActivityStreams attachment. /// Represents an ActivityStreams attachment.
#[allow(missing_docs)] #[allow(missing_docs)]
@ -103,3 +113,81 @@ pub enum AnyObject {
NoteObject(NoteObject), NoteObject(NoteObject),
Object(Object), Object(Object),
} }
impl IsAPObject for AnyObject {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MaybeRelayed<O> {
Relayed(Url),
Object(O),
}
pub trait ResolveInto<E: From<reqwest::Error>, O> {
async fn resolve_ref<'a>(&self, client: &reqwest::Client) -> Result<Cow<'a, O>, E>
where
O: Clone + 'a;
async fn resolve(self, client: &reqwest::Client) -> Result<O, E>;
}
#[derive(Debug, thiserror::Error)]
pub enum ResolveError {
#[error("Payload too large")]
BodySizeExceeded,
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
}
impl<O: Clone + DeserializeOwned> ResolveInto<ResolveError, O> for Url {
async fn resolve_ref<'a>(&self, client: &reqwest::Client) -> Result<Cow<'a, O>, ResolveError>
where
Self: 'a,
O: 'a,
{
// this should be open visibility if it is announced through a relay
let response = client.get(self.clone()).send().await?.error_for_status()?;
let stream = LimitedStream::new(response.bytes_stream(), 1 << 20);
let mut body = Vec::new();
stream
.try_for_each(|chunk| {
body.extend_from_slice(&chunk);
futures::future::ready(Ok(()))
})
.await
.map_err(|e| match e {
None => ResolveError::BodySizeExceeded,
Some(e) => e.into(),
})?;
let decode = serde_json::from_slice(&body)?;
Ok(Cow::Owned(decode))
}
async fn resolve(self, client: &reqwest::Client) -> Result<O, ResolveError> {
let response = client.get(self).send().await?.error_for_status()?;
let stream = LimitedStream::new(response.bytes_stream(), 1 << 20);
let mut body = Vec::new();
stream
.try_for_each(|chunk| {
body.extend_from_slice(&chunk);
futures::future::ready(Ok(()))
})
.await
.map_err(|e| match e {
None => ResolveError::BodySizeExceeded,
Some(e) => e.into(),
})?;
let decode = serde_json::from_slice(&body)?;
Ok(decode)
}
}