Simplify SubscriptionConnection (#719)

* simplify SubscriptionConnection

* fmt

* update pre-existing juniper_warp::subscriptions

* use struct instead of tuple

* fmt

* update juniper_warp
This commit is contained in:
Chris 2020-07-29 01:14:53 -04:00 committed by GitHub
parent 59419f1ec4
commit dc309b83b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 93 deletions

View file

@ -186,8 +186,8 @@ pub use crate::{
marker::{self, GraphQLUnion}, marker::{self, GraphQLUnion},
scalars::{EmptyMutation, EmptySubscription, ID}, scalars::{EmptyMutation, EmptySubscription, ID},
subscriptions::{ subscriptions::{
GraphQLSubscriptionType, GraphQLSubscriptionValue, SubscriptionConnection, ExecutionOutput, GraphQLSubscriptionType, GraphQLSubscriptionValue,
SubscriptionCoordinator, SubscriptionConnection, SubscriptionCoordinator,
}, },
}, },
validation::RuleError, validation::RuleError,

View file

@ -1,13 +1,37 @@
use futures::{future, stream}; use futures::{future, stream};
use serde::Serialize;
use crate::{ use crate::{
http::{GraphQLRequest, GraphQLResponse}, http::GraphQLRequest,
parser::Spanning, parser::Spanning,
types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue}, types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue},
Arguments, BoxFuture, DefaultScalarValue, Executor, FieldError, Object, ScalarValue, Selection, Arguments, BoxFuture, DefaultScalarValue, ExecutionError, Executor, FieldError, Object,
Value, ValuesStream, ScalarValue, Selection, Value, ValuesStream,
}; };
/// Represents the result of executing a GraphQL operation (after parsing and validating has been
/// done).
#[derive(Debug, Serialize)]
pub struct ExecutionOutput<S> {
/// The output data.
pub data: Value<S>,
/// The errors that occurred. Note that the presence of errors does not mean there is no data.
/// The output can have both data and errors.
#[serde(bound(serialize = "S: ScalarValue"))]
pub errors: Vec<ExecutionError<S>>,
}
impl<S> ExecutionOutput<S> {
/// Creates execution output from data, with no errors.
pub fn from_data(data: Value<S>) -> Self {
Self {
data,
errors: vec![],
}
}
}
/// Global subscription coordinator trait. /// Global subscription coordinator trait.
/// ///
/// With regular queries we could get away with not having some in-between /// With regular queries we could get away with not having some in-between
@ -33,7 +57,7 @@ where
{ {
/// Type of [`SubscriptionConnection`]s this [`SubscriptionCoordinator`] /// Type of [`SubscriptionConnection`]s this [`SubscriptionCoordinator`]
/// returns /// returns
type Connection: SubscriptionConnection<'a, S>; type Connection: SubscriptionConnection<S>;
/// Type of error while trying to spawn [`SubscriptionConnection`] /// Type of error while trying to spawn [`SubscriptionConnection`]
type Error; type Error;
@ -58,7 +82,7 @@ where
/// ///
/// It can be treated as [`futures::Stream`] yielding [`GraphQLResponse`]s in /// It can be treated as [`futures::Stream`] yielding [`GraphQLResponse`]s in
/// server integration crates. /// server integration crates.
pub trait SubscriptionConnection<'a, S>: futures::Stream<Item = GraphQLResponse<'a, S>> {} pub trait SubscriptionConnection<S>: futures::Stream<Item = ExecutionOutput<S>> {}
/// Extension of [`GraphQLValue`] trait with asynchronous [subscription][1] execution logic. /// Extension of [`GraphQLValue`] trait with asynchronous [subscription][1] execution logic.
/// It should be used with [`GraphQLValue`] in order to implement [subscription][1] resolvers on /// It should be used with [`GraphQLValue`] in order to implement [subscription][1] resolvers on

View file

@ -19,9 +19,9 @@ use std::{
use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _}; use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _};
use juniper::{ use juniper::{
http::{GraphQLRequest, GraphQLResponse}, http::GraphQLRequest, BoxFuture, ExecutionError, ExecutionOutput, GraphQLError,
BoxFuture, ExecutionError, GraphQLError, GraphQLSubscriptionType, GraphQLTypeAsync, Object, GraphQLSubscriptionType, GraphQLTypeAsync, Object, ScalarValue, SubscriptionConnection,
ScalarValue, SubscriptionConnection, SubscriptionCoordinator, Value, ValuesStream, SubscriptionCoordinator, Value, ValuesStream,
}; };
/// Simple [`SubscriptionCoordinator`] implementation: /// Simple [`SubscriptionCoordinator`] implementation:
@ -88,8 +88,8 @@ where
/// Simple [`SubscriptionConnection`] implementation. /// Simple [`SubscriptionConnection`] implementation.
/// ///
/// Resolves `Value<ValuesStream>` into `Stream<Item = GraphQLResponse>` using the following /// Resolves `Value<ValuesStream>` into `Stream<Item = ExecutionOutput<S>>` using
/// logic: /// the following logic:
/// ///
/// [`Value::Null`] - returns [`Value::Null`] once /// [`Value::Null`] - returns [`Value::Null`] once
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector /// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
@ -98,7 +98,7 @@ where
/// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object /// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object
/// `Value::Object<Value::Object<_>>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects /// `Value::Object<Value::Object<_>>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects
pub struct Connection<'a, S> { pub struct Connection<'a, S> {
stream: Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>>, stream: Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>,
} }
impl<'a, S> Connection<'a, S> impl<'a, S> Connection<'a, S>
@ -113,16 +113,13 @@ where
} }
} }
impl<'a, S> SubscriptionConnection<'a, S> for Connection<'a, S> where impl<'a, S> SubscriptionConnection<S> for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a {}
S: ScalarValue + Send + Sync + 'a
{
}
impl<'a, S> Stream for Connection<'a, S> impl<'a, S> Stream for Connection<'a, S>
where where
S: ScalarValue + Send + Sync + 'a, S: ScalarValue + Send + Sync + 'a,
{ {
type Item = GraphQLResponse<'a, S>; type Item = ExecutionOutput<S>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// this is safe as stream is only mutated here and is not moved anywhere // this is safe as stream is only mutated here and is not moved anywhere
@ -132,7 +129,7 @@ where
} }
} }
/// Creates [`futures::Stream`] that yields [`GraphQLResponse`]s depending on the given [`Value`]: /// Creates [`futures::Stream`] that yields `ExecutionOutput<S>`s depending on the given [`Value`]:
/// ///
/// [`Value::Null`] - returns [`Value::Null`] once /// [`Value::Null`] - returns [`Value::Null`] once
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector /// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
@ -143,23 +140,28 @@ where
fn whole_responses_stream<'a, S>( fn whole_responses_stream<'a, S>(
stream: Value<ValuesStream<'a, S>>, stream: Value<ValuesStream<'a, S>>,
errors: Vec<ExecutionError<S>>, errors: Vec<ExecutionError<S>>,
) -> Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>> ) -> Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>
where where
S: ScalarValue + Send + Sync + 'a, S: ScalarValue + Send + Sync + 'a,
{ {
if !errors.is_empty() { if !errors.is_empty() {
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( return stream::once(future::ready(ExecutionOutput {
Ok((Value::Null, errors)), data: Value::null(),
)))); errors,
}))
.boxed();
} }
match stream { match stream {
Value::Null => Box::pin(stream::once(future::ready(GraphQLResponse::from_result( Value::Null => Box::pin(stream::once(future::ready(ExecutionOutput::from_data(
Ok((Value::Null, vec![])), Value::null(),
)))), )))),
Value::Scalar(s) => Box::pin(s.map(|res| match res { Value::Scalar(s) => Box::pin(s.map(|res| match res {
Ok(val) => GraphQLResponse::from_result(Ok((val, vec![]))), Ok(val) => ExecutionOutput::from_data(val),
Err(err) => GraphQLResponse::from_result(Ok((Value::Null, vec![err]))), Err(err) => ExecutionOutput {
data: Value::null(),
errors: vec![err],
},
})), })),
Value::List(list) => { Value::List(list) => {
let mut streams = vec![]; let mut streams = vec![];
@ -171,9 +173,8 @@ where
Value::Object(mut object) => { Value::Object(mut object) => {
let obj_len = object.field_count(); let obj_len = object.field_count();
if obj_len == 0 { if obj_len == 0 {
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( return stream::once(future::ready(ExecutionOutput::from_data(Value::null())))
Ok((Value::Null, vec![])), .boxed();
))));
} }
let mut filled_count = 0; let mut filled_count = 0;
@ -182,7 +183,7 @@ where
ready_vec.push(None); ready_vec.push(None);
} }
let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<GraphQLResponse<'a, S>>> { let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<ExecutionOutput<S>>> {
let mut obj_iterator = object.iter_mut(); let mut obj_iterator = object.iter_mut();
// Due to having to modify `ready_vec` contents (by-move pattern) // Due to having to modify `ready_vec` contents (by-move pattern)
@ -233,10 +234,7 @@ where
} }
}); });
let obj = Object::from_iter(ready_vec_iterator); let obj = Object::from_iter(ready_vec_iterator);
Poll::Ready(Some(GraphQLResponse::from_result(Ok(( Poll::Ready(Some(ExecutionOutput::from_data(Value::Object(obj))))
Value::Object(obj),
vec![],
)))))
} else { } else {
Poll::Pending Poll::Pending
} }
@ -256,9 +254,13 @@ mod whole_responses_stream {
#[tokio::test] #[tokio::test]
async fn with_error() { async fn with_error() {
let expected = vec![GraphQLResponse::<DefaultScalarValue>::error( let expected = vec![ExecutionOutput {
FieldError::new("field error", Value::Null), data: Value::<DefaultScalarValue>::Null,
)]; errors: vec![ExecutionError::at_origin(FieldError::new(
"field error",
Value::Null,
))],
}];
let expected = serde_json::to_string(&expected).unwrap(); let expected = serde_json::to_string(&expected).unwrap();
let result = whole_responses_stream::<DefaultScalarValue>( let result = whole_responses_stream::<DefaultScalarValue>(
@ -277,10 +279,9 @@ mod whole_responses_stream {
#[tokio::test] #[tokio::test]
async fn value_null() { async fn value_null() {
let expected = vec![GraphQLResponse::<DefaultScalarValue>::from_result(Ok(( let expected = vec![ExecutionOutput::from_data(
Value::Null, Value::<DefaultScalarValue>::Null,
vec![], )];
)))];
let expected = serde_json::to_string(&expected).unwrap(); let expected = serde_json::to_string(&expected).unwrap();
let result = whole_responses_stream::<DefaultScalarValue>(Value::Null, vec![]) let result = whole_responses_stream::<DefaultScalarValue>(Value::Null, vec![])
@ -296,26 +297,11 @@ mod whole_responses_stream {
#[tokio::test] #[tokio::test]
async fn value_scalar() { async fn value_scalar() {
let expected = vec![ let expected = vec![
GraphQLResponse::from_result(Ok(( ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
Value::Scalar(DefaultScalarValue::Int(1i32)), ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
vec![], ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(3i32))),
))), ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
GraphQLResponse::from_result(Ok(( ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(5i32))),
Value::Scalar(DefaultScalarValue::Int(2i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(3i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(4i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(5i32)),
vec![],
))),
]; ];
let expected = serde_json::to_string(&expected).unwrap(); let expected = serde_json::to_string(&expected).unwrap();
@ -340,19 +326,10 @@ mod whole_responses_stream {
#[tokio::test] #[tokio::test]
async fn value_list() { async fn value_list() {
let expected = vec![ let expected = vec![
GraphQLResponse::from_result(Ok(( ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
Value::Scalar(DefaultScalarValue::Int(1i32)), ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
vec![], ExecutionOutput::from_data(Value::null()),
))), ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(2i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((Value::Null, vec![]))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(4i32)),
vec![],
))),
]; ];
let expected = serde_json::to_string(&expected).unwrap(); let expected = serde_json::to_string(&expected).unwrap();
@ -380,25 +357,19 @@ mod whole_responses_stream {
#[tokio::test] #[tokio::test]
async fn value_object() { async fn value_object() {
let expected = vec![ let expected = vec![
GraphQLResponse::from_result(Ok(( ExecutionOutput::from_data(Value::Object(Object::from_iter(
Value::Object(Object::from_iter(
vec![ vec![
("one", Value::Scalar(DefaultScalarValue::Int(1i32))), ("one", Value::Scalar(DefaultScalarValue::Int(1i32))),
("two", Value::Scalar(DefaultScalarValue::Int(1i32))), ("two", Value::Scalar(DefaultScalarValue::Int(1i32))),
] ]
.into_iter(), .into_iter(),
)),
vec![],
))), ))),
GraphQLResponse::from_result(Ok(( ExecutionOutput::from_data(Value::Object(Object::from_iter(
Value::Object(Object::from_iter(
vec![ vec![
("one", Value::Scalar(DefaultScalarValue::Int(2i32))), ("one", Value::Scalar(DefaultScalarValue::Int(2i32))),
("two", Value::Scalar(DefaultScalarValue::Int(2i32))), ("two", Value::Scalar(DefaultScalarValue::Int(2i32))),
] ]
.into_iter(), .into_iter(),
)),
vec![],
))), ))),
]; ];
let expected = serde_json::to_string(&expected).unwrap(); let expected = serde_json::to_string(&expected).unwrap();