Actix subscriptions tests (#736)

* Actix subscriptions tests

* Use LocalBoxFuture instead of async-trait

* expose-test-schema already includes serde_json

* Add anyhow to juniper dev-dependencies

* The HTTP test helpers are not needed for juniper tests

* juniper_actix does not need tokio in dev-dependencies

Co-authored-by: Christian Legnitto <LegNeato@users.noreply.github.com>
This commit is contained in:
Mihai Dinculescu 2020-08-19 00:08:53 -07:00 committed by GitHub
parent 633602f000
commit 45c16acc6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 333 additions and 23 deletions

View file

@ -13,10 +13,10 @@ actix-cors = "0.2.0"
futures = "0.3.5"
tokio = { version = "0.2", features = ["rt-core", "macros"] }
env_logger = "0.7.1"
serde = "1.0.114"
serde = "1.0.115"
serde_json = "1.0.57"
rand = "0.7.3"
juniper = { path = "../../juniper", features = ["expose-test-schema", "serde_json"] }
juniper = { path = "../../juniper", features = ["expose-test-schema"] }
juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] }
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }

View file

@ -2,7 +2,6 @@ use std::{env, pin::Pin, time::Duration};
use actix_cors::Cors;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use futures::Stream;
use juniper::{
tests::fixtures::starwars::{model::Database, schema::Query},
@ -49,14 +48,15 @@ impl RandomHuman {
}
}
type RandomHumanStream =
Pin<Box<dyn futures::Stream<Item = Result<RandomHuman, FieldError>> + Send>>;
#[juniper::graphql_subscription(Context = Database)]
impl Subscription {
#[graphql(
description = "A random humanoid creature in the Star Wars universe every 3 seconds. Second result will be an error."
)]
async fn random_human(
context: &Database,
) -> Pin<Box<dyn Stream<Item = Result<RandomHuman, FieldError>> + Send>> {
async fn random_human(context: &Database) -> RandomHumanStream {
let mut counter = 0;
let context = (*context).clone();

View file

@ -24,7 +24,7 @@ harness = false
path = "benches/bench.rs"
[features]
expose-test-schema = ["serde_json"]
expose-test-schema = ["anyhow", "serde_json"]
schema-language = ["graphql-parser-integration"]
graphql-parser-integration = ["graphql-parser"]
default = [
@ -39,6 +39,7 @@ scalar-naivetime = []
[dependencies]
juniper_codegen = { version = "0.14.2", path = "../juniper_codegen" }
anyhow = { version = "1.0.32", optional = true }
bson = { version = "1.0.0", optional = true }
chrono = { version = "0.4.0", optional = true }
fnv = "1.0.3"

View file

@ -353,9 +353,10 @@ where
}
}
#[cfg(any(test, feature = "expose-test-schema"))]
#[cfg(feature = "expose-test-schema")]
#[allow(missing_docs)]
pub mod tests {
use crate::LocalBoxFuture;
use serde_json::{self, Value as Json};
/// Normalized response content we expect to get back from
@ -583,4 +584,172 @@ pub mod tests {
assert_eq!(resp.status_code, 400);
}
/// Normalized way to make requests to the WebSocket framework integration we are testing.
pub trait WsIntegration {
/// Runs a test with the given messages
fn run(
&self,
messages: Vec<WsIntegrationMessage>,
) -> LocalBoxFuture<Result<(), anyhow::Error>>;
}
/// WebSocket framework integration message
pub enum WsIntegrationMessage {
/// Send message through the WebSocket
/// Takes a message as a String
Send(String),
/// Expect message to come through the WebSocket
/// Takes expected message as a String and a timeout in milliseconds
Expect(String, u64),
}
/// Default value in milliseconds for how long to wait for an incoming message
pub const WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT: u64 = 100;
#[allow(missing_docs)]
pub async fn run_ws_test_suite<T: WsIntegration>(integration: &T) {
println!("Running WebSocket Test suite for integration");
println!(" - test_ws_simple_subscription");
test_ws_simple_subscription(integration).await;
println!(" - test_ws_invalid_json");
test_ws_invalid_json(integration).await;
println!(" - test_ws_invalid_query");
test_ws_invalid_query(integration).await;
}
async fn test_ws_simple_subscription<T: WsIntegration>(integration: &T) {
let messages = vec![
WsIntegrationMessage::Send(
r#"{
"type":"connection_init",
"payload":{}
}"#
.to_owned(),
),
WsIntegrationMessage::Expect(
r#"{
"type":"connection_ack"
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT,
),
WsIntegrationMessage::Expect(
r#"{
"type":"ka"
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT,
),
WsIntegrationMessage::Send(
r#"{
"id":"1",
"type":"start",
"payload":{
"variables":{},
"extensions":{},
"operationName":null,
"query":"subscription { asyncHuman { id, name, homePlanet } }"
}
}"#
.to_owned(),
),
WsIntegrationMessage::Expect(
r#"{
"type":"data",
"id":"1",
"payload":{
"data":{
"asyncHuman":{
"id":"stream id",
"name":"stream name",
"homePlanet":"stream home planet"
}
}
}
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT,
),
];
integration.run(messages).await.unwrap();
}
async fn test_ws_invalid_json<T: WsIntegration>(integration: &T) {
let messages = vec![
WsIntegrationMessage::Send("invalid json".to_owned()),
WsIntegrationMessage::Expect(
r#"{
"type":"connection_error",
"payload":{
"message":"serde error: expected value at line 1 column 1"
}
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT,
),
];
integration.run(messages).await.unwrap();
}
async fn test_ws_invalid_query<T: WsIntegration>(integration: &T) {
let messages = vec![
WsIntegrationMessage::Send(
r#"{
"type":"connection_init",
"payload":{}
}"#
.to_owned(),
),
WsIntegrationMessage::Expect(
r#"{
"type":"connection_ack"
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT
),
WsIntegrationMessage::Expect(
r#"{
"type":"ka"
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT
),
WsIntegrationMessage::Send(
r#"{
"id":"1",
"type":"start",
"payload":{
"variables":{},
"extensions":{},
"operationName":null,
"query":"subscription { asyncHuman }"
}
}"#
.to_owned(),
),
WsIntegrationMessage::Expect(
r#"{
"type":"error",
"id":"1",
"payload":[{
"message":"Field \"asyncHuman\" of type \"HumanSubscription!\" must have a selection of subfields. Did you mean \"asyncHuman { ... }\"?",
"locations":[{
"line":1,
"column":16
}]
}]
}"#
.to_owned(),
WS_INTEGRATION_EXPECT_DEFAULT_TIMEOUT
)
];
integration.run(messages).await.unwrap();
}
}

View file

@ -122,7 +122,7 @@ extern crate bson;
pub use {futures, static_assertions as sa};
#[doc(inline)]
pub use futures::future::BoxFuture;
pub use futures::future::{BoxFuture, LocalBoxFuture};
// Depend on juniper_codegen and re-export everything in it.
// This allows users to just depend on juniper and get the derive

View file

@ -2,8 +2,11 @@
use crate::{
executor::Context,
graphql_subscription,
tests::fixtures::starwars::model::{Character, Database, Droid, Episode, Human},
GraphQLObject,
};
use std::pin::Pin;
impl Context for Database {}
@ -130,3 +133,32 @@ impl Query {
Some(database.get_hero(episode).as_character())
}
}
#[derive(GraphQLObject)]
#[graphql(description = "A humanoid creature in the Star Wars universe")]
#[derive(Clone)]
/// A humanoid creature in the Star Wars universe.
/// TODO: remove this when async interfaces are merged
struct HumanSubscription {
id: String,
name: String,
home_planet: String,
}
pub struct Subscription;
type HumanStream = Pin<Box<dyn futures::Stream<Item = HumanSubscription> + Send>>;
#[graphql_subscription(context = Database)]
/// Super basic subscription fixture
impl Subscription {
async fn async_human() -> HumanStream {
Box::pin(futures::stream::once(async {
HumanSubscription {
id: "stream id".to_string(),
name: "stream name".to_string(),
home_planet: "stream home planet".to_string(),
}
}))
}
}

View file

@ -19,7 +19,7 @@ actix-web-actors = "2.0.0"
futures = { version = "0.3.5", features = ["compat"] }
tokio = { version = "0.2", features = ["time"] }
serde = { version = "1.0.114", features = ["derive"] }
serde = { version = "1.0.115", features = ["derive"] }
serde_json = "1.0.57"
anyhow = "1.0"
thiserror = "1.0"
@ -34,6 +34,5 @@ actix-identity = "0.2.1"
bytes = "0.5.6"
env_logger = "0.7.1"
log = "0.4.11"
tokio = { version = "0.2", features = ["rt-core", "macros", "blocking"] }
juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema", "serde_json"] }
juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema"] }

View file

@ -40,7 +40,6 @@ Check the LICENSE file for details.
#![deny(warnings)]
#![doc(html_root_url = "https://docs.rs/juniper_actix/0.1.0")]
// use futures::{FutureExt as _};
use actix_web::{
error::{ErrorBadRequest, ErrorMethodNotAllowed, ErrorUnsupportedMediaType},
http::{header::CONTENT_TYPE, Method},
@ -234,10 +233,12 @@ pub mod subscriptions {
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use futures::SinkExt;
use tokio::sync::Mutex;
use juniper::futures::stream::{SplitSink, SplitStream, StreamExt};
use juniper::futures::{
stream::{SplitSink, SplitStream, StreamExt},
SinkExt,
};
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use juniper_graphql_ws::{ArcSchema, ClientMessage, Connection, Init, ServerMessage};
@ -482,8 +483,8 @@ pub mod subscriptions {
mod tests {
use super::*;
use actix_web::{dev::ServiceResponse, http, http::header::CONTENT_TYPE, test, App};
use futures::StreamExt;
use juniper::{
futures::stream::StreamExt,
http::tests::{run_http_test_suite, HttpIntegration, TestResponse},
tests::fixtures::starwars::{model::Database, schema::Query},
EmptyMutation, EmptySubscription, RootNode,
@ -767,3 +768,111 @@ mod tests {
run_http_test_suite(&TestActixWebIntegration);
}
}
#[cfg(feature = "subscriptions")]
#[cfg(test)]
mod subscription_tests {
use std::time::Duration;
use actix_web::{test, web, App, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use tokio::time::timeout;
use super::subscriptions::subscriptions_handler;
use juniper::{
futures::{SinkExt, StreamExt},
http::tests::{run_ws_test_suite, WsIntegration, WsIntegrationMessage},
tests::fixtures::starwars::{model::Database, schema::Query, schema::Subscription},
EmptyMutation, LocalBoxFuture,
};
use juniper_graphql_ws::ConnectionConfig;
#[derive(Default)]
struct TestActixWsIntegration;
impl TestActixWsIntegration {
async fn run_async(
&self,
messages: Vec<WsIntegrationMessage>,
) -> Result<(), anyhow::Error> {
let mut server = test::start(|| {
App::new()
.data(Schema::new(
Query,
EmptyMutation::<Database>::new(),
Subscription,
))
.service(web::resource("/subscriptions").to(subscriptions))
});
let mut framed = server.ws_at("/subscriptions").await.unwrap();
for message in &messages {
match message {
WsIntegrationMessage::Send(body) => {
framed
.send(ws::Message::Text(body.to_owned()))
.await
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;
}
WsIntegrationMessage::Expect(body, message_timeout) => {
let frame = timeout(Duration::from_millis(*message_timeout), framed.next())
.await
.map_err(|_| anyhow::anyhow!("Timed-out waiting for message"))?
.ok_or_else(|| anyhow::anyhow!("Empty message received"))?
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;
match frame {
ws::Frame::Text(ref bytes) => {
let expected_value =
serde_json::from_str::<serde_json::Value>(body)
.map_err(|e| anyhow::anyhow!("Serde error: {:?}", e))?;
let value: serde_json::Value = serde_json::from_slice(bytes)
.map_err(|e| anyhow::anyhow!("Serde error: {:?}", e))?;
if value != expected_value {
return Err(anyhow::anyhow!(
"Expected message: {}. Received message: {}",
expected_value,
value,
));
}
}
_ => return Err(anyhow::anyhow!("Received non-text frame")),
}
}
}
}
Ok(())
}
}
impl WsIntegration for TestActixWsIntegration {
fn run(
&self,
messages: Vec<WsIntegrationMessage>,
) -> LocalBoxFuture<Result<(), anyhow::Error>> {
Box::pin(self.run_async(messages))
}
}
type Schema = juniper::RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
async fn subscriptions(
req: HttpRequest,
stream: web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
subscriptions_handler(req, stream, schema, config).await
}
#[actix_rt::test]
async fn test_actix_ws_integration() {
run_ws_test_suite(&mut TestActixWsIntegration::default()).await;
}
}

View file

@ -22,9 +22,9 @@ reqwest = { version = "0.10", features = ["blocking", "rustls-tls"] }
[dev-dependencies.juniper]
version = "0.14.2"
features = ["expose-test-schema", "serde_json"]
features = ["expose-test-schema"]
path = "../juniper"
[dev-dependencies.tokio]
version = "0.2"
features = ["macros"]
features = ["macros"]

View file

@ -29,5 +29,5 @@ percent-encoding = "2"
[dev-dependencies.juniper]
version = "0.14.2"
features = ["expose-test-schema", "serde_json"]
features = ["expose-test-schema"]
path = "../juniper"

View file

@ -18,5 +18,5 @@ rocket = { version = "0.4.2", default-features = false }
[dev-dependencies.juniper]
version = "0.14.2"
features = ["expose-test-schema", "serde_json"]
features = ["expose-test-schema"]
path = "../juniper"

View file

@ -20,5 +20,5 @@ tokio = { version = "0.2", features = ["rt-core", "macros"] }
[dev-dependencies.juniper]
version = "0.14.2"
features = ["expose-test-schema", "serde_json"]
features = ["expose-test-schema"]
path = "../juniper"

View file

@ -25,8 +25,8 @@ warp = "0.2"
[dev-dependencies]
env_logger = "0.7.1"
juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema", "serde_json"] }
juniper = { version = "0.14.2", path = "../juniper", features = ["expose-test-schema"] }
log = "0.4.3"
percent-encoding = "2"
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core"] }
url = "2"
url = "2"