From dc309b83b71a3fccf5ecb529e5e6803df9d178cd Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 29 Jul 2020 01:14:53 -0400 Subject: [PATCH] Simplify SubscriptionConnection (#719) * simplify SubscriptionConnection * fmt * update pre-existing juniper_warp::subscriptions * use struct instead of tuple * fmt * update juniper_warp --- juniper/src/lib.rs | 4 +- juniper/src/types/subscriptions.rs | 34 ++++++- juniper_subscriptions/src/lib.rs | 143 ++++++++++++----------------- 3 files changed, 88 insertions(+), 93 deletions(-) diff --git a/juniper/src/lib.rs b/juniper/src/lib.rs index baba1af2..65c43ccb 100644 --- a/juniper/src/lib.rs +++ b/juniper/src/lib.rs @@ -186,8 +186,8 @@ pub use crate::{ marker::{self, GraphQLUnion}, scalars::{EmptyMutation, EmptySubscription, ID}, subscriptions::{ - GraphQLSubscriptionType, GraphQLSubscriptionValue, SubscriptionConnection, - SubscriptionCoordinator, + ExecutionOutput, GraphQLSubscriptionType, GraphQLSubscriptionValue, + SubscriptionConnection, SubscriptionCoordinator, }, }, validation::RuleError, diff --git a/juniper/src/types/subscriptions.rs b/juniper/src/types/subscriptions.rs index 9ff1cdf7..70662097 100644 --- a/juniper/src/types/subscriptions.rs +++ b/juniper/src/types/subscriptions.rs @@ -1,13 +1,37 @@ use futures::{future, stream}; +use serde::Serialize; use crate::{ - http::{GraphQLRequest, GraphQLResponse}, + http::GraphQLRequest, parser::Spanning, types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue}, - Arguments, BoxFuture, DefaultScalarValue, Executor, FieldError, Object, ScalarValue, Selection, - Value, ValuesStream, + Arguments, BoxFuture, DefaultScalarValue, ExecutionError, Executor, FieldError, Object, + ScalarValue, Selection, Value, ValuesStream, }; +/// Represents the result of executing a GraphQL operation (after parsing and validating has been +/// done). +#[derive(Debug, Serialize)] +pub struct ExecutionOutput { + /// The output data. + pub data: Value, + + /// The errors that occurred. Note that the presence of errors does not mean there is no data. + /// The output can have both data and errors. + #[serde(bound(serialize = "S: ScalarValue"))] + pub errors: Vec>, +} + +impl ExecutionOutput { + /// Creates execution output from data, with no errors. + pub fn from_data(data: Value) -> Self { + Self { + data, + errors: vec![], + } + } +} + /// Global subscription coordinator trait. /// /// With regular queries we could get away with not having some in-between @@ -33,7 +57,7 @@ where { /// Type of [`SubscriptionConnection`]s this [`SubscriptionCoordinator`] /// returns - type Connection: SubscriptionConnection<'a, S>; + type Connection: SubscriptionConnection; /// Type of error while trying to spawn [`SubscriptionConnection`] type Error; @@ -58,7 +82,7 @@ where /// /// It can be treated as [`futures::Stream`] yielding [`GraphQLResponse`]s in /// server integration crates. -pub trait SubscriptionConnection<'a, S>: futures::Stream> {} +pub trait SubscriptionConnection: futures::Stream> {} /// Extension of [`GraphQLValue`] trait with asynchronous [subscription][1] execution logic. /// It should be used with [`GraphQLValue`] in order to implement [subscription][1] resolvers on diff --git a/juniper_subscriptions/src/lib.rs b/juniper_subscriptions/src/lib.rs index 756a4f87..0e78279c 100644 --- a/juniper_subscriptions/src/lib.rs +++ b/juniper_subscriptions/src/lib.rs @@ -19,9 +19,9 @@ use std::{ use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _}; use juniper::{ - http::{GraphQLRequest, GraphQLResponse}, - BoxFuture, ExecutionError, GraphQLError, GraphQLSubscriptionType, GraphQLTypeAsync, Object, - ScalarValue, SubscriptionConnection, SubscriptionCoordinator, Value, ValuesStream, + http::GraphQLRequest, BoxFuture, ExecutionError, ExecutionOutput, GraphQLError, + GraphQLSubscriptionType, GraphQLTypeAsync, Object, ScalarValue, SubscriptionConnection, + SubscriptionCoordinator, Value, ValuesStream, }; /// Simple [`SubscriptionCoordinator`] implementation: @@ -88,8 +88,8 @@ where /// Simple [`SubscriptionConnection`] implementation. /// -/// Resolves `Value` into `Stream` using the following -/// logic: +/// Resolves `Value` into `Stream>` using +/// the following logic: /// /// [`Value::Null`] - returns [`Value::Null`] once /// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector @@ -98,7 +98,7 @@ where /// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object /// `Value::Object>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects pub struct Connection<'a, S> { - stream: Pin> + Send + 'a>>, + stream: Pin> + Send + 'a>>, } impl<'a, S> Connection<'a, S> @@ -113,16 +113,13 @@ where } } -impl<'a, S> SubscriptionConnection<'a, S> for Connection<'a, S> where - S: ScalarValue + Send + Sync + 'a -{ -} +impl<'a, S> SubscriptionConnection for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a {} impl<'a, S> Stream for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a, { - type Item = GraphQLResponse<'a, S>; + type Item = ExecutionOutput; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { // this is safe as stream is only mutated here and is not moved anywhere @@ -132,7 +129,7 @@ where } } -/// Creates [`futures::Stream`] that yields [`GraphQLResponse`]s depending on the given [`Value`]: +/// Creates [`futures::Stream`] that yields `ExecutionOutput`s depending on the given [`Value`]: /// /// [`Value::Null`] - returns [`Value::Null`] once /// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector @@ -143,23 +140,28 @@ where fn whole_responses_stream<'a, S>( stream: Value>, errors: Vec>, -) -> Pin> + Send + 'a>> +) -> Pin> + Send + 'a>> where S: ScalarValue + Send + Sync + 'a, { if !errors.is_empty() { - return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( - Ok((Value::Null, errors)), - )))); + return stream::once(future::ready(ExecutionOutput { + data: Value::null(), + errors, + })) + .boxed(); } match stream { - Value::Null => Box::pin(stream::once(future::ready(GraphQLResponse::from_result( - Ok((Value::Null, vec![])), + Value::Null => Box::pin(stream::once(future::ready(ExecutionOutput::from_data( + Value::null(), )))), Value::Scalar(s) => Box::pin(s.map(|res| match res { - Ok(val) => GraphQLResponse::from_result(Ok((val, vec![]))), - Err(err) => GraphQLResponse::from_result(Ok((Value::Null, vec![err]))), + Ok(val) => ExecutionOutput::from_data(val), + Err(err) => ExecutionOutput { + data: Value::null(), + errors: vec![err], + }, })), Value::List(list) => { let mut streams = vec![]; @@ -171,9 +173,8 @@ where Value::Object(mut object) => { let obj_len = object.field_count(); if obj_len == 0 { - return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( - Ok((Value::Null, vec![])), - )))); + return stream::once(future::ready(ExecutionOutput::from_data(Value::null()))) + .boxed(); } let mut filled_count = 0; @@ -182,7 +183,7 @@ where ready_vec.push(None); } - let stream = stream::poll_fn(move |mut ctx| -> Poll>> { + let stream = stream::poll_fn(move |mut ctx| -> Poll>> { let mut obj_iterator = object.iter_mut(); // Due to having to modify `ready_vec` contents (by-move pattern) @@ -233,10 +234,7 @@ where } }); let obj = Object::from_iter(ready_vec_iterator); - Poll::Ready(Some(GraphQLResponse::from_result(Ok(( - Value::Object(obj), - vec![], - ))))) + Poll::Ready(Some(ExecutionOutput::from_data(Value::Object(obj)))) } else { Poll::Pending } @@ -256,9 +254,13 @@ mod whole_responses_stream { #[tokio::test] async fn with_error() { - let expected = vec![GraphQLResponse::::error( - FieldError::new("field error", Value::Null), - )]; + let expected = vec![ExecutionOutput { + data: Value::::Null, + errors: vec![ExecutionError::at_origin(FieldError::new( + "field error", + Value::Null, + ))], + }]; let expected = serde_json::to_string(&expected).unwrap(); let result = whole_responses_stream::( @@ -277,10 +279,9 @@ mod whole_responses_stream { #[tokio::test] async fn value_null() { - let expected = vec![GraphQLResponse::::from_result(Ok(( - Value::Null, - vec![], - )))]; + let expected = vec![ExecutionOutput::from_data( + Value::::Null, + )]; let expected = serde_json::to_string(&expected).unwrap(); let result = whole_responses_stream::(Value::Null, vec![]) @@ -296,26 +297,11 @@ mod whole_responses_stream { #[tokio::test] async fn value_scalar() { let expected = vec![ - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(1i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(2i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(3i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(4i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(5i32)), - vec![], - ))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(3i32))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(5i32))), ]; let expected = serde_json::to_string(&expected).unwrap(); @@ -340,19 +326,10 @@ mod whole_responses_stream { #[tokio::test] async fn value_list() { let expected = vec![ - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(1i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(2i32)), - vec![], - ))), - GraphQLResponse::from_result(Ok((Value::Null, vec![]))), - GraphQLResponse::from_result(Ok(( - Value::Scalar(DefaultScalarValue::Int(4i32)), - vec![], - ))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))), + ExecutionOutput::from_data(Value::null()), + ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))), ]; let expected = serde_json::to_string(&expected).unwrap(); @@ -380,25 +357,19 @@ mod whole_responses_stream { #[tokio::test] async fn value_object() { let expected = vec![ - GraphQLResponse::from_result(Ok(( - Value::Object(Object::from_iter( - vec![ - ("one", Value::Scalar(DefaultScalarValue::Int(1i32))), - ("two", Value::Scalar(DefaultScalarValue::Int(1i32))), - ] - .into_iter(), - )), - vec![], + ExecutionOutput::from_data(Value::Object(Object::from_iter( + vec![ + ("one", Value::Scalar(DefaultScalarValue::Int(1i32))), + ("two", Value::Scalar(DefaultScalarValue::Int(1i32))), + ] + .into_iter(), ))), - GraphQLResponse::from_result(Ok(( - Value::Object(Object::from_iter( - vec![ - ("one", Value::Scalar(DefaultScalarValue::Int(2i32))), - ("two", Value::Scalar(DefaultScalarValue::Int(2i32))), - ] - .into_iter(), - )), - vec![], + ExecutionOutput::from_data(Value::Object(Object::from_iter( + vec![ + ("one", Value::Scalar(DefaultScalarValue::Int(2i32))), + ("two", Value::Scalar(DefaultScalarValue::Int(2i32))), + ] + .into_iter(), ))), ]; let expected = serde_json::to_string(&expected).unwrap();