diff --git a/examples/actix_subscriptions/Cargo.toml b/examples/actix_subscriptions/Cargo.toml index 6825e404..98700539 100644 --- a/examples/actix_subscriptions/Cargo.toml +++ b/examples/actix_subscriptions/Cargo.toml @@ -13,10 +13,10 @@ actix-cors = "0.2.0" futures = "0.3.5" tokio = { version = "0.2", features = ["rt-core", "macros"] } env_logger = "0.7.1" -serde = "1.0.114" +serde = "1.0.115" serde_json = "1.0.57" rand = "0.7.3" -juniper = { path = "../../juniper", features = ["expose-test-schema", "serde_json"] } +juniper = { path = "../../juniper", features = ["expose-test-schema"] } juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] } juniper_graphql_ws = { path = "../../juniper_graphql_ws" } diff --git a/examples/actix_subscriptions/src/main.rs b/examples/actix_subscriptions/src/main.rs index e9f884ea..dda291d0 100644 --- a/examples/actix_subscriptions/src/main.rs +++ b/examples/actix_subscriptions/src/main.rs @@ -2,7 +2,6 @@ use std::{env, pin::Pin, time::Duration}; use actix_cors::Cors; use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; -use futures::Stream; use juniper::{ tests::fixtures::starwars::{model::Database, schema::Query}, @@ -49,14 +48,15 @@ impl RandomHuman { } } +type RandomHumanStream = + Pin> + Send>>; + #[juniper::graphql_subscription(Context = Database)] impl Subscription { #[graphql( description = "A random humanoid creature in the Star Wars universe every 3 seconds. Second result will be an error." )] - async fn random_human( - context: &Database, - ) -> Pin> + Send>> { + async fn random_human(context: &Database) -> RandomHumanStream { let mut counter = 0; let context = (*context).clone(); diff --git a/juniper/Cargo.toml b/juniper/Cargo.toml index d02b6cf4..b3b9cbb6 100644 --- a/juniper/Cargo.toml +++ b/juniper/Cargo.toml @@ -24,7 +24,7 @@ harness = false path = "benches/bench.rs" [features] -expose-test-schema = ["serde_json"] +expose-test-schema = ["anyhow", "serde_json"] schema-language = ["graphql-parser-integration"] graphql-parser-integration = ["graphql-parser"] default = [ @@ -39,6 +39,7 @@ scalar-naivetime = [] [dependencies] juniper_codegen = { version = "0.14.2", path = "../juniper_codegen" } +anyhow = { version = "1.0.32", optional = true } bson = { version = "1.0.0", optional = true } chrono = { version = "0.4.0", optional = true } fnv = "1.0.3" diff --git a/juniper/src/http/mod.rs b/juniper/src/http/mod.rs index e298613d..17907150 100644 --- a/juniper/src/http/mod.rs +++ b/juniper/src/http/mod.rs @@ -353,9 +353,10 @@ where } } -#[cfg(any(test, feature = "expose-test-schema"))] +#[cfg(feature = "expose-test-schema")] #[allow(missing_docs)] pub mod tests { + use crate::LocalBoxFuture; use serde_json::{self, Value as Json}; /// Normalized response content we expect to get back from @@ -583,4 +584,172 @@ pub mod tests { assert_eq!(resp.status_code, 400); } + + /// Normalized way to make requests to the WebSocket framework integration we are testing. + pub trait WsIntegration { + /// Runs a test with the given messages + fn run( + &self, + messages: Vec, + ) -> LocalBoxFuture>; + } + + /// WebSocket framework integration message + pub enum WsIntegrationMessage { + /// Send message through the WebSocket + /// Takes a message as a String + Send(String), + /// Expect message to come through the WebSocket + /// Takes expected message as a String and a timeout in milliseconds + Expect(String, u64), + } + + /// Default value in milliseconds for how long to wait for an incoming message + pub const WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT: u64 = 100; + + #[allow(missing_docs)] + pub async fn run_ws_test_suite(integration: &T) { + println!("Running WebSocket Test suite for integration"); + + println!(" - test_ws_simple_subscription"); + test_ws_simple_subscription(integration).await; + + println!(" - test_ws_invalid_json"); + test_ws_invalid_json(integration).await; + + println!(" - test_ws_invalid_query"); + test_ws_invalid_query(integration).await; + } + + async fn test_ws_simple_subscription(integration: &T) { + let messages = vec![ + WsIntegrationMessage::Send( + r#"{ + "type":"connection_init", + "payload":{} + }"# + .to_owned(), + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"connection_ack" + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT, + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"ka" + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT, + ), + WsIntegrationMessage::Send( + r#"{ + "id":"1", + "type":"start", + "payload":{ + "variables":{}, + "extensions":{}, + "operationName":null, + "query":"subscription { asyncHuman { id, name, homePlanet } }" + } + }"# + .to_owned(), + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"data", + "id":"1", + "payload":{ + "data":{ + "asyncHuman":{ + "id":"stream id", + "name":"stream name", + "homePlanet":"stream home planet" + } + } + } + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT, + ), + ]; + + integration.run(messages).await.unwrap(); + } + + async fn test_ws_invalid_json(integration: &T) { + let messages = vec![ + WsIntegrationMessage::Send("invalid json".to_owned()), + WsIntegrationMessage::Expect( + r#"{ + "type":"connection_error", + "payload":{ + "message":"serde error: expected value at line 1 column 1" + } + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT, + ), + ]; + + integration.run(messages).await.unwrap(); + } + + async fn test_ws_invalid_query(integration: &T) { + let messages = vec![ + WsIntegrationMessage::Send( + r#"{ + "type":"connection_init", + "payload":{} + }"# + .to_owned(), + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"connection_ack" + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"ka" + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT + ), + WsIntegrationMessage::Send( + r#"{ + "id":"1", + "type":"start", + "payload":{ + "variables":{}, + "extensions":{}, + "operationName":null, + "query":"subscription { asyncHuman }" + } + }"# + .to_owned(), + ), + WsIntegrationMessage::Expect( + r#"{ + "type":"error", + "id":"1", + "payload":[{ + "message":"Field \"asyncHuman\" of type \"HumanSubscription!\" must have a selection of subfields. Did you mean \"asyncHuman { ... }\"?", + "locations":[{ + "line":1, + "column":16 + }] + }] + }"# + .to_owned(), + WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT + ) + ]; + + integration.run(messages).await.unwrap(); + } } diff --git a/juniper/src/lib.rs b/juniper/src/lib.rs index 65c43ccb..cf8bac69 100644 --- a/juniper/src/lib.rs +++ b/juniper/src/lib.rs @@ -122,7 +122,7 @@ extern crate bson; pub use {futures, static_assertions as sa}; #[doc(inline)] -pub use futures::future::BoxFuture; +pub use futures::future::{BoxFuture, LocalBoxFuture}; // Depend on juniper_codegen and re-export everything in it. // This allows users to just depend on juniper and get the derive diff --git a/juniper/src/tests/fixtures/starwars/schema.rs b/juniper/src/tests/fixtures/starwars/schema.rs index 8589a736..e0818e57 100644 --- a/juniper/src/tests/fixtures/starwars/schema.rs +++ b/juniper/src/tests/fixtures/starwars/schema.rs @@ -2,8 +2,11 @@ use crate::{ executor::Context, + graphql_subscription, tests::fixtures::starwars::model::{Character, Database, Droid, Episode, Human}, + GraphQLObject, }; +use std::pin::Pin; impl Context for Database {} @@ -130,3 +133,32 @@ impl Query { Some(database.get_hero(episode).as_character()) } } + +#[derive(GraphQLObject)] +#[graphql(description = "A humanoid creature in the Star Wars universe")] +#[derive(Clone)] +/// A humanoid creature in the Star Wars universe. +/// TODO: remove this when async interfaces are merged +struct HumanSubscription { + id: String, + name: String, + home_planet: String, +} + +pub struct Subscription; + +type HumanStream = Pin + Send>>; + +#[graphql_subscription(context = Database)] +/// Super basic subscription fixture +impl Subscription { + async fn async_human() -> HumanStream { + Box::pin(futures::stream::once(async { + HumanSubscription { + id: "stream id".to_string(), + name: "stream name".to_string(), + home_planet: "stream home planet".to_string(), + } + })) + } +} diff --git a/juniper_actix/Cargo.toml b/juniper_actix/Cargo.toml index 7f9be579..9cfa8746 100644 --- a/juniper_actix/Cargo.toml +++ b/juniper_actix/Cargo.toml @@ -19,7 +19,7 @@ actix-web-actors = "2.0.0" futures = { version = "0.3.5", features = ["compat"] } tokio = { version = "0.2", features = ["time"] } -serde = { version = "1.0.114", features = ["derive"] } +serde = { version = "1.0.115", features = ["derive"] } serde_json = "1.0.57" anyhow = "1.0" thiserror = "1.0" @@ -34,6 +34,5 @@ actix-identity = "0.2.1" bytes = "0.5.6" env_logger = "0.7.1" log = "0.4.11" -tokio = { version = "0.2", features = ["rt-core", "macros", "blocking"] } -juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema", "serde_json"] } +juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema"] } diff --git a/juniper_actix/src/lib.rs b/juniper_actix/src/lib.rs index e6a2c28d..4cda347c 100644 --- a/juniper_actix/src/lib.rs +++ b/juniper_actix/src/lib.rs @@ -40,7 +40,6 @@ Check the LICENSE file for details. #![deny(warnings)] #![doc(html_root_url = "https://docs.rs/juniper_actix/0.1.0")] -// use futures::{FutureExt as _}; use actix_web::{ error::{ErrorBadRequest, ErrorMethodNotAllowed, ErrorUnsupportedMediaType}, http::{header::CONTENT_TYPE, Method}, @@ -234,10 +233,12 @@ pub mod subscriptions { use actix_web::{web, HttpRequest, HttpResponse}; use actix_web_actors::ws; - use futures::SinkExt; use tokio::sync::Mutex; - use juniper::futures::stream::{SplitSink, SplitStream, StreamExt}; + use juniper::futures::{ + stream::{SplitSink, SplitStream, StreamExt}, + SinkExt, + }; use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; use juniper_graphql_ws::{ArcSchema, ClientMessage, Connection, Init, ServerMessage}; @@ -482,8 +483,8 @@ pub mod subscriptions { mod tests { use super::*; use actix_web::{dev::ServiceResponse, http, http::header::CONTENT_TYPE, test, App}; - use futures::StreamExt; use juniper::{ + futures::stream::StreamExt, http::tests::{run_http_test_suite, HttpIntegration, TestResponse}, tests::fixtures::starwars::{model::Database, schema::Query}, EmptyMutation, EmptySubscription, RootNode, @@ -767,3 +768,111 @@ mod tests { run_http_test_suite(&TestActixWebIntegration); } } + +#[cfg(feature = "subscriptions")] +#[cfg(test)] +mod subscription_tests { + use std::time::Duration; + + use actix_web::{test, web, App, Error, HttpRequest, HttpResponse}; + use actix_web_actors::ws; + use tokio::time::timeout; + + use super::subscriptions::subscriptions_handler; + use juniper::{ + futures::{SinkExt, StreamExt}, + http::tests::{run_ws_test_suite, WsIntegration, WsIntegrationMessage}, + tests::fixtures::starwars::{model::Database, schema::Query, schema::Subscription}, + EmptyMutation, LocalBoxFuture, + }; + use juniper_graphql_ws::ConnectionConfig; + + #[derive(Default)] + struct TestActixWsIntegration; + + impl TestActixWsIntegration { + async fn run_async( + &self, + messages: Vec, + ) -> Result<(), anyhow::Error> { + let mut server = test::start(|| { + App::new() + .data(Schema::new( + Query, + EmptyMutation::::new(), + Subscription, + )) + .service(web::resource("/subscriptions").to(subscriptions)) + }); + let mut framed = server.ws_at("/subscriptions").await.unwrap(); + + for message in &messages { + match message { + WsIntegrationMessage::Send(body) => { + framed + .send(ws::Message::Text(body.to_owned())) + .await + .map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?; + } + WsIntegrationMessage::Expect(body, message_timeout) => { + let frame = timeout(Duration::from_millis(*message_timeout), framed.next()) + .await + .map_err(|_| anyhow::anyhow!("Timed-out waiting for message"))? + .ok_or_else(|| anyhow::anyhow!("Empty message received"))? + .map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?; + + match frame { + ws::Frame::Text(ref bytes) => { + let expected_value = + serde_json::from_str::(body) + .map_err(|e| anyhow::anyhow!("Serde error: {:?}", e))?; + + let value: serde_json::Value = serde_json::from_slice(bytes) + .map_err(|e| anyhow::anyhow!("Serde error: {:?}", e))?; + + if value != expected_value { + return Err(anyhow::anyhow!( + "Expected message: {}. Received message: {}", + expected_value, + value, + )); + } + } + _ => return Err(anyhow::anyhow!("Received non-text frame")), + } + } + } + } + + Ok(()) + } + } + + impl WsIntegration for TestActixWsIntegration { + fn run( + &self, + messages: Vec, + ) -> LocalBoxFuture> { + Box::pin(self.run_async(messages)) + } + } + + type Schema = juniper::RootNode<'static, Query, EmptyMutation, Subscription>; + + async fn subscriptions( + req: HttpRequest, + stream: web::Payload, + schema: web::Data, + ) -> Result { + let context = Database::new(); + let schema = schema.into_inner(); + let config = ConnectionConfig::new(context); + + subscriptions_handler(req, stream, schema, config).await + } + + #[actix_rt::test] + async fn test_actix_ws_integration() { + run_ws_test_suite(&mut TestActixWsIntegration::default()).await; + } +} diff --git a/juniper_hyper/Cargo.toml b/juniper_hyper/Cargo.toml index bc183dcb..73d02ce0 100644 --- a/juniper_hyper/Cargo.toml +++ b/juniper_hyper/Cargo.toml @@ -22,9 +22,9 @@ reqwest = { version = "0.10", features = ["blocking", "rustls-tls"] } [dev-dependencies.juniper] version = "0.14.2" -features = ["expose-test-schema", "serde_json"] +features = ["expose-test-schema"] path = "../juniper" [dev-dependencies.tokio] version = "0.2" -features = ["macros"] \ No newline at end of file +features = ["macros"] diff --git a/juniper_iron/Cargo.toml b/juniper_iron/Cargo.toml index 3a148868..baa15158 100644 --- a/juniper_iron/Cargo.toml +++ b/juniper_iron/Cargo.toml @@ -29,5 +29,5 @@ percent-encoding = "2" [dev-dependencies.juniper] version = "0.14.2" -features = ["expose-test-schema", "serde_json"] +features = ["expose-test-schema"] path = "../juniper" diff --git a/juniper_rocket/Cargo.toml b/juniper_rocket/Cargo.toml index 18ce8c76..f71bad39 100644 --- a/juniper_rocket/Cargo.toml +++ b/juniper_rocket/Cargo.toml @@ -18,5 +18,5 @@ rocket = { version = "0.4.2", default-features = false } [dev-dependencies.juniper] version = "0.14.2" -features = ["expose-test-schema", "serde_json"] +features = ["expose-test-schema"] path = "../juniper" diff --git a/juniper_rocket_async/Cargo.toml b/juniper_rocket_async/Cargo.toml index e3d31582..f4fb946a 100644 --- a/juniper_rocket_async/Cargo.toml +++ b/juniper_rocket_async/Cargo.toml @@ -20,5 +20,5 @@ tokio = { version = "0.2", features = ["rt-core", "macros"] } [dev-dependencies.juniper] version = "0.14.2" -features = ["expose-test-schema", "serde_json"] +features = ["expose-test-schema"] path = "../juniper" diff --git a/juniper_warp/Cargo.toml b/juniper_warp/Cargo.toml index f2fcb5b5..09207ee1 100644 --- a/juniper_warp/Cargo.toml +++ b/juniper_warp/Cargo.toml @@ -25,8 +25,8 @@ warp = "0.2" [dev-dependencies] env_logger = "0.7.1" -juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema", "serde_json"] } +juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema"] } log = "0.4.3" percent-encoding = "2" tokio = { version = "0.2", features = ["blocking", "macros", "rt-core"] } -url = "2" \ No newline at end of file +url = "2"