init
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
commit
d891dff9e5
8 changed files with 5467 additions and 0 deletions
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
/target
|
||||
/config
|
||||
/dump
|
4075
Cargo.lock
generated
Normal file
4075
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
33
Cargo.toml
Normal file
33
Cargo.toml
Normal file
|
@ -0,0 +1,33 @@
|
|||
[package]
|
||||
name = "matrix-dump"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
aes = "0.8.4"
|
||||
clap = { version = "4.5.16", features = ["derive"] }
|
||||
crossterm = "0.28.1"
|
||||
ctr = "0.9.2"
|
||||
env_logger = "0.11.5"
|
||||
futures = "0.3.30"
|
||||
log = "0.4.22"
|
||||
matrix-sdk = { version = "0.7.1", default-features = false, features = ["sqlite", "e2e-encryption", "automatic-room-key-forwarding"] }
|
||||
matrix-sdk-common = "0.7.0"
|
||||
matrix-sdk-crypto = {version = "0.7.2", features = ["automatic-room-key-forwarding"]}
|
||||
reqwest = { version = "0.12.7", features = ["stream"] }
|
||||
ruma-client = { version = "0.12", features = ["client-api", "reqwest", "reqwest-rustls-native-roots"] }
|
||||
ruma-client-api = { version = "0.18.0", features = ["client"] }
|
||||
ruma-common = "0.13.0"
|
||||
ruma-events = { version = "0.28.1" }
|
||||
serde = { version = "1.0.209", features = ["derive"] }
|
||||
serde_json = "1.0.127"
|
||||
sha2 = "0.10.8"
|
||||
thiserror = "1.0.63"
|
||||
tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "time", "fs", "signal", "sync", "io-std"] }
|
||||
|
||||
[features]
|
||||
default = ["rustls"]
|
||||
bundled-sqlite = ["matrix-sdk/bundled-sqlite"]
|
||||
rustls = ["matrix-sdk/rustls-tls"]
|
||||
native-tls = ["matrix-sdk/native-tls"]
|
222
deny.toml
Normal file
222
deny.toml
Normal file
|
@ -0,0 +1,222 @@
|
|||
# This template contains all of the possible sections and their default values
|
||||
|
||||
# Note that all fields that take a lint level have these possible values:
|
||||
# * deny - An error will be produced and the check will fail
|
||||
# * warn - A warning will be produced, but the check will not fail
|
||||
# * allow - No warning or error will be produced, though in some cases a note
|
||||
# will be
|
||||
|
||||
# The values provided in this template are the default values that will be used
|
||||
# when any section or field is not specified in your own configuration
|
||||
|
||||
# Root options
|
||||
|
||||
# The graph table configures how the dependency graph is constructed and thus
|
||||
# which crates the checks are performed against
|
||||
[graph]
|
||||
# If 1 or more target triples (and optionally, target_features) are specified,
|
||||
# only the specified targets will be checked when running `cargo deny check`.
|
||||
# This means, if a particular package is only ever used as a target specific
|
||||
# dependency, such as, for example, the `nix` crate only being used via the
|
||||
# `target_family = "unix"` configuration, that only having windows targets in
|
||||
# this list would mean the nix crate, as well as any of its exclusive
|
||||
# dependencies not shared by any other crates, would be ignored, as the target
|
||||
# list here is effectively saying which targets you are building for.
|
||||
targets = [
|
||||
# The triple can be any string, but only the target triples built in to
|
||||
# rustc (as of 1.40) can be checked against actual config expressions
|
||||
#"x86_64-unknown-linux-musl",
|
||||
# You can also specify which target_features you promise are enabled for a
|
||||
# particular target. target_features are currently not validated against
|
||||
# the actual valid features supported by the target architecture.
|
||||
#{ triple = "wasm32-unknown-unknown", features = ["atomics"] },
|
||||
]
|
||||
# When creating the dependency graph used as the source of truth when checks are
|
||||
# executed, this field can be used to prune crates from the graph, removing them
|
||||
# from the view of cargo-deny. This is an extremely heavy hammer, as if a crate
|
||||
# is pruned from the graph, all of its dependencies will also be pruned unless
|
||||
# they are connected to another crate in the graph that hasn't been pruned,
|
||||
# so it should be used with care. The identifiers are [Package ID Specifications]
|
||||
# (https://doc.rust-lang.org/cargo/reference/pkgid-spec.html)
|
||||
#exclude = []
|
||||
# If true, metadata will be collected with `--all-features`. Note that this can't
|
||||
# be toggled off if true, if you want to conditionally enable `--all-features` it
|
||||
# is recommended to pass `--all-features` on the cmd line instead
|
||||
all-features = false
|
||||
# If true, metadata will be collected with `--no-default-features`. The same
|
||||
# caveat with `all-features` applies
|
||||
no-default-features = false
|
||||
# If set, these feature will be enabled when collecting metadata. If `--features`
|
||||
# is specified on the cmd line they will take precedence over this option.
|
||||
#features = []
|
||||
|
||||
# The output table provides options for how/if diagnostics are outputted
|
||||
[output]
|
||||
# When outputting inclusion graphs in diagnostics that include features, this
|
||||
# option can be used to specify the depth at which feature edges will be added.
|
||||
# This option is included since the graphs can be quite large and the addition
|
||||
# of features from the crate(s) to all of the graph roots can be far too verbose.
|
||||
# This option can be overridden via `--feature-depth` on the cmd line
|
||||
feature-depth = 1
|
||||
|
||||
# This section is considered when running `cargo deny check advisories`
|
||||
# More documentation for the advisories section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/advisories/cfg.html
|
||||
[advisories]
|
||||
# The path where the advisory databases are cloned/fetched into
|
||||
#db-path = "$CARGO_HOME/advisory-dbs"
|
||||
# The url(s) of the advisory databases to use
|
||||
#db-urls = ["https://github.com/rustsec/advisory-db"]
|
||||
# A list of advisory IDs to ignore. Note that ignored advisories will still
|
||||
# output a note when they are encountered.
|
||||
ignore = [
|
||||
#"RUSTSEC-0000-0000",
|
||||
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
|
||||
#"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish
|
||||
#{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" },
|
||||
]
|
||||
# If this is true, then cargo deny will use the git executable to fetch advisory database.
|
||||
# If this is false, then it uses a built-in git library.
|
||||
# Setting this to true can be helpful if you have special authentication requirements that cargo-deny does not support.
|
||||
# See Git Authentication for more information about setting up git authentication.
|
||||
#git-fetch-with-cli = true
|
||||
|
||||
# This section is considered when running `cargo deny check licenses`
|
||||
# More documentation for the licenses section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
|
||||
[licenses]
|
||||
# List of explicitly allowed licenses
|
||||
# See https://spdx.org/licenses/ for list of possible licenses
|
||||
# [possible values: any SPDX 3.11 short identifier (+ optional exception)].
|
||||
allow = [
|
||||
"MIT",
|
||||
"Apache-2.0",
|
||||
"ISC",
|
||||
"BSD-3-Clause",
|
||||
"Zlib",
|
||||
"MPL-2.0",
|
||||
#"Apache-2.0 WITH LLVM-exception",
|
||||
]
|
||||
# The confidence threshold for detecting a license from license text.
|
||||
# The higher the value, the more closely the license text must be to the
|
||||
# canonical license text of a valid SPDX license file.
|
||||
# [possible values: any between 0.0 and 1.0].
|
||||
confidence-threshold = 0.8
|
||||
# Allow 1 or more licenses on a per-crate basis, so that particular licenses
|
||||
# aren't accepted for every possible crate as with the normal allow list
|
||||
exceptions = [
|
||||
# Each entry is the crate and version constraint, and its specific allow
|
||||
# list
|
||||
#{ allow = ["Zlib"], crate = "adler32" },
|
||||
{ allow = ["Unicode-DFS-2016"], crate = "unicode-ident" }
|
||||
]
|
||||
|
||||
[licenses.private]
|
||||
# If true, ignores workspace crates that aren't published, or are only
|
||||
# published to private registries.
|
||||
# To see how to mark a crate as unpublished (to the official registry),
|
||||
# visit https://doc.rust-lang.org/cargo/reference/manifest.html#the-publish-field.
|
||||
ignore = false
|
||||
# One or more private registries that you might publish crates to, if a crate
|
||||
# is only published to private registries, and ignore is true, the crate will
|
||||
# not have its license(s) checked
|
||||
registries = [
|
||||
#"https://sekretz.com/registry
|
||||
]
|
||||
|
||||
# This section is considered when running `cargo deny check bans`.
|
||||
# More documentation about the 'bans' section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/bans/cfg.html
|
||||
[bans]
|
||||
# Lint level for when multiple versions of the same crate are detected
|
||||
multiple-versions = "warn"
|
||||
# Lint level for when a crate version requirement is `*`
|
||||
wildcards = "allow"
|
||||
# The graph highlighting used when creating dotgraphs for crates
|
||||
# with multiple versions
|
||||
# * lowest-version - The path to the lowest versioned duplicate is highlighted
|
||||
# * simplest-path - The path to the version with the fewest edges is highlighted
|
||||
# * all - Both lowest-version and simplest-path are used
|
||||
highlight = "all"
|
||||
# The default lint level for `default` features for crates that are members of
|
||||
# the workspace that is being checked. This can be overridden by allowing/denying
|
||||
# `default` on a crate-by-crate basis if desired.
|
||||
workspace-default-features = "allow"
|
||||
# The default lint level for `default` features for external crates that are not
|
||||
# members of the workspace. This can be overridden by allowing/denying `default`
|
||||
# on a crate-by-crate basis if desired.
|
||||
external-default-features = "allow"
|
||||
# List of crates that are allowed. Use with care!
|
||||
allow = [
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" },
|
||||
]
|
||||
# List of crates to deny
|
||||
deny = [
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" },
|
||||
# Wrapper crates can optionally be specified to allow the crate when it
|
||||
# is a direct dependency of the otherwise banned crate
|
||||
#{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] },
|
||||
]
|
||||
|
||||
# List of features to allow/deny
|
||||
# Each entry the name of a crate and a version range. If version is
|
||||
# not specified, all versions will be matched.
|
||||
#[[bans.features]]
|
||||
#crate = "reqwest"
|
||||
# Features to not allow
|
||||
#deny = ["json"]
|
||||
# Features to allow
|
||||
#allow = [
|
||||
# "rustls",
|
||||
# "__rustls",
|
||||
# "__tls",
|
||||
# "hyper-rustls",
|
||||
# "rustls",
|
||||
# "rustls-pemfile",
|
||||
# "rustls-tls-webpki-roots",
|
||||
# "tokio-rustls",
|
||||
# "webpki-roots",
|
||||
#]
|
||||
# If true, the allowed features must exactly match the enabled feature set. If
|
||||
# this is set there is no point setting `deny`
|
||||
#exact = true
|
||||
|
||||
# Certain crates/versions that will be skipped when doing duplicate detection.
|
||||
skip = [
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" },
|
||||
]
|
||||
# Similarly to `skip` allows you to skip certain crates during duplicate
|
||||
# detection. Unlike skip, it also includes the entire tree of transitive
|
||||
# dependencies starting at the specified crate, up to a certain depth, which is
|
||||
# by default infinite.
|
||||
skip-tree = [
|
||||
#"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies
|
||||
#{ crate = "ansi_term@0.11.0", depth = 20 },
|
||||
]
|
||||
|
||||
# This section is considered when running `cargo deny check sources`.
|
||||
# More documentation about the 'sources' section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html
|
||||
[sources]
|
||||
# Lint level for what to happen when a crate from a crate registry that is not
|
||||
# in the allow list is encountered
|
||||
unknown-registry = "warn"
|
||||
# Lint level for what to happen when a crate from a git repository that is not
|
||||
# in the allow list is encountered
|
||||
unknown-git = "warn"
|
||||
# List of URLs for allowed crate registries. Defaults to the crates.io index
|
||||
# if not specified. If it is specified but empty, no registries are allowed.
|
||||
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
|
||||
# List of URLs for allowed Git repositories
|
||||
allow-git = []
|
||||
|
||||
[sources.allow-org]
|
||||
# github.com organizations to allow git sources for
|
||||
github = []
|
||||
# gitlab.com organizations to allow git sources for
|
||||
gitlab = []
|
||||
# bitbucket.org organizations to allow git sources for
|
||||
bitbucket = []
|
124
src/e2e/mod.rs
Normal file
124
src/e2e/mod.rs
Normal file
|
@ -0,0 +1,124 @@
|
|||
use aes::cipher::{KeyIvInit, StreamCipher};
|
||||
use futures::{TryStream, TryStreamExt};
|
||||
use matrix_sdk::bytes::Bytes;
|
||||
use ruma_events::room::{EncryptedFile, JsonWebKey};
|
||||
use sha2::Digest;
|
||||
use std::pin::Pin;
|
||||
|
||||
type Aes256CTR = ctr::Ctr128BE<aes::Aes256>;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ErrOrWrongHash<E: std::error::Error> {
|
||||
#[error("Error: {0}")]
|
||||
Err(#[from] E),
|
||||
#[error("Wrong hash")]
|
||||
WrongHash,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DecryptError {
|
||||
#[error("Wrong key spec")]
|
||||
WrongKeySpec,
|
||||
|
||||
#[error("IO error")]
|
||||
Io(#[from] tokio::io::Error),
|
||||
|
||||
#[error("Unsupported Encryption Algorithm: {0}")]
|
||||
UnsupportedEncryptionAlgorithm(String),
|
||||
|
||||
#[error("Missing hash")]
|
||||
MissingHash,
|
||||
|
||||
#[error("Mismatched hash")]
|
||||
MismatchedHash,
|
||||
|
||||
#[error("Version mismatch")]
|
||||
VersionMismatch,
|
||||
}
|
||||
|
||||
pub async fn try_decrypt<'s, E>(
|
||||
jwk: &JsonWebKey,
|
||||
data: impl TryStream<Ok = Vec<u8>, Error = E> + Send + 's,
|
||||
iv: &[u8],
|
||||
) -> Result<impl TryStream<Ok = Vec<u8>, Error = E> + 's, DecryptError> {
|
||||
if jwk.alg != "A256CTR" {
|
||||
return Err(DecryptError::UnsupportedEncryptionAlgorithm(
|
||||
jwk.alg.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
let key = jwk.k.as_bytes();
|
||||
|
||||
let mut cipher = Aes256CTR::new_from_slices(key, iv).map_err(|_| DecryptError::WrongKeySpec)?;
|
||||
|
||||
Ok(data.map_ok(move |mut chunk| {
|
||||
cipher.apply_keystream(&mut chunk);
|
||||
chunk
|
||||
}))
|
||||
}
|
||||
|
||||
pub struct VerifyingStream<'a, S> {
|
||||
inner: Pin<Box<S>>,
|
||||
hasher: Option<sha2::Sha256>,
|
||||
expected: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a, S> VerifyingStream<'a, S> {
|
||||
pub fn new(inner: Pin<Box<S>>, expected: &'a [u8]) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
hasher: Some(sha2::Sha256::new()),
|
||||
expected,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S, E: std::error::Error> futures::Stream for VerifyingStream<'a, S>
|
||||
where
|
||||
S: futures::Stream<Item = Result<Vec<u8>, E>>,
|
||||
{
|
||||
type Item = Result<Vec<u8>, ErrOrWrongHash<E>>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
match futures::ready!(std::pin::Pin::new(&mut self.inner).poll_next(cx)) {
|
||||
Some(Ok(chunk)) => {
|
||||
self.hasher.as_mut().unwrap().update(&chunk);
|
||||
std::task::Poll::Ready(Some(Ok(chunk)))
|
||||
}
|
||||
Some(Err(e)) => std::task::Poll::Ready(Some(Err(ErrOrWrongHash::Err(e)))),
|
||||
None => match self.hasher.take() {
|
||||
None => return std::task::Poll::Ready(None),
|
||||
Some(hash) => {
|
||||
if hash.finalize().as_slice() == self.expected {
|
||||
return std::task::Poll::Ready(None);
|
||||
}
|
||||
std::task::Poll::Ready(Some(Err(ErrOrWrongHash::WrongHash)))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn decrypt_file<'s, E: std::error::Error + 's>(
|
||||
file: &'s EncryptedFile,
|
||||
data: impl TryStream<Ok = Bytes, Error = E> + Send + 's,
|
||||
) -> Result<impl TryStream<Ok = Vec<u8>, Error = ErrOrWrongHash<E>> + 's, DecryptError> {
|
||||
if file.v.to_ascii_lowercase() != "v2" {
|
||||
return Err(DecryptError::VersionMismatch);
|
||||
}
|
||||
|
||||
let iv = file.iv.as_bytes();
|
||||
|
||||
let sha256 = file.hashes.get("sha256").ok_or(DecryptError::MissingHash)?;
|
||||
let sha256_expect = sha256.as_bytes();
|
||||
|
||||
let data = Box::pin(VerifyingStream::new(
|
||||
Box::pin(data.map_ok(|b| b.to_vec())),
|
||||
sha256_expect,
|
||||
));
|
||||
|
||||
try_decrypt(&file.key, data, &iv).await
|
||||
}
|
449
src/lib.rs
Normal file
449
src/lib.rs
Normal file
|
@ -0,0 +1,449 @@
|
|||
use std::{pin::Pin, sync::Arc};
|
||||
|
||||
use e2e::ErrOrWrongHash;
|
||||
use futures::{future::BoxFuture, stream, StreamExt, TryStream, TryStreamExt};
|
||||
use matrix_sdk::{
|
||||
bytes::Bytes,
|
||||
deserialized_responses::TimelineEvent,
|
||||
encryption::verification::{Verification, VerificationRequestState},
|
||||
room::MessagesOptions,
|
||||
ruma::events::key::verification::VerificationMethod,
|
||||
Client, HttpError, IdParseError, Room,
|
||||
};
|
||||
use matrix_sdk_crypto::SasState;
|
||||
use ruma_client_api::{
|
||||
filter::{EventFormat, FilterDefinition, RoomEventFilter, RoomFilter},
|
||||
sync::sync_events::v3::Filter,
|
||||
};
|
||||
use ruma_common::MxcUriError;
|
||||
use ruma_events::room::{
|
||||
message::{MessageType, RoomMessageEvent},
|
||||
MediaSource,
|
||||
};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
pub mod e2e;
|
||||
pub mod serdes;
|
||||
|
||||
fn mxc_url_to_https(mxc_url: &str, homeserver: &str) -> String {
|
||||
format!(
|
||||
"{}_matrix/media/r0/download/{}",
|
||||
homeserver,
|
||||
mxc_url.trim_start_matches("mxc://")
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DumpError {
|
||||
#[error("Serialization error: {0}")]
|
||||
Serialization(#[from] serde_json::Error),
|
||||
|
||||
#[error("Matrix SDK error: {0}")]
|
||||
Matrix(#[from] matrix_sdk::Error),
|
||||
|
||||
#[error("IO error: {0}")]
|
||||
Tokio(#[from] tokio::io::Error),
|
||||
|
||||
#[error("MxcUri error: {0}")]
|
||||
MxcUri(#[from] MxcUriError),
|
||||
|
||||
#[error("HTTP error: {0}")]
|
||||
HttpError(#[from] HttpError),
|
||||
|
||||
#[error("Unable to decrypt media: {0}")]
|
||||
Decrypt(#[from] e2e::DecryptError),
|
||||
|
||||
#[error("Failed to verify media hash")]
|
||||
HashMismatch,
|
||||
|
||||
#[error("Reqwest error: {0}")]
|
||||
Reqwest(#[from] matrix_sdk::reqwest::Error),
|
||||
|
||||
#[error("Invalid ID: {0}")]
|
||||
InvalidId(#[from] IdParseError),
|
||||
}
|
||||
|
||||
pub struct MatrixClient {
|
||||
client: Client,
|
||||
}
|
||||
|
||||
pub fn minimal_sync_filter() -> Filter {
|
||||
let mut filter = FilterDefinition::empty();
|
||||
|
||||
filter.event_format = EventFormat::Client;
|
||||
filter.presence = ruma_client_api::filter::Filter::empty();
|
||||
|
||||
let mut room_filter = RoomFilter::empty();
|
||||
let mut room_event_filter = RoomEventFilter::empty();
|
||||
room_event_filter.types = Some(
|
||||
vec![
|
||||
"m.room.encryption",
|
||||
"m.room.encryption*",
|
||||
"m.room.create",
|
||||
"m.room.avatar",
|
||||
]
|
||||
.into_iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
);
|
||||
room_filter.timeline = room_event_filter;
|
||||
|
||||
filter.room = room_filter;
|
||||
|
||||
Filter::FilterDefinition(filter)
|
||||
}
|
||||
|
||||
impl MatrixClient {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub fn new_arc(client: Client) -> Arc<Self> {
|
||||
Arc::new(Self::new(client))
|
||||
}
|
||||
|
||||
pub fn client(&self) -> &Client {
|
||||
&self.client
|
||||
}
|
||||
|
||||
pub fn try_read_attachment<'c, 'a: 'c>(
|
||||
self: Arc<Self>,
|
||||
client: &'c matrix_sdk::reqwest::Client,
|
||||
msg: &'a RoomMessageEvent,
|
||||
) -> Result<
|
||||
Option<
|
||||
BoxFuture<
|
||||
'c,
|
||||
Result<
|
||||
(
|
||||
String,
|
||||
String,
|
||||
Pin<
|
||||
Box<
|
||||
dyn TryStream<
|
||||
Ok = Bytes,
|
||||
Error = DumpError,
|
||||
Item = Result<Bytes, DumpError>,
|
||||
> + Send
|
||||
+ 'c,
|
||||
>,
|
||||
>,
|
||||
),
|
||||
DumpError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
DumpError,
|
||||
> {
|
||||
macro_rules! impl_file_like {
|
||||
($msg:expr, $($variant:ident),*) => {
|
||||
match $msg {
|
||||
$(
|
||||
MessageType::$variant(file) => {
|
||||
Ok(Some(Box::pin(
|
||||
async move {
|
||||
let src = match &file.source {
|
||||
MediaSource::Plain(s) => s,
|
||||
MediaSource::Encrypted(e) => &e.url,
|
||||
};
|
||||
let filename = file.filename.as_deref().map(|s| s.to_string()).unwrap_or(file.body.clone());
|
||||
let url = mxc_url_to_https(src.as_str(), self.client.homeserver().as_str());
|
||||
let resp = client.get(&url).send().await?;
|
||||
let body = resp.bytes_stream();
|
||||
Ok((filename, url, match &file.source {
|
||||
MediaSource::Plain(_) => {
|
||||
Box::pin(body.map_err(DumpError::from)) as Pin<Box<dyn TryStream<Ok = Bytes, Error = DumpError, Item = Result<Bytes, DumpError>> + Send>>
|
||||
}
|
||||
MediaSource::Encrypted(e) => Box::pin(e2e::decrypt_file(e.as_ref(), body).await?.map_ok(|v| Bytes::from(v)).map_err(
|
||||
|e| match e {
|
||||
ErrOrWrongHash::Err(e) => e.into(),
|
||||
ErrOrWrongHash::WrongHash => DumpError::HashMismatch,
|
||||
},
|
||||
)) as Pin<Box<dyn TryStream<Ok = Bytes, Error = DumpError, Item = Result<Bytes, DumpError>> + Send>>,
|
||||
}))
|
||||
})))}
|
||||
)*
|
||||
_ => Ok(None),
|
||||
}
|
||||
};
|
||||
}
|
||||
match msg {
|
||||
RoomMessageEvent::Original(msg) => {
|
||||
impl_file_like!(&msg.content.msgtype, Image, Video, Audio, File)
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_e2e(self: Arc<Self>) -> bool {
|
||||
let client = &self.client;
|
||||
|
||||
log::info!("Preparing e2e machine");
|
||||
client
|
||||
.encryption()
|
||||
.wait_for_e2ee_initialization_tasks()
|
||||
.await;
|
||||
log::info!("E2E machine ready");
|
||||
|
||||
let own_device = client
|
||||
.encryption()
|
||||
.get_own_device()
|
||||
.await
|
||||
.expect("Failed to get own device")
|
||||
.expect("No own device found");
|
||||
|
||||
if own_device.is_cross_signed_by_owner() {
|
||||
log::info!("Cross-signing keys are already set up");
|
||||
return true;
|
||||
}
|
||||
|
||||
let mut stdin = tokio::io::stdin();
|
||||
|
||||
let devices = client
|
||||
.encryption()
|
||||
.get_user_devices(own_device.user_id())
|
||||
.await
|
||||
.expect("Failed to get devices")
|
||||
.devices()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (i, d) in devices.iter().enumerate() {
|
||||
log::info!(
|
||||
"Device {}: {} ({})",
|
||||
i,
|
||||
d.display_name().unwrap_or_else(|| "Unnamed"),
|
||||
d.device_id()
|
||||
);
|
||||
}
|
||||
|
||||
println!("Enter the device number to verify with: ");
|
||||
let mut response = String::new();
|
||||
while let Some(c) = stdin.read_u8().await.ok() {
|
||||
if c == b'\n' {
|
||||
break;
|
||||
}
|
||||
response.push(c as char);
|
||||
}
|
||||
|
||||
let device_num = response
|
||||
.trim()
|
||||
.parse::<usize>()
|
||||
.expect("Failed to parse device number");
|
||||
|
||||
let device = devices.get(device_num).expect("Invalid device number");
|
||||
|
||||
log::info!(
|
||||
"Requesting verification with {} ({})",
|
||||
device.display_name().unwrap_or_else(|| "Unnamed"),
|
||||
device.device_id()
|
||||
);
|
||||
|
||||
let req = match device
|
||||
.request_verification_with_methods(vec![VerificationMethod::SasV1])
|
||||
.await
|
||||
{
|
||||
Ok(req) => req,
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Failed to request verification for {}: {}",
|
||||
device.device_id(),
|
||||
e
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let device_name = format!(
|
||||
"{} ({})",
|
||||
device.display_name().unwrap_or_else(|| "Unnamed"),
|
||||
device.device_id()
|
||||
);
|
||||
|
||||
let device_name_clone = device_name.clone();
|
||||
|
||||
let mut c = req.changes();
|
||||
|
||||
while let Some(change) = c.next().await {
|
||||
match change {
|
||||
VerificationRequestState::Done => {
|
||||
log::info!("Verification successful for {}", device_name_clone);
|
||||
return true;
|
||||
}
|
||||
VerificationRequestState::Cancelled(info) => {
|
||||
log::info!(
|
||||
"Verification canceled for {}: {:?}",
|
||||
device.device_id(),
|
||||
info
|
||||
);
|
||||
return false;
|
||||
}
|
||||
VerificationRequestState::Transitioned { verification } => {
|
||||
log::info!(
|
||||
"Verification transitioned for {}: {:?}",
|
||||
device.device_id(),
|
||||
verification
|
||||
);
|
||||
match verification {
|
||||
Verification::SasV1(v) => {
|
||||
v.accept().await.expect("Failed to accept verification");
|
||||
let emoji_str = v
|
||||
.emoji()
|
||||
.map(|emojis| {
|
||||
emojis
|
||||
.iter()
|
||||
.map(|e| format!("{} ({})", e.symbol, e.description))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
})
|
||||
.unwrap_or_else(|| "No emojis".to_string());
|
||||
let decimals = v
|
||||
.decimals()
|
||||
.map(|(n1, n2, n3)| format!("{} {} {}", n1, n2, n3))
|
||||
.unwrap_or_else(|| "No decimals".to_string());
|
||||
|
||||
println!(
|
||||
"Verification for {}:\nEmoji: {}\nDecimals: {}\n Confirm? (y/n)",
|
||||
device.device_id(),
|
||||
emoji_str,
|
||||
decimals
|
||||
);
|
||||
|
||||
let mut response = String::new();
|
||||
while let Some(c) = stdin.read_u8().await.ok() {
|
||||
if c == b'\n' {
|
||||
break;
|
||||
}
|
||||
response.push(c as char);
|
||||
}
|
||||
|
||||
if response.trim() == "y" {
|
||||
v.confirm().await.expect("Failed to confirm");
|
||||
} else {
|
||||
v.cancel().await.expect("Failed to cancel");
|
||||
}
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
VerificationRequestState::Ready {
|
||||
their_methods,
|
||||
our_methods,
|
||||
..
|
||||
} => {
|
||||
log::info!(
|
||||
"Verification ready for {}: their methods: {:?}, our methods: {:?}",
|
||||
device.device_id(),
|
||||
their_methods,
|
||||
our_methods
|
||||
);
|
||||
|
||||
req.accept_with_methods(vec![VerificationMethod::SasV1])
|
||||
.await
|
||||
.expect("Failed to accept verification");
|
||||
|
||||
let sas = req
|
||||
.start_sas()
|
||||
.await
|
||||
.expect("Failed to start SAS")
|
||||
.expect("No SAS");
|
||||
|
||||
sas.accept().await.expect("Failed to accept SAS");
|
||||
|
||||
while let Some(event) = sas.changes().next().await {
|
||||
log::info!("SAS event: {:?}", event);
|
||||
match event {
|
||||
SasState::Started { protocols } => {
|
||||
log::info!("SAS started with protocols: {:?}", protocols);
|
||||
sas.accept().await.expect("Failed to accept SAS");
|
||||
}
|
||||
SasState::Cancelled(c) => {
|
||||
log::info!("SAS canceled: {:?}", c);
|
||||
return false;
|
||||
}
|
||||
SasState::Done {
|
||||
verified_devices,
|
||||
verified_identities,
|
||||
} => {
|
||||
log::info!(
|
||||
"SAS done: verified devices: {:?}, verified identities: {:?}",
|
||||
verified_devices,
|
||||
verified_identities
|
||||
);
|
||||
}
|
||||
SasState::KeysExchanged { emojis, decimals } => {
|
||||
println!(
|
||||
"Verification for {}:\nEmoji: {}\nDecimals: {}\n Confirm? (y/n)",
|
||||
device.device_id(),
|
||||
emojis.map(|e| e.emojis
|
||||
.iter()
|
||||
.map(|e| format!("{} ({})", e.symbol, e.description))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
).unwrap_or_else(|| "No emojis".to_string()),
|
||||
format!("{} {} {}", decimals.0, decimals.1, decimals.2)
|
||||
);
|
||||
|
||||
let mut response = String::new();
|
||||
|
||||
while let Some(c) = stdin.read_u8().await.ok() {
|
||||
if c == b'\n' {
|
||||
break;
|
||||
}
|
||||
response.push(c as char);
|
||||
}
|
||||
|
||||
if response.trim() == "y" {
|
||||
sas.confirm().await.expect("Failed to confirm SAS");
|
||||
} else {
|
||||
sas.cancel().await.expect("Failed to cancel SAS");
|
||||
}
|
||||
}
|
||||
SasState::Accepted { accepted_protocols } => {
|
||||
log::info!("SAS accepted with protocols: {:?}", accepted_protocols);
|
||||
}
|
||||
SasState::Confirmed => {
|
||||
log::info!("SAS confirmed! Waiting for verification to finish");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
VerificationRequestState::Requested { their_methods, .. } => {
|
||||
log::info!(
|
||||
"Verification requested for {}: their methods: {:?}",
|
||||
device.device_id(),
|
||||
their_methods
|
||||
);
|
||||
}
|
||||
VerificationRequestState::Created { our_methods } => {
|
||||
log::info!(
|
||||
"Verification created for {}: our methods: {:?}",
|
||||
device.device_id(),
|
||||
our_methods
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Verification for {} fell through", device.device_id());
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
pub fn room_messages(
|
||||
room: &Room,
|
||||
since: Option<String>,
|
||||
) -> impl TryStream<Ok = Vec<TimelineEvent>, Error = matrix_sdk::Error> + '_ {
|
||||
stream::try_unfold(since, move |since| async move {
|
||||
let mut opt = MessagesOptions::forward().from(since.as_deref());
|
||||
opt.limit = 32.try_into().expect("Failed to convert");
|
||||
|
||||
room.messages(opt).await.map(|r| {
|
||||
if r.chunk.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some((r.chunk, r.end))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
468
src/main.rs
Normal file
468
src/main.rs
Normal file
|
@ -0,0 +1,468 @@
|
|||
use std::{
|
||||
fs::OpenOptions,
|
||||
os::unix::fs::OpenOptionsExt,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use clap::Parser;
|
||||
use futures::TryStreamExt;
|
||||
use matrix_dump::{DumpError, MatrixClient};
|
||||
use matrix_sdk::{
|
||||
config::SyncSettings, deserialized_responses::EncryptionInfo, Client, RoomState, ServerName,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use ruma_client::http_client::Reqwest;
|
||||
use ruma_common::serde::Raw;
|
||||
use ruma_events::{AnyMessageLikeEvent, AnyTimelineEvent};
|
||||
use tokio::{io::AsyncWriteExt, sync::Semaphore, task::JoinSet};
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct DumpEvent {
|
||||
event: Raw<AnyTimelineEvent>,
|
||||
file_mapping: Option<(String, String)>,
|
||||
encryption_info: Option<EncryptionInfo>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize)]
|
||||
pub struct RoomMeta<'a> {
|
||||
pub id: &'a str,
|
||||
pub name: Option<&'a str>,
|
||||
pub state: &'a RoomState,
|
||||
}
|
||||
|
||||
fn sanitize_filename(name: &str) -> String {
|
||||
name.chars()
|
||||
.map(|c| match c {
|
||||
'/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' | '!' => '_',
|
||||
_ => c,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Parser)]
|
||||
pub struct Args {
|
||||
#[clap(long = "home", default_value = "matrix.org")]
|
||||
pub home_server: String,
|
||||
|
||||
#[clap(short, long)]
|
||||
pub username: Option<String>,
|
||||
|
||||
#[clap(long, default_value = "Matrix.org Protocol Dumper by Yumechi")]
|
||||
pub device_name: Option<String>,
|
||||
|
||||
#[clap(long)]
|
||||
pub device_id: Option<String>,
|
||||
|
||||
#[clap(long, default_value = "config/token.json")]
|
||||
pub access_token_file: Option<String>,
|
||||
|
||||
#[clap(short, long, default_value = "dump")]
|
||||
pub out_dir: String,
|
||||
|
||||
#[clap(long)]
|
||||
pub filter: Vec<String>,
|
||||
|
||||
#[clap(long, short = 'j', default_value = "4")]
|
||||
pub concurrency: usize,
|
||||
|
||||
#[clap(long, default_value = "config/e2e.db")]
|
||||
pub e2e_db: String,
|
||||
|
||||
#[clap(long)]
|
||||
pub password_file: Option<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "The timeout for the key sync in seconds",
|
||||
default_value = "300"
|
||||
)]
|
||||
pub key_sync_timeout: u64,
|
||||
}
|
||||
|
||||
fn read_password() -> Result<String, std::io::Error> {
|
||||
use crossterm::{execute, style::Print, terminal};
|
||||
|
||||
terminal::enable_raw_mode()?;
|
||||
|
||||
let mut password = String::new();
|
||||
|
||||
execute!(std::io::stdout(), Print("Password:"))?;
|
||||
|
||||
loop {
|
||||
match crossterm::event::read()? {
|
||||
crossterm::event::Event::Key(event) => match event.code {
|
||||
crossterm::event::KeyCode::Enter => break,
|
||||
crossterm::event::KeyCode::Backspace => {
|
||||
password.pop();
|
||||
}
|
||||
crossterm::event::KeyCode::Char(c) => {
|
||||
password.push(c);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
execute!(std::io::stdout(), Print("\n"))?;
|
||||
|
||||
terminal::disable_raw_mode()?;
|
||||
|
||||
println!();
|
||||
|
||||
Ok(password)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
if std::env::var("RUST_LOG").is_err() {
|
||||
std::env::set_var("RUST_LOG", "info");
|
||||
}
|
||||
|
||||
env_logger::init();
|
||||
let mut js = JoinSet::new();
|
||||
let mut bg_js = JoinSet::new();
|
||||
tokio::select! {
|
||||
_ = run(&mut js, &mut bg_js) => {},
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received Ctrl-C, exiting");
|
||||
js.abort_all();
|
||||
},
|
||||
}
|
||||
log::info!("Waiting for tasks to finish");
|
||||
bg_js.abort_all();
|
||||
while let Some(_) = bg_js.join_next().await {}
|
||||
js.join_all().await;
|
||||
}
|
||||
|
||||
async fn run(js: &mut JoinSet<Result<(), DumpError>>, bg_js: &mut JoinSet<Result<(), DumpError>>) {
|
||||
let args = Args::parse();
|
||||
log::info!("Starting matrix dump, args: {:?}", args);
|
||||
let http_client = Reqwest::builder().https_only(true).build().unwrap();
|
||||
let client = Client::builder()
|
||||
.server_name(<&ServerName>::try_from(args.home_server.as_str()).unwrap())
|
||||
.http_client(http_client.clone())
|
||||
.sqlite_store(&args.e2e_db, None)
|
||||
.build()
|
||||
.await
|
||||
.expect("Failed to create client");
|
||||
|
||||
let client = MatrixClient::new_arc(client);
|
||||
let client1 = client.clone();
|
||||
let access_token_file_clone = args.access_token_file.clone();
|
||||
(move || async move {
|
||||
if let Some(ref access_token_file) = access_token_file_clone {
|
||||
if std::fs::exists(access_token_file).expect("Failed to check access token file") {
|
||||
let token = serde_json::from_str(
|
||||
&tokio::fs::read_to_string(access_token_file)
|
||||
.await
|
||||
.expect("Failed to read access token file"),
|
||||
)
|
||||
.expect("Failed to parse access token file");
|
||||
|
||||
client1
|
||||
.client()
|
||||
.matrix_auth()
|
||||
.restore_session(token)
|
||||
.await
|
||||
.expect("Failed to restore session");
|
||||
|
||||
log::info!("Restored session");
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(ref password_file) = args.password_file {
|
||||
if std::fs::exists(password_file).expect("Failed to check password file") {
|
||||
log::info!("Logging in with password file");
|
||||
let password = tokio::fs::read_to_string(password_file)
|
||||
.await
|
||||
.expect("Failed to read password file");
|
||||
client1
|
||||
.client()
|
||||
.matrix_auth()
|
||||
.login_username(
|
||||
args.username.clone().expect("Username not provided"),
|
||||
&password,
|
||||
)
|
||||
.initial_device_display_name("Matrix Protocol Dumper By Yumechi")
|
||||
.await
|
||||
.expect("Failed to login");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref username) = args.username {
|
||||
log::info!("Logging in with password prompt");
|
||||
let password = read_password().expect("Failed to read password");
|
||||
client1
|
||||
.client()
|
||||
.matrix_auth()
|
||||
.login_username(username.clone(), &password)
|
||||
.initial_device_display_name("Matrix Protocol Dumper By Yumechi")
|
||||
.await
|
||||
.expect("Failed to login");
|
||||
return;
|
||||
}
|
||||
|
||||
panic!("No login method provided");
|
||||
}
|
||||
})()
|
||||
.await;
|
||||
|
||||
if !client.clone().client().logged_in() {
|
||||
log::error!("Failed to login");
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(s) = client.client().matrix_auth().session() {
|
||||
if let Some(ref access_token_file) = args.access_token_file {
|
||||
let f = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.mode(0o600)
|
||||
.open(access_token_file)
|
||||
.expect("Failed to open access token file");
|
||||
|
||||
serde_json::to_writer(&f, &s).expect("Failed to write access token file");
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Starting sync");
|
||||
client
|
||||
.clone()
|
||||
.client()
|
||||
.sync_once(SyncSettings::default())
|
||||
.await
|
||||
.expect("Failed to sync");
|
||||
log::info!("Sync done");
|
||||
|
||||
let client1 = client.clone();
|
||||
bg_js.spawn(async move {
|
||||
client1.client().sync(SyncSettings::default()).await?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
log::info!("Starting E2E setup");
|
||||
match client.clone().setup_e2e().await {
|
||||
true => log::info!("E2E setup done"),
|
||||
false => log::error!("E2E setup failed"),
|
||||
}
|
||||
|
||||
log::info!("Starting room dump");
|
||||
|
||||
let sem = Arc::new(Semaphore::new(args.concurrency));
|
||||
|
||||
let (synced_keys_tx, _) = tokio::sync::broadcast::channel::<matrix_sdk::ruma::OwnedRoomId>(1);
|
||||
|
||||
let synced_keys_tx = Arc::new(synced_keys_tx);
|
||||
let synced_keys_tx1 = synced_keys_tx.clone();
|
||||
|
||||
client.client().add_event_handler(
|
||||
|ev: matrix_sdk::ruma::events::forwarded_room_key::ToDeviceForwardedRoomKeyEvent| async move {
|
||||
synced_keys_tx.send(ev.content.room_id.clone()).unwrap();
|
||||
},
|
||||
);
|
||||
|
||||
for room in client.client().rooms() {
|
||||
let mut synced_keys_rx = synced_keys_tx1.subscribe();
|
||||
|
||||
let sem = sem.clone();
|
||||
|
||||
let filter = args.filter.clone();
|
||||
let out_dir = args.out_dir.clone();
|
||||
let room_id = room.room_id().to_owned();
|
||||
let room_id_clone = room_id.clone();
|
||||
let client1 = client.clone();
|
||||
let http_client = http_client.clone();
|
||||
js.spawn(async move {
|
||||
if room.is_encrypted().await.unwrap_or(false) && !room.is_encryption_state_synced() {
|
||||
log::info!(
|
||||
"Room {} is encrypted, waiting for at most {} seconds for key sync",
|
||||
room_id_clone,
|
||||
args.key_sync_timeout
|
||||
);
|
||||
|
||||
let room_id_clone1 = room_id_clone.clone();
|
||||
let room_clone = room.clone();
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(args.key_sync_timeout)) => {
|
||||
log::warn!("Key sync timed out for room {}", room_id);
|
||||
}
|
||||
_ = async move {
|
||||
while let Ok(room_id) = synced_keys_rx.recv().await {
|
||||
if room_id == room_id_clone1 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !room_clone.is_encryption_state_synced() {
|
||||
log::warn!("Waiting for another 10 seconds for key sync to finish");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||
if !room_clone.is_encryption_state_synced() {
|
||||
log::warn!("Key sync timed out for room {}", room_id_clone1);
|
||||
}
|
||||
} => {
|
||||
log::info!("Key sync done for room {}", room_id_clone);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
let permit = sem.clone().acquire_owned().await;
|
||||
|
||||
|
||||
let room_name = room.display_name().await.map(|d| d.to_string())
|
||||
.unwrap_or(room.name().unwrap_or("unknown".to_string()));
|
||||
|
||||
let room_dir =
|
||||
Path::new(&out_dir).join(sanitize_filename(&format!("{}_{}", room_id, room_name)));
|
||||
|
||||
let match_filter = if filter.is_empty() {
|
||||
true
|
||||
} else {
|
||||
filter
|
||||
.iter()
|
||||
.any(|filter| room_name.contains(filter) || room_id.as_str().contains(filter))
|
||||
};
|
||||
|
||||
tokio::fs::create_dir_all(&room_dir)
|
||||
.await
|
||||
.expect("Failed to create room directory");
|
||||
|
||||
let meta_path = room_dir.join("meta.json");
|
||||
|
||||
serde_json::to_writer_pretty(
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(&meta_path)
|
||||
.expect("Failed to open meta file"),
|
||||
&RoomMeta {
|
||||
id: room_id.as_str(),
|
||||
name: Some(room_name.as_str()),
|
||||
state: &room.state(),
|
||||
},
|
||||
)?;
|
||||
|
||||
if !match_filter {
|
||||
log::debug!("Skipping room: {} ({})", room_id, room_name);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
log::info!("Dumping room: {} ({})", room_id, room_name);
|
||||
|
||||
let chunk_idx = &AtomicU64::new(0);
|
||||
let client1 = client1.clone();
|
||||
MatrixClient::room_messages(&room, None)
|
||||
.try_for_each_concurrent(Some(args.concurrency), |msg| {
|
||||
let room_dir = room_dir.clone();
|
||||
let client1 = client1.clone();
|
||||
let http_client = http_client.clone();
|
||||
async move {
|
||||
let output = room_dir.join(format!(
|
||||
"chunk-{}.json",
|
||||
chunk_idx.fetch_add(1, Ordering::SeqCst)
|
||||
));
|
||||
|
||||
let mut out = Vec::with_capacity(msg.len());
|
||||
|
||||
for event in msg.into_iter() {
|
||||
let mut fm = None;
|
||||
match event.event.clone().cast::<AnyTimelineEvent>().deserialize() {
|
||||
Ok(event) => match event {
|
||||
AnyTimelineEvent::MessageLike(msg) => match msg {
|
||||
AnyMessageLikeEvent::RoomMessage(m) => {
|
||||
match client1.clone()
|
||||
.try_read_attachment(&http_client, &m) {
|
||||
Ok(None) => {}
|
||||
Ok(Some(fut)) => {
|
||||
match fut.await {
|
||||
Ok((filename, url, mut byte_stream)) => {
|
||||
|
||||
let file_name = format!("attachment_{}_{}_{}",
|
||||
m.event_id().as_str(),
|
||||
sanitize_filename(Url::parse(&url).unwrap().path_segments().unwrap().last().unwrap()),
|
||||
sanitize_filename(&filename),
|
||||
);
|
||||
|
||||
let file = tokio::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(room_dir.join(&file_name))
|
||||
.await?;
|
||||
|
||||
let mut file = tokio::io::BufWriter::new(file);
|
||||
|
||||
loop {
|
||||
match byte_stream.try_next().await {
|
||||
Ok(Some(chunk)) => {
|
||||
file.write_all(&chunk).await?;
|
||||
}
|
||||
Ok(None) => {
|
||||
fm = Some((url, file_name));
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get attachment data: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
file.shutdown().await?;
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get attachment data: {}", e);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get attachment: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to deserialize event: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
out.push(DumpEvent {
|
||||
event: Raw::from_json(event.event.into_json()),
|
||||
file_mapping: fm,
|
||||
encryption_info: event.encryption_info.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
serde_json::to_writer_pretty(
|
||||
std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(output)?,
|
||||
&out,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("Failed to get messages");
|
||||
|
||||
drop(permit);
|
||||
|
||||
Ok::<_, DumpError>(())
|
||||
});
|
||||
}
|
||||
}
|
93
src/serdes.rs
Normal file
93
src/serdes.rs
Normal file
|
@ -0,0 +1,93 @@
|
|||
use matrix_sdk_common::ruma::Int;
|
||||
use ruma_common::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId};
|
||||
use ruma_events::{room::encrypted::RoomEncryptedEvent, MessageLikeEvent};
|
||||
use serde::{ser::SerializeStruct, Serialize};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SerializableMegolmError(pub matrix_sdk_crypto::MegolmError);
|
||||
|
||||
impl Serialize for SerializableMegolmError {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let mut s = serializer.serialize_struct("MegolmError", 1)?;
|
||||
|
||||
s.serialize_field("error", &self.0.to_string())?;
|
||||
|
||||
s.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SerializableRoomEncryptedEvent(pub RoomEncryptedEvent);
|
||||
|
||||
impl Serialize for SerializableRoomEncryptedEvent {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
#[derive(Serialize)]
|
||||
struct SerializableMessageUnsigned {
|
||||
pub age: Option<Int>,
|
||||
pub transaction_id: Option<OwnedTransactionId>,
|
||||
pub relations: (),
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SerializableRedactedUnsigned {
|
||||
pub sender: OwnedUserId,
|
||||
pub event_id: OwnedEventId,
|
||||
pub redacts: Option<OwnedEventId>,
|
||||
pub origin_server_ts: MilliSecondsSinceUnixEpoch,
|
||||
pub reason: Option<String>,
|
||||
}
|
||||
|
||||
match &self.0 {
|
||||
MessageLikeEvent::Original(event) => {
|
||||
let mut s = serializer.serialize_struct("RoomEncryptedEvent", 6)?;
|
||||
|
||||
s.serialize_field("sender", &event.sender)?;
|
||||
s.serialize_field("event_id", &event.event_id)?;
|
||||
s.serialize_field("content", &event.content)?;
|
||||
s.serialize_field("origin_server_ts", &event.origin_server_ts)?;
|
||||
s.serialize_field("room_id", &event.room_id)?;
|
||||
|
||||
s.serialize_field(
|
||||
"unsigned",
|
||||
&SerializableMessageUnsigned {
|
||||
age: event.unsigned.age,
|
||||
transaction_id: event.unsigned.transaction_id.clone(),
|
||||
relations: (),
|
||||
},
|
||||
)?;
|
||||
|
||||
s.end()
|
||||
}
|
||||
MessageLikeEvent::Redacted(r) => {
|
||||
// Redacted events are not serialized.
|
||||
|
||||
let mut s = serializer.serialize_struct("RoomEncryptedEvent", 6)?;
|
||||
|
||||
s.serialize_field("sender", &r.sender)?;
|
||||
s.serialize_field("event_id", &r.event_id)?;
|
||||
s.serialize_field("content", &r.content)?;
|
||||
s.serialize_field("origin_server_ts", &r.origin_server_ts)?;
|
||||
s.serialize_field("room_id", &r.room_id)?;
|
||||
|
||||
s.serialize_field(
|
||||
"unsigned",
|
||||
&SerializableRedactedUnsigned {
|
||||
redacts: r.unsigned.redacted_because.content.redacts.clone(),
|
||||
reason: r.unsigned.redacted_because.content.reason.clone(),
|
||||
sender: r.unsigned.redacted_because.sender.clone(),
|
||||
event_id: r.unsigned.redacted_because.event_id.clone(),
|
||||
origin_server_ts: r.unsigned.redacted_because.origin_server_ts,
|
||||
},
|
||||
)?;
|
||||
|
||||
s.end()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue