Upgraded tokio, warp, hyper and actix (#912)
* Upgraded tokio, warp, hyper and actix * Code formatting * actix-web temporary version fix specification * Error handling fix on juniper_rocket Co-authored-by: Christian Legnitto <LegNeato@users.noreply.github.com>
This commit is contained in:
parent
de4c0e9088
commit
739cc3bfc2
28 changed files with 135 additions and 118 deletions
|
@ -51,7 +51,7 @@ type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send
|
||||||
#[graphql_subscription(context = Database)]
|
#[graphql_subscription(context = Database)]
|
||||||
impl Subscription {
|
impl Subscription {
|
||||||
async fn hello_world() -> StringStream {
|
async fn hello_world() -> StringStream {
|
||||||
let stream = tokio::stream::iter(vec![
|
let stream = futures::stream::iter(vec![
|
||||||
Ok(String::from("Hello")),
|
Ok(String::from("Hello")),
|
||||||
Ok(String::from("World!"))
|
Ok(String::from("World!"))
|
||||||
]);
|
]);
|
||||||
|
@ -123,7 +123,7 @@ where [`Connection`][Connection] is a `Stream` of values returned by the operati
|
||||||
# impl Subscription {
|
# impl Subscription {
|
||||||
# async fn hello_world() -> StringStream {
|
# async fn hello_world() -> StringStream {
|
||||||
# let stream =
|
# let stream =
|
||||||
# tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
|
# futures::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
|
||||||
# Box::pin(stream)
|
# Box::pin(stream)
|
||||||
# }
|
# }
|
||||||
# }
|
# }
|
||||||
|
|
|
@ -16,7 +16,7 @@ iron = "0.5"
|
||||||
mount = "0.4"
|
mount = "0.4"
|
||||||
skeptic = "0.13"
|
skeptic = "0.13"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "rt-util", "stream"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
uuid = "0.8"
|
uuid = "0.8"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
|
|
|
@ -6,15 +6,15 @@ authors = ["Mihai Dinculescu <mihai.dinculescu@outlook.com>"]
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = "3.3"
|
actix-web = "4.0.0-beta.5"
|
||||||
actix-cors = "0.5"
|
actix-cors = "0.6.0-beta.1"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tokio = { version = "0.2", features = ["macros", "rt-core"] }
|
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
rand = "0.7"
|
rand = "0.8"
|
||||||
|
tokio = "1.0"
|
||||||
|
async-stream = "0.3"
|
||||||
juniper = { path = "../../juniper", features = ["expose-test-schema"] }
|
juniper = { path = "../../juniper", features = ["expose-test-schema"] }
|
||||||
juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] }
|
juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] }
|
||||||
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
|
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::{env, pin::Pin, time::Duration};
|
use std::{env, pin::Pin, time::Duration};
|
||||||
|
|
||||||
use actix_cors::Cors;
|
use actix_cors::Cors;
|
||||||
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
use actix_web::{http::header, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
||||||
|
|
||||||
use juniper::{
|
use juniper::{
|
||||||
graphql_object, graphql_subscription,
|
graphql_object, graphql_subscription,
|
||||||
|
@ -64,27 +64,29 @@ impl Subscription {
|
||||||
|
|
||||||
use rand::{rngs::StdRng, Rng, SeedableRng};
|
use rand::{rngs::StdRng, Rng, SeedableRng};
|
||||||
let mut rng = StdRng::from_entropy();
|
let mut rng = StdRng::from_entropy();
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||||
let stream = tokio::time::interval(Duration::from_secs(3)).map(move |_| {
|
let stream = async_stream::stream! {
|
||||||
counter += 1;
|
counter += 1;
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
if counter == 2 {
|
||||||
|
yield Err(FieldError::new(
|
||||||
|
"some field error from handler",
|
||||||
|
Value::Scalar(DefaultScalarValue::String(
|
||||||
|
"some additional string".to_string(),
|
||||||
|
)),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
let random_id = rng.gen_range(1000..1005).to_string();
|
||||||
|
let human = context.get_human(&random_id).unwrap().clone();
|
||||||
|
|
||||||
if counter == 2 {
|
yield Ok(RandomHuman {
|
||||||
Err(FieldError::new(
|
id: human.id().to_owned(),
|
||||||
"some field error from handler",
|
name: human.name().unwrap().to_owned(),
|
||||||
Value::Scalar(DefaultScalarValue::String(
|
})
|
||||||
"some additional string".to_string(),
|
}
|
||||||
)),
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
let random_id = rng.gen_range(1000, 1005).to_string();
|
|
||||||
let human = context.get_human(&random_id).unwrap().clone();
|
|
||||||
|
|
||||||
Ok(RandomHuman {
|
|
||||||
id: human.id().to_owned(),
|
|
||||||
name: human.name().unwrap().to_owned(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
|
||||||
Box::pin(stream)
|
Box::pin(stream)
|
||||||
}
|
}
|
||||||
|
@ -117,7 +119,10 @@ async fn main() -> std::io::Result<()> {
|
||||||
.wrap(middleware::Logger::default())
|
.wrap(middleware::Logger::default())
|
||||||
.wrap(
|
.wrap(
|
||||||
Cors::default()
|
Cors::default()
|
||||||
|
.allowed_origin("http://127.0.0.1:8080")
|
||||||
.allowed_methods(vec!["POST", "GET"])
|
.allowed_methods(vec!["POST", "GET"])
|
||||||
|
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT])
|
||||||
|
.allowed_header(header::CONTENT_TYPE)
|
||||||
.supports_credentials()
|
.supports_credentials()
|
||||||
.max_age(3600),
|
.max_age(3600),
|
||||||
)
|
)
|
||||||
|
@ -130,7 +135,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
.service(web::resource("/playground").route(web::get().to(playground)))
|
.service(web::resource("/playground").route(web::get().to(playground)))
|
||||||
.default_service(web::route().to(|| {
|
.default_service(web::route().to(|| {
|
||||||
HttpResponse::Found()
|
HttpResponse::Found()
|
||||||
.header("location", "/playground")
|
.append_header((header::LOCATION, "/playground"))
|
||||||
.finish()
|
.finish()
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
|
@ -11,7 +11,7 @@ authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
tokio = { version = "0.2", features = ["rt-core", "macros", "stream"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
|
|
||||||
juniper = { path = "../../juniper" }
|
juniper = { path = "../../juniper" }
|
||||||
juniper_subscriptions = { path = "../../juniper_subscriptions" }
|
juniper_subscriptions = { path = "../../juniper_subscriptions" }
|
||||||
|
|
|
@ -37,7 +37,7 @@ type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send
|
||||||
impl Subscription {
|
impl Subscription {
|
||||||
async fn hello_world() -> StringStream {
|
async fn hello_world() -> StringStream {
|
||||||
let stream =
|
let stream =
|
||||||
tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
|
futures::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
|
||||||
Box::pin(stream)
|
Box::pin(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,5 +13,5 @@ env_logger = "0.8.1"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
log = "0.4.8"
|
log = "0.4.8"
|
||||||
reqwest = { version = "0.11", features = ["rustls-tls"] }
|
reqwest = { version = "0.11", features = ["rustls-tls"] }
|
||||||
tokio = { version = "0.2", features = ["rt-core", "macros"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
warp = "0.2"
|
warp = "0.3"
|
||||||
|
|
|
@ -10,9 +10,9 @@ futures = "0.3.1"
|
||||||
log = "0.4.8"
|
log = "0.4.8"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
tokio = { version = "0.2", features = ["rt-core", "macros"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
warp = "0.2.1"
|
warp = "0.3"
|
||||||
|
async-stream = "0.3"
|
||||||
juniper = { path = "../../juniper" }
|
juniper = { path = "../../juniper" }
|
||||||
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
|
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
|
||||||
juniper_warp = { path = "../../juniper_warp", features = ["subscriptions"] }
|
juniper_warp = { path = "../../juniper_warp", features = ["subscriptions"] }
|
||||||
|
|
|
@ -109,23 +109,27 @@ struct Subscription;
|
||||||
impl Subscription {
|
impl Subscription {
|
||||||
async fn users() -> UsersStream {
|
async fn users() -> UsersStream {
|
||||||
let mut counter = 0;
|
let mut counter = 0;
|
||||||
let stream = tokio::time::interval(Duration::from_secs(5)).map(move |_| {
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||||
|
let stream = async_stream::stream! {
|
||||||
counter += 1;
|
counter += 1;
|
||||||
if counter == 2 {
|
loop {
|
||||||
Err(FieldError::new(
|
interval.tick().await;
|
||||||
"some field error from handler",
|
if counter == 2 {
|
||||||
Value::Scalar(DefaultScalarValue::String(
|
yield Err(FieldError::new(
|
||||||
"some additional string".to_string(),
|
"some field error from handler",
|
||||||
)),
|
Value::Scalar(DefaultScalarValue::String(
|
||||||
))
|
"some additional string".to_string(),
|
||||||
} else {
|
)),
|
||||||
Ok(User {
|
))
|
||||||
id: counter,
|
} else {
|
||||||
kind: UserKind::Admin,
|
yield Ok(User {
|
||||||
name: "stream user".to_string(),
|
id: counter,
|
||||||
})
|
kind: UserKind::Admin,
|
||||||
|
name: "stream user".to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
|
||||||
Box::pin(stream)
|
Box::pin(stream)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,4 +8,4 @@ publish = false
|
||||||
[dependencies]
|
[dependencies]
|
||||||
juniper = { path = "../../juniper" }
|
juniper = { path = "../../juniper" }
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
tokio = { version = "0.2", features = ["rt-core", "time", "macros"] }
|
tokio = { version = "1", features = ["rt", "time", "macros"] }
|
|
@ -42,7 +42,7 @@ impl User {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delayed() -> bool {
|
async fn delayed() -> bool {
|
||||||
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ impl Query {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delayed() -> bool {
|
async fn delayed() -> bool {
|
||||||
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,5 +10,5 @@ futures = "0.3.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serde_json = { version = "1" }
|
serde_json = { version = "1" }
|
||||||
tokio = { version = "0.2", features = ["rt-core", "time", "macros"] }
|
tokio = { version = "1", features = ["rt", "time", "macros"] }
|
||||||
trybuild = "1.0.25"
|
trybuild = "1.0.25"
|
|
@ -14,4 +14,4 @@ juniper_subscriptions = { path = "../../juniper_subscriptions" }
|
||||||
async-trait = "0.1.39"
|
async-trait = "0.1.39"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
fnv = "1.0"
|
fnv = "1.0"
|
||||||
tokio = { version = "0.2", features = ["macros", "rt-core", "time"] }
|
tokio = { version = "1", features = ["rt", "macros", "time"] }
|
||||||
|
|
|
@ -55,7 +55,7 @@ uuid = { version = "0.8", default-features = false, optional = true }
|
||||||
bencher = "0.1.2"
|
bencher = "0.1.2"
|
||||||
pretty_assertions = "0.7.1"
|
pretty_assertions = "0.7.1"
|
||||||
serde_json = "1.0.2"
|
serde_json = "1.0.2"
|
||||||
tokio = { version = "0.2", features = ["macros", "rt-core", "time"] }
|
tokio = { version = "1", features = ["macros", "time", "rt-multi-thread"] }
|
||||||
|
|
||||||
[[bench]]
|
[[bench]]
|
||||||
name = "bench"
|
name = "bench"
|
||||||
|
|
|
@ -39,7 +39,7 @@ impl User {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delayed() -> bool {
|
async fn delayed() -> bool {
|
||||||
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ impl Query {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delayed() -> bool {
|
async fn delayed() -> bool {
|
||||||
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "juniper_actix"
|
name = "juniper_actix"
|
||||||
version = "0.2.5"
|
version = "0.3.0"
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
|
authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
|
||||||
description = "Juniper GraphQL integration with Actix"
|
description = "Juniper GraphQL integration with Actix"
|
||||||
|
@ -12,27 +12,34 @@ repository = "https://github.com/graphql-rust/juniper"
|
||||||
subscriptions = ["juniper_graphql_ws"]
|
subscriptions = ["juniper_graphql_ws"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix = "0.10"
|
actix = "0.11"
|
||||||
actix-web = "3.3"
|
# actix-web had some problems in beta release see https://github.com/actix/actix-web/issues/2173#issuecomment-822758353
|
||||||
actix-web-actors = "3.0"
|
# and we need these version specification to handle this issue temporarily while the stable release is not available
|
||||||
|
# to understand these dependecy version specification see https://github.com/actix/actix-web/issues/2185 or https://github.com/LukeMathWalker/zero-to-production/issues/96
|
||||||
|
actix-http = "=3.0.0-beta.5"
|
||||||
|
actix-service = "=2.0.0-beta.5"
|
||||||
|
actix-web = "=4.0.0-beta.5"
|
||||||
|
actix-web-actors = "4.0.0-beta.4"
|
||||||
|
|
||||||
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
||||||
juniper_graphql_ws = { version = "0.2.5", path = "../juniper_graphql_ws", optional = true }
|
juniper_graphql_ws = { version = "0.2.5", path = "../juniper_graphql_ws", optional = true }
|
||||||
|
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
futures = "0.3.5"
|
futures = "0.3"
|
||||||
serde = { version = "1.0.116", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0.57"
|
serde_json = "1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "0.2", features = ["time"] }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "1.1"
|
actix-rt = "2"
|
||||||
actix-cors = "0.5"
|
actix-cors = "0.6.0-beta.1"
|
||||||
actix-identity = "0.3"
|
actix-identity = "0.4.0-beta.1"
|
||||||
|
tokio = "1"
|
||||||
|
async-stream = "0.3"
|
||||||
|
actix-test = "0.1.0-beta.1"
|
||||||
|
|
||||||
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
||||||
|
|
||||||
bytes = "0.6"
|
bytes = "1.0"
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
|
@ -6,6 +6,7 @@ use actix_cors::Cors;
|
||||||
use actix_web::{http::header, middleware, web, App, Error, HttpResponse, HttpServer};
|
use actix_web::{http::header, middleware, web, App, Error, HttpResponse, HttpServer};
|
||||||
use juniper::{graphql_object, EmptyMutation, EmptySubscription, GraphQLObject, RootNode};
|
use juniper::{graphql_object, EmptyMutation, EmptySubscription, GraphQLObject, RootNode};
|
||||||
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler};
|
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler};
|
||||||
|
|
||||||
#[derive(Clone, GraphQLObject)]
|
#[derive(Clone, GraphQLObject)]
|
||||||
///a user
|
///a user
|
||||||
pub struct User {
|
pub struct User {
|
||||||
|
|
|
@ -215,7 +215,10 @@ pub async fn playground_handler(
|
||||||
/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
|
/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
|
||||||
#[cfg(feature = "subscriptions")]
|
#[cfg(feature = "subscriptions")]
|
||||||
pub mod subscriptions {
|
pub mod subscriptions {
|
||||||
use std::{fmt, sync::Arc};
|
use std::{
|
||||||
|
fmt,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
use actix::{prelude::*, Actor, StreamHandler};
|
use actix::{prelude::*, Actor, StreamHandler};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
|
@ -224,8 +227,6 @@ pub mod subscriptions {
|
||||||
};
|
};
|
||||||
use actix_web_actors::ws;
|
use actix_web_actors::ws;
|
||||||
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use juniper::{
|
use juniper::{
|
||||||
futures::{
|
futures::{
|
||||||
stream::{SplitSink, SplitStream, StreamExt},
|
stream::{SplitSink, SplitStream, StreamExt},
|
||||||
|
@ -328,7 +329,7 @@ pub mod subscriptions {
|
||||||
let tx = self.graphql_tx.clone();
|
let tx = self.graphql_tx.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let mut tx = tx.lock().await;
|
let mut tx = tx.lock().unwrap();
|
||||||
tx.send(msg)
|
tx.send(msg)
|
||||||
.await
|
.await
|
||||||
.expect("Infallible: this should not happen");
|
.expect("Infallible: this should not happen");
|
||||||
|
@ -365,7 +366,7 @@ pub mod subscriptions {
|
||||||
let addr = ctx.address();
|
let addr = ctx.address();
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
let mut stream = stream.lock().await;
|
let mut stream = stream.lock().unwrap();
|
||||||
while let Some(message) = stream.next().await {
|
while let Some(message) = stream.next().await {
|
||||||
// sending the message to self so that it can be forwarded back to the client
|
// sending the message to self so that it can be forwarded back to the client
|
||||||
addr.do_send(ServerMessageWrapper { message });
|
addr.do_send(ServerMessageWrapper { message });
|
||||||
|
@ -383,7 +384,7 @@ pub mod subscriptions {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// actor -> websocket response
|
/// actor -> websocket response
|
||||||
impl<Query, Mutation, Subscription, CtxT, S, I> Handler<ServerMessageWrapper<S>>
|
impl<Query, Mutation, Subscription, CtxT, S, I> actix::prelude::Handler<ServerMessageWrapper<S>>
|
||||||
for SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
|
for SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
|
||||||
where
|
where
|
||||||
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
|
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
|
||||||
|
@ -401,14 +402,11 @@ pub mod subscriptions {
|
||||||
fn handle(
|
fn handle(
|
||||||
&mut self,
|
&mut self,
|
||||||
msg: ServerMessageWrapper<S>,
|
msg: ServerMessageWrapper<S>,
|
||||||
ctx: &mut ws::WebsocketContext<Self>,
|
ctx: &mut Self::Context,
|
||||||
) -> Self::Result {
|
) -> Self::Result {
|
||||||
let msg = serde_json::to_string(&msg.message);
|
let msg = serde_json::to_string(&msg.message);
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Ok(msg) => {
|
Ok(msg) => ctx.text(msg),
|
||||||
ctx.text(msg);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let reason = ws::CloseReason {
|
let reason = ws::CloseReason {
|
||||||
code: ws::CloseCode::Error,
|
code: ws::CloseCode::Error,
|
||||||
|
@ -416,12 +414,12 @@ pub mod subscriptions {
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: trace
|
// TODO: trace
|
||||||
ctx.close(Some(reason));
|
ctx.close(Some(reason))
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
struct ServerMessageWrapper<S>
|
struct ServerMessageWrapper<S>
|
||||||
|
@ -483,6 +481,7 @@ mod tests {
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use actix_web::http::header::ACCEPT;
|
||||||
|
|
||||||
type Schema =
|
type Schema =
|
||||||
juniper::RootNode<'static, Query, EmptyMutation<Database>, EmptySubscription<Database>>;
|
juniper::RootNode<'static, Query, EmptyMutation<Database>, EmptySubscription<Database>>;
|
||||||
|
@ -523,7 +522,7 @@ mod tests {
|
||||||
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.uri("/")
|
.uri("/")
|
||||||
.header("accept", "text/html")
|
.append_header((ACCEPT, "text/html"))
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
let resp = test::call_service(&mut app, req).await;
|
let resp = test::call_service(&mut app, req).await;
|
||||||
|
@ -539,7 +538,7 @@ mod tests {
|
||||||
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.uri("/")
|
.uri("/")
|
||||||
.header("accept", "text/html")
|
.append_header((ACCEPT, "text/html"))
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
let mut resp = test::call_service(&mut app, req).await;
|
let mut resp = test::call_service(&mut app, req).await;
|
||||||
|
@ -564,7 +563,7 @@ mod tests {
|
||||||
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.uri("/")
|
.uri("/")
|
||||||
.header("accept", "text/html")
|
.append_header((ACCEPT, "text/html"))
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
let resp = test::call_service(&mut app, req).await;
|
let resp = test::call_service(&mut app, req).await;
|
||||||
|
@ -580,7 +579,7 @@ mod tests {
|
||||||
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
test::init_service(App::new().route("/", web::get().to(graphql_handler))).await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.uri("/")
|
.uri("/")
|
||||||
.header("accept", "text/html")
|
.append_header((ACCEPT, "text/html"))
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
let mut resp = test::call_service(&mut app, req).await;
|
let mut resp = test::call_service(&mut app, req).await;
|
||||||
|
@ -602,7 +601,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = test::TestRequest::post()
|
let req = test::TestRequest::post()
|
||||||
.header("content-type", "application/json; charset=utf-8")
|
.append_header(("content-type", "application/json; charset=utf-8"))
|
||||||
.set_payload(
|
.set_payload(
|
||||||
r##"{ "variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }" }"##,
|
r##"{ "variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }" }"##,
|
||||||
)
|
)
|
||||||
|
@ -634,7 +633,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.header("content-type", "application/json")
|
.append_header(("content-type", "application/json"))
|
||||||
.uri("/?query=%7B%20hero%28episode%3A%20NEW_HOPE%29%20%7B%20name%20%7D%20%7D&variables=null")
|
.uri("/?query=%7B%20hero%28episode%3A%20NEW_HOPE%29%20%7B%20name%20%7D%20%7D&variables=null")
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
|
@ -668,7 +667,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = test::TestRequest::post()
|
let req = test::TestRequest::post()
|
||||||
.header("content-type", "application/json")
|
.append_header(("content-type", "application/json"))
|
||||||
.set_payload(
|
.set_payload(
|
||||||
r##"[
|
r##"[
|
||||||
{ "variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }" },
|
{ "variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }" },
|
||||||
|
@ -706,7 +705,7 @@ mod tests {
|
||||||
|
|
||||||
impl TestActixWebIntegration {
|
impl TestActixWebIntegration {
|
||||||
fn make_request(&self, req: test::TestRequest) -> TestResponse {
|
fn make_request(&self, req: test::TestRequest) -> TestResponse {
|
||||||
actix_web::rt::System::new("request").block_on(async move {
|
actix_web::rt::System::new().block_on(async move {
|
||||||
let schema = Schema::new(
|
let schema = Schema::new(
|
||||||
Query,
|
Query,
|
||||||
EmptyMutation::<Database>::new(),
|
EmptyMutation::<Database>::new(),
|
||||||
|
@ -730,7 +729,7 @@ mod tests {
|
||||||
fn post_json(&self, url: &str, body: &str) -> TestResponse {
|
fn post_json(&self, url: &str, body: &str) -> TestResponse {
|
||||||
self.make_request(
|
self.make_request(
|
||||||
test::TestRequest::post()
|
test::TestRequest::post()
|
||||||
.header("content-type", "application/json")
|
.append_header(("content-type", "application/json"))
|
||||||
.set_payload(body.to_string())
|
.set_payload(body.to_string())
|
||||||
.uri(url),
|
.uri(url),
|
||||||
)
|
)
|
||||||
|
@ -739,7 +738,7 @@ mod tests {
|
||||||
fn post_graphql(&self, url: &str, body: &str) -> TestResponse {
|
fn post_graphql(&self, url: &str, body: &str) -> TestResponse {
|
||||||
self.make_request(
|
self.make_request(
|
||||||
test::TestRequest::post()
|
test::TestRequest::post()
|
||||||
.header("content-type", "application/graphql")
|
.append_header(("content-type", "application/graphql"))
|
||||||
.set_payload(body.to_string())
|
.set_payload(body.to_string())
|
||||||
.uri(url),
|
.uri(url),
|
||||||
)
|
)
|
||||||
|
@ -768,7 +767,8 @@ mod tests {
|
||||||
mod subscription_tests {
|
mod subscription_tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use actix_web::{test, web, App, Error, HttpRequest, HttpResponse};
|
use actix_test::start;
|
||||||
|
use actix_web::{web, App, Error, HttpRequest, HttpResponse};
|
||||||
use actix_web_actors::ws;
|
use actix_web_actors::ws;
|
||||||
use juniper::{
|
use juniper::{
|
||||||
futures::{SinkExt, StreamExt},
|
futures::{SinkExt, StreamExt},
|
||||||
|
@ -789,7 +789,7 @@ mod subscription_tests {
|
||||||
&self,
|
&self,
|
||||||
messages: Vec<WsIntegrationMessage>,
|
messages: Vec<WsIntegrationMessage>,
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
let mut server = test::start(|| {
|
let mut server = start(|| {
|
||||||
App::new()
|
App::new()
|
||||||
.data(Schema::new(
|
.data(Schema::new(
|
||||||
Query,
|
Query,
|
||||||
|
@ -804,7 +804,7 @@ mod subscription_tests {
|
||||||
match message {
|
match message {
|
||||||
WsIntegrationMessage::Send(body) => {
|
WsIntegrationMessage::Send(body) => {
|
||||||
framed
|
framed
|
||||||
.send(ws::Message::Text(body.to_owned()))
|
.send(ws::Message::Text(body.to_owned().into()))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;
|
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ juniper = { path = "../juniper" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
tokio = { version = "0.2", features = ["rt-core", "rt-threaded"] }
|
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||||
|
|
||||||
[[bench]]
|
[[bench]]
|
||||||
name = "benchmark"
|
name = "benchmark"
|
||||||
|
|
|
@ -13,7 +13,7 @@ keywords = ["apollo", "graphql", "graphql-ws", "juniper"]
|
||||||
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
||||||
juniper_subscriptions = { version = "0.15.5", path = "../juniper_subscriptions" }
|
juniper_subscriptions = { version = "0.15.5", path = "../juniper_subscriptions" }
|
||||||
serde = { version = "1.0.8", features = ["derive"], default-features = false }
|
serde = { version = "1.0.8", features = ["derive"], default-features = false }
|
||||||
tokio = { version = "0.2", features = ["macros", "rt-core", "time"], default-features = false }
|
tokio = { version = "1", features = ["macros", "rt", "time"], default-features = false }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|
|
@ -176,7 +176,7 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
|
||||||
.boxed();
|
.boxed();
|
||||||
s = s
|
s = s
|
||||||
.chain(stream::unfold((), move |_| async move {
|
.chain(stream::unfold((), move |_| async move {
|
||||||
tokio::time::delay_for(keep_alive_interval).await;
|
tokio::time::sleep(keep_alive_interval).await;
|
||||||
Some((
|
Some((
|
||||||
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive),
|
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive),
|
||||||
(),
|
(),
|
||||||
|
@ -658,7 +658,7 @@ mod test {
|
||||||
impl Subscription {
|
impl Subscription {
|
||||||
/// never never emits anything.
|
/// never never emits anything.
|
||||||
async fn never(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
|
async fn never(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
|
||||||
tokio::time::delay_for(Duration::from_secs(10000))
|
tokio::time::sleep(Duration::from_secs(10000))
|
||||||
.map(|_| unreachable!())
|
.map(|_| unreachable!())
|
||||||
.into_stream()
|
.into_stream()
|
||||||
.boxed()
|
.boxed()
|
||||||
|
@ -668,7 +668,7 @@ mod test {
|
||||||
async fn context(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
|
async fn context(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
|
||||||
stream::once(future::ready(Ok(context.0)))
|
stream::once(future::ready(Ok(context.0)))
|
||||||
.chain(
|
.chain(
|
||||||
tokio::time::delay_for(Duration::from_secs(10000))
|
tokio::time::sleep(Duration::from_secs(10000))
|
||||||
.map(|_| unreachable!())
|
.map(|_| unreachable!())
|
||||||
.into_stream(),
|
.into_stream(),
|
||||||
)
|
)
|
||||||
|
@ -682,7 +682,7 @@ mod test {
|
||||||
Value::null(),
|
Value::null(),
|
||||||
))))
|
))))
|
||||||
.chain(
|
.chain(
|
||||||
tokio::time::delay_for(Duration::from_secs(10000))
|
tokio::time::sleep(Duration::from_secs(10000))
|
||||||
.map(|_| unreachable!())
|
.map(|_| unreachable!())
|
||||||
.into_stream(),
|
.into_stream(),
|
||||||
)
|
)
|
||||||
|
|
|
@ -11,13 +11,13 @@ repository = "https://github.com/graphql-rust/juniper"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
||||||
hyper = "0.13"
|
hyper = {version = "0.14", features = ["server", "runtime"]}
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
tokio = "0.2"
|
tokio = "1"
|
||||||
url = "2"
|
url = "2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
||||||
pretty_env_logger = "0.4"
|
pretty_env_logger = "0.4"
|
||||||
reqwest = { version = "0.11", features = ["blocking", "rustls-tls"] }
|
reqwest = { version = "0.11", features = ["blocking", "rustls-tls"] }
|
||||||
tokio = { version = "0.2", features = ["macros"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
use std::{convert::Infallible, sync::Arc};
|
use std::{convert::Infallible, sync::Arc};
|
||||||
|
|
||||||
use hyper::{
|
use hyper::{
|
||||||
|
server::Server,
|
||||||
service::{make_service_fn, service_fn},
|
service::{make_service_fn, service_fn},
|
||||||
Body, Method, Response, Server, StatusCode,
|
Body, Method, Response, StatusCode,
|
||||||
};
|
};
|
||||||
use juniper::{
|
use juniper::{
|
||||||
tests::fixtures::starwars::schema::{Database, Query},
|
tests::fixtures::starwars::schema::{Database, Query},
|
||||||
|
|
|
@ -312,8 +312,9 @@ impl Error for GraphQLRequestError {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use hyper::{
|
use hyper::{
|
||||||
|
server::Server,
|
||||||
service::{make_service_fn, service_fn},
|
service::{make_service_fn, service_fn},
|
||||||
Body, Method, Response, Server, StatusCode,
|
Body, Method, Response, StatusCode,
|
||||||
};
|
};
|
||||||
use juniper::{
|
use juniper::{
|
||||||
http::tests as http_tests,
|
http::tests as http_tests,
|
||||||
|
@ -421,7 +422,7 @@ mod tests {
|
||||||
});
|
});
|
||||||
|
|
||||||
let (shutdown_fut, shutdown) = futures::future::abortable(async {
|
let (shutdown_fut, shutdown) = futures::future::abortable(async {
|
||||||
tokio::time::delay_for(Duration::from_secs(60)).await;
|
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let server = Server::bind(&addr)
|
let server = Server::bind(&addr)
|
||||||
|
|
|
@ -13,7 +13,7 @@ repository = "https://github.com/graphql-rust/juniper"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
juniper = { version = "0.15.6", path = "../juniper", default-features = false}
|
juniper = { version = "0.15.6", path = "../juniper", default-features = false}
|
||||||
rocket = { version = "0.4.9", default-features = false }
|
rocket = { version = "0.4.10", default-features = false }
|
||||||
serde_json = "1.0.2"
|
serde_json = "1.0.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
@ -14,4 +14,4 @@ juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
tokio = { version = "0.2", features = ["macros", "rt-core"] }
|
tokio = { version = "1", features = ["macros", "rt"] }
|
||||||
|
|
|
@ -13,20 +13,19 @@ subscriptions = ["juniper_graphql_ws"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
bytes = "0.5"
|
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
|
||||||
juniper_graphql_ws = { version = "0.2.5", path = "../juniper_graphql_ws", optional = true }
|
juniper_graphql_ws = { version = "0.2.5", path = "../juniper_graphql_ws", optional = true }
|
||||||
serde = { version = "1.0.75", features = ["derive"] }
|
serde = { version = "1.0.75", features = ["derive"] }
|
||||||
serde_json = "1.0.24"
|
serde_json = "1.0.24"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "0.2", features = ["blocking", "rt-core"] }
|
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||||
warp = "0.2"
|
warp = "0.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
percent-encoding = "2.1"
|
percent-encoding = "2.1"
|
||||||
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
url = "2"
|
url = "2"
|
||||||
|
|
|
@ -41,7 +41,6 @@ Check the LICENSE file for details.
|
||||||
#![doc(html_root_url = "https://docs.rs/juniper_warp/0.2.0")]
|
#![doc(html_root_url = "https://docs.rs/juniper_warp/0.2.0")]
|
||||||
|
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
use bytes::Bytes;
|
|
||||||
use futures::{FutureExt as _, TryFutureExt};
|
use futures::{FutureExt as _, TryFutureExt};
|
||||||
use juniper::{
|
use juniper::{
|
||||||
http::{GraphQLBatchRequest, GraphQLRequest},
|
http::{GraphQLBatchRequest, GraphQLRequest},
|
||||||
|
@ -49,7 +48,7 @@ use juniper::{
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, str, sync::Arc};
|
use std::{collections::HashMap, str, sync::Arc};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use warp::{body, filters::BoxedFilter, http, query, Filter};
|
use warp::{body, filters::BoxedFilter, http, hyper::body::Bytes, query, Filter};
|
||||||
|
|
||||||
/// Make a filter for graphql queries/mutations.
|
/// Make a filter for graphql queries/mutations.
|
||||||
///
|
///
|
||||||
|
@ -717,7 +716,7 @@ mod tests_http_harness {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_request(&self, req: warp::test::RequestBuilder) -> TestResponse {
|
fn make_request(&self, req: warp::test::RequestBuilder) -> TestResponse {
|
||||||
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create tokio::Runtime");
|
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio::Runtime");
|
||||||
make_test_response(rt.block_on(async move {
|
make_test_response(rt.block_on(async move {
|
||||||
req.filter(&self.filter).await.unwrap_or_else(|rejection| {
|
req.filter(&self.filter).await.unwrap_or_else(|rejection| {
|
||||||
let code = if rejection.is_not_found() {
|
let code = if rejection.is_not_found() {
|
||||||
|
|
Loading…
Reference in a new issue