diff --git a/examples/basic_subscriptions/src/main.rs b/examples/basic_subscriptions/src/main.rs index 3f3cbe0d..a5d463e1 100644 --- a/examples/basic_subscriptions/src/main.rs +++ b/examples/basic_subscriptions/src/main.rs @@ -1,11 +1,14 @@ #![deny(warnings)] -use futures::{Stream, StreamExt}; -use juniper::http::GraphQLRequest; -use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator}; -use juniper_subscriptions::Coordinator; use std::pin::Pin; +use futures::{Stream, StreamExt}; +use juniper::{ + http::GraphQLRequest, DefaultScalarValue, EmptyMutation, FieldError, RootNode, + SubscriptionCoordinator, +}; +use juniper_subscriptions::Coordinator; + #[derive(Clone)] pub struct Database; @@ -50,13 +53,11 @@ async fn main() { let schema = schema(); let coordinator = Coordinator::new(schema); let req: GraphQLRequest = serde_json::from_str( - r#" - { + r#"{ "query": "subscription { helloWorld }" - } - "#, + }"#, ) - .unwrap(); + .unwrap(); let ctx = Database::new(); let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap(); while let Some(result) = conn.next().await { diff --git a/examples/warp_subscriptions/Cargo.toml b/examples/warp_subscriptions/Cargo.toml index 3212f799..152f30a0 100644 --- a/examples/warp_subscriptions/Cargo.toml +++ b/examples/warp_subscriptions/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] env_logger = "0.6.2" -futures = { version = "=0.3.1" } +futures = "0.3.1" log = "0.4.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/examples/warp_subscriptions/src/main.rs b/examples/warp_subscriptions/src/main.rs index 4f42c267..f0f9f737 100644 --- a/examples/warp_subscriptions/src/main.rs +++ b/examples/warp_subscriptions/src/main.rs @@ -1,6 +1,6 @@ //! This example demonstrates asynchronous subscriptions with warp and tokio 0.2 -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{env, pin::Pin, sync::Arc, time::Duration}; use futures::{Future, FutureExt as _, Stream}; use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode}; @@ -136,10 +136,10 @@ fn schema() -> Schema { #[tokio::main] async fn main() { - ::std::env::set_var("RUST_LOG", "warp_subscriptions"); + env::set_var("RUST_LOG", "warp_subscriptions"); env_logger::init(); - let log = warp::log("warp_server"); + let log = warp::log("warp_subscriptions"); let homepage = warp::path::end().map(|| { Response::builder() diff --git a/juniper/Cargo.toml b/juniper/Cargo.toml index 431b30e2..4fa16654 100644 --- a/juniper/Cargo.toml +++ b/juniper/Cargo.toml @@ -43,6 +43,7 @@ bson = { version = "1.0.0", optional = true } chrono = { version = "0.4.0", optional = true } fnv = "1.0.3" futures = "0.3.1" +futures-enum = "0.1.12" indexmap = { version = "1.0.0", features = ["serde-1"] } serde = { version = "1.0.8", features = ["derive"] } serde_json = { version="1.0.2", optional = true } diff --git a/juniper/src/executor/mod.rs b/juniper/src/executor/mod.rs index f3c25606..f9e5f465 100644 --- a/juniper/src/executor/mod.rs +++ b/juniper/src/executor/mod.rs @@ -7,6 +7,7 @@ use std::{ }; use fnv::FnvHashMap; +use futures::Stream; use crate::{ ast::{ @@ -224,9 +225,9 @@ pub type FieldResult = Result>; /// The result of resolving an unspecified field pub type ExecutionResult = Result, FieldError>; -/// Boxed `futures::Stream` yielding `Result, ExecutionError>` +/// Boxed `Stream` yielding `Result, ExecutionError>` pub type ValuesStream<'a, S = DefaultScalarValue> = - std::pin::Pin, ExecutionError>> + Send + 'a>>; + std::pin::Pin, ExecutionError>> + Send + 'a>>; /// The map of variables used for substitution during query execution pub type Variables = HashMap>; @@ -374,18 +375,15 @@ where 'i: 'res, 'v: 'res, 'a: 'res, - T: GraphQLSubscriptionValue + Send + Sync, - T::TypeInfo: Send + Sync, - CtxT: Send + Sync, + T: GraphQLSubscriptionValue + ?Sized, + T::TypeInfo: Sync, + CtxT: Sync, S: Send + Sync, { - match self.subscribe(info, value).await { - Ok(v) => v, - Err(e) => { - self.push_error(e); - Value::Null - } - } + self.subscribe(info, value).await.unwrap_or_else(|e| { + self.push_error(e); + Value::Null + }) } /// Resolve a single arbitrary value into a stream of [`Value`]s. @@ -398,9 +396,9 @@ where where 't: 'res, 'a: 'res, - T: GraphQLSubscriptionValue, - T::TypeInfo: Send + Sync, - CtxT: Send + Sync, + T: GraphQLSubscriptionValue + ?Sized, + T::TypeInfo: Sync, + CtxT: Sync, S: Send + Sync, { value.resolve_into_stream(info, self).await @@ -427,9 +425,9 @@ where /// Resolve a single arbitrary value into an `ExecutionResult` pub async fn resolve_async(&self, info: &T::TypeInfo, value: &T) -> ExecutionResult where - T: GraphQLValueAsync + Send + Sync + ?Sized, - T::TypeInfo: Send + Sync, - CtxT: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + CtxT: Sync, S: Send + Sync, { value @@ -444,10 +442,10 @@ where value: &T, ) -> ExecutionResult where - T: GraphQLValueAsync + Send + Sync, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + NewCtxT: FromContext + Sync, S: Send + Sync, - NewCtxT: FromContext + Send + Sync, { let e = self.replaced_context(>::from(self.context)); e.resolve_async(info, value).await @@ -471,9 +469,9 @@ where /// If the field fails to resolve, `null` will be returned. pub async fn resolve_into_value_async(&self, info: &T::TypeInfo, value: &T) -> Value where - T: GraphQLValueAsync + Send + Sync + ?Sized, - T::TypeInfo: Send + Sync, - CtxT: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + CtxT: Sync, S: Send + Sync, { self.resolve_async(info, value).await.unwrap_or_else(|e| { @@ -750,18 +748,18 @@ impl ExecutionError { /// Create new `Executor` and start query/mutation execution. /// Returns `IsSubscription` error if subscription is passed. -pub fn execute_validated_query<'a, 'b, QueryT, MutationT, SubscriptionT, CtxT, S>( +pub fn execute_validated_query<'a, 'b, QueryT, MutationT, SubscriptionT, S>( document: &'b Document, operation: &'b Spanning>, root_node: &RootNode, variables: &Variables, - context: &CtxT, + context: &QueryT::Context, ) -> Result<(Value, Vec>), GraphQLError<'a>> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { if operation.item.operation_type == OperationType::Subscription { return Err(GraphQLError::IsSubscription); @@ -844,22 +842,22 @@ where /// Create new `Executor` and start asynchronous query execution. /// Returns `IsSubscription` error if subscription is passed. -pub async fn execute_validated_query_async<'a, 'b, QueryT, MutationT, SubscriptionT, CtxT, S>( +pub async fn execute_validated_query_async<'a, 'b, QueryT, MutationT, SubscriptionT, S>( document: &'b Document<'a, S>, operation: &'b Spanning>, root_node: &RootNode<'a, QueryT, MutationT, SubscriptionT, S>, variables: &Variables, - context: &CtxT, + context: &QueryT::Context, ) -> Result<(Value, Vec>), GraphQLError<'a>> where + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType + Sync, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, { if operation.item.operation_type == OperationType::Subscription { return Err(GraphQLError::IsSubscription); @@ -986,27 +984,26 @@ pub async fn resolve_validated_subscription< QueryT, MutationT, SubscriptionT, - CtxT, S, >( document: &Document<'d, S>, operation: &Spanning>, root_node: &'r RootNode<'r, QueryT, MutationT, SubscriptionT, S>, variables: &Variables, - context: &'r CtxT, + context: &'r QueryT::Context, ) -> Result<(Value>, Vec>), GraphQLError<'r>> where 'r: 'exec_ref, 'd: 'r, 'op: 'd, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync + 'r, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync + 'r, { if operation.item.operation_type != OperationType::Subscription { return Err(GraphQLError::NotSubscription); diff --git a/juniper/src/http/mod.rs b/juniper/src/http/mod.rs index 182bc2fe..59d9cb45 100644 --- a/juniper/src/http/mod.rs +++ b/juniper/src/http/mod.rs @@ -75,16 +75,16 @@ where /// /// This is a simple wrapper around the `execute_sync` function exposed at the /// top level of this crate. - pub fn execute_sync<'a, CtxT, QueryT, MutationT, SubscriptionT>( + pub fn execute_sync<'a, QueryT, MutationT, SubscriptionT>( &'a self, root_node: &'a RootNode, - context: &CtxT, + context: &QueryT::Context, ) -> GraphQLResponse<'a, S> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { GraphQLResponse(crate::execute_sync( &self.query, @@ -99,20 +99,20 @@ where /// /// This is a simple wrapper around the `execute` function exposed at the /// top level of this crate. - pub async fn execute<'a, CtxT, QueryT, MutationT, SubscriptionT>( + pub async fn execute<'a, QueryT, MutationT, SubscriptionT>( &'a self, root_node: &'a RootNode<'a, QueryT, MutationT, SubscriptionT, S>, - context: &'a CtxT, + context: &'a QueryT::Context, ) -> GraphQLResponse<'a, S> where + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType + Sync, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, { let op = self.operation_name(); let vars = &self.variables(); @@ -125,23 +125,23 @@ where /// specified schema and context. /// This is a wrapper around the `resolve_into_stream` function exposed at the top /// level of this crate. -pub async fn resolve_into_stream<'req, 'rn, 'ctx, 'a, CtxT, QueryT, MutationT, SubscriptionT, S>( +pub async fn resolve_into_stream<'req, 'rn, 'ctx, 'a, QueryT, MutationT, SubscriptionT, S>( req: &'req GraphQLRequest, root_node: &'rn RootNode<'a, QueryT, MutationT, SubscriptionT, S>, - context: &'ctx CtxT, + context: &'ctx QueryT::Context, ) -> Result<(Value>, Vec>), GraphQLError<'a>> where 'req: 'a, 'rn: 'a, 'ctx: 'a, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, { let op = req.operation_name(); let vars = req.variables(); @@ -257,15 +257,15 @@ where /// Execute a GraphQL batch request synchronously using the specified schema and context /// /// This is a simple wrapper around the `execute_sync` function exposed in GraphQLRequest. - pub fn execute_sync<'a, CtxT, QueryT, MutationT, SubscriptionT>( + pub fn execute_sync<'a, QueryT, MutationT, SubscriptionT>( &'a self, root_node: &'a RootNode, - context: &CtxT, + context: &QueryT::Context, ) -> GraphQLBatchResponse<'a, S> where - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { match *self { Self::Single(ref req) => { @@ -283,27 +283,27 @@ where /// /// This is a simple wrapper around the `execute` function exposed in /// GraphQLRequest - pub async fn execute<'a, CtxT, QueryT, MutationT, SubscriptionT>( + pub async fn execute<'a, QueryT, MutationT, SubscriptionT>( &'a self, root_node: &'a RootNode<'a, QueryT, MutationT, SubscriptionT, S>, - context: &'a CtxT, + context: &'a QueryT::Context, ) -> GraphQLBatchResponse<'a, S> where - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, S: Send + Sync, { - match *self { - Self::Single(ref req) => { + match self { + Self::Single(req) => { let resp = req.execute(root_node, context).await; GraphQLBatchResponse::Single(resp) } - Self::Batch(ref reqs) => { + Self::Batch(reqs) => { let resps = futures::future::join_all( reqs.iter().map(|req| req.execute(root_node, context)), ) diff --git a/juniper/src/lib.rs b/juniper/src/lib.rs index 8b9f6894..baba1af2 100644 --- a/juniper/src/lib.rs +++ b/juniper/src/lib.rs @@ -229,18 +229,18 @@ impl<'a> fmt::Display for GraphQLError<'a> { impl<'a> std::error::Error for GraphQLError<'a> {} /// Execute a query synchronously in a provided schema -pub fn execute_sync<'a, S, CtxT, QueryT, MutationT, SubscriptionT>( +pub fn execute_sync<'a, S, QueryT, MutationT, SubscriptionT>( document_source: &'a str, operation_name: Option<&str>, root_node: &'a RootNode, variables: &Variables, - context: &CtxT, + context: &QueryT::Context, ) -> Result<(Value, Vec>), GraphQLError<'a>> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { let document = parse_document_source(document_source, &root_node.schema)?; @@ -268,22 +268,22 @@ where } /// Execute a query in a provided schema -pub async fn execute<'a, S, CtxT, QueryT, MutationT, SubscriptionT>( +pub async fn execute<'a, S, QueryT, MutationT, SubscriptionT>( document_source: &'a str, operation_name: Option<&str>, root_node: &'a RootNode<'a, QueryT, MutationT, SubscriptionT, S>, variables: &Variables, - context: &CtxT, + context: &QueryT::Context, ) -> Result<(Value, Vec>), GraphQLError<'a>> where + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType + Sync, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, { let document = parse_document_source(document_source, &root_node.schema)?; @@ -312,22 +312,22 @@ where } /// Resolve subscription into `ValuesStream` -pub async fn resolve_into_stream<'a, S, CtxT, QueryT, MutationT, SubscriptionT>( +pub async fn resolve_into_stream<'a, S, QueryT, MutationT, SubscriptionT>( document_source: &'a str, operation_name: Option<&str>, root_node: &'a RootNode<'a, QueryT, MutationT, SubscriptionT, S>, variables: &Variables, - context: &'a CtxT, + context: &'a QueryT::Context, ) -> Result<(Value>, Vec>), GraphQLError<'a>> where + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, { let document: crate::ast::Document<'a, S> = parse_document_source(document_source, &root_node.schema)?; @@ -357,16 +357,16 @@ where } /// Execute the reference introspection query in the provided schema -pub fn introspect<'a, S, CtxT, QueryT, MutationT, SubscriptionT>( +pub fn introspect<'a, S, QueryT, MutationT, SubscriptionT>( root_node: &'a RootNode, - context: &CtxT, + context: &QueryT::Context, format: IntrospectionFormat, ) -> Result<(Value, Vec>), GraphQLError<'a>> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { execute_sync( match format { diff --git a/juniper/src/macros/tests/impl_object.rs b/juniper/src/macros/tests/impl_object.rs index fc15a1a1..d82607da 100644 --- a/juniper/src/macros/tests/impl_object.rs +++ b/juniper/src/macros/tests/impl_object.rs @@ -134,7 +134,7 @@ impl Subscription { #[tokio::test] async fn object_introspect() { - let res = util::run_info_query::("Query").await; + let res = util::run_info_query::("Query").await; assert_eq!( res, crate::graphql_value!({ diff --git a/juniper/src/macros/tests/impl_subscription.rs b/juniper/src/macros/tests/impl_subscription.rs index 30244a42..d802a9d8 100644 --- a/juniper/src/macros/tests/impl_subscription.rs +++ b/juniper/src/macros/tests/impl_subscription.rs @@ -149,7 +149,7 @@ impl Subscription { #[tokio::test] async fn object_introspect() { - let res = util::run_info_query::("Subscription").await; + let res = util::run_info_query::("Subscription").await; assert_eq!( res, crate::graphql_value!({ diff --git a/juniper/src/macros/tests/util.rs b/juniper/src/macros/tests/util.rs index 04188371..42ccadd4 100644 --- a/juniper/src/macros/tests/util.rs +++ b/juniper/src/macros/tests/util.rs @@ -1,34 +1,41 @@ use crate::{DefaultScalarValue, GraphQLType, GraphQLTypeAsync, RootNode, Value, Variables}; -pub async fn run_query(query: &str) -> Value +pub async fn run_query(query: &str) -> Value where - Query: GraphQLTypeAsync + Default, - Mutation: GraphQLTypeAsync + Default, + Query: GraphQLTypeAsync + Default, + Query::Context: Default + Sync, + Mutation: + GraphQLTypeAsync + Default, Subscription: - GraphQLType + Default + Sync + Send, - Context: Default + Send + Sync, + GraphQLType + Default + Sync, { let schema = RootNode::new( Query::default(), Mutation::default(), Subscription::default(), ); - let (result, errs) = - crate::execute(query, None, &schema, &Variables::new(), &Context::default()) - .await - .expect("Execution failed"); + let (result, errs) = crate::execute( + query, + None, + &schema, + &Variables::new(), + &Query::Context::default(), + ) + .await + .expect("Execution failed"); assert_eq!(errs, []); result } -pub async fn run_info_query(type_name: &str) -> Value +pub async fn run_info_query(type_name: &str) -> Value where - Query: GraphQLTypeAsync + Default, - Mutation: GraphQLTypeAsync + Default, + Query: GraphQLTypeAsync + Default, + Query::Context: Default + Sync, + Mutation: + GraphQLTypeAsync + Default, Subscription: - GraphQLType + Default + Sync + Send, - Context: Default + Send + Sync, + GraphQLType + Default + Sync, { let query = format!( r#" @@ -52,7 +59,7 @@ where "#, type_name ); - let result = run_query::(&query).await; + let result = run_query::(&query).await; result .as_object_value() .expect("Result is not an object") diff --git a/juniper/src/schema/schema.rs b/juniper/src/schema/schema.rs index fd67b99a..62f03cc5 100644 --- a/juniper/src/schema/schema.rs +++ b/juniper/src/schema/schema.rs @@ -16,19 +16,19 @@ use crate::schema::{ model::{DirectiveLocation, DirectiveType, RootNode, SchemaType, TypeType}, }; -impl<'a, CtxT, S, QueryT, MutationT, SubscriptionT> GraphQLType +impl<'a, S, QueryT, MutationT, SubscriptionT> GraphQLType for RootNode<'a, QueryT, MutationT, SubscriptionT, S> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { - fn name(info: &QueryT::TypeInfo) -> Option<&str> { + fn name(info: &Self::TypeInfo) -> Option<&str> { QueryT::name(info) } - fn meta<'r>(info: &QueryT::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -36,27 +36,27 @@ where } } -impl<'a, CtxT, S, QueryT, MutationT, SubscriptionT> GraphQLValue +impl<'a, S, QueryT, MutationT, SubscriptionT> GraphQLValue for RootNode<'a, QueryT, MutationT, SubscriptionT, S> where S: ScalarValue, - QueryT: GraphQLType, - MutationT: GraphQLType, - SubscriptionT: GraphQLType, + QueryT: GraphQLType, + MutationT: GraphQLType, + SubscriptionT: GraphQLType, { - type Context = CtxT; + type Context = QueryT::Context; type TypeInfo = QueryT::TypeInfo; - fn type_name<'i>(&self, info: &'i QueryT::TypeInfo) -> Option<&'i str> { + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { QueryT::name(info) } fn resolve_field( &self, - info: &QueryT::TypeInfo, + info: &Self::TypeInfo, field: &str, args: &Arguments, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { match field { "__schema" => executor @@ -93,17 +93,17 @@ where } } -impl<'a, CtxT, S, QueryT, MutationT, SubscriptionT> GraphQLValueAsync +impl<'a, S, QueryT, MutationT, SubscriptionT> GraphQLValueAsync for RootNode<'a, QueryT, MutationT, SubscriptionT, S> where + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + QueryT::Context: Sync + 'a, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType + Sync, + SubscriptionT::TypeInfo: Sync, S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync + 'a, { fn resolve_field_async<'b>( &'b self, diff --git a/juniper/src/types/async_await.rs b/juniper/src/types/async_await.rs index 159a52fb..c43e30d6 100644 --- a/juniper/src/types/async_await.rs +++ b/juniper/src/types/async_await.rs @@ -13,10 +13,10 @@ use super::base::{is_excluded, merge_key_into, Arguments, GraphQLType, GraphQLVa /// /// Convenience macros related to asynchronous queries/mutations expand into an implementation of /// this trait and [`GraphQLValue`] for the given type. -pub trait GraphQLValueAsync: GraphQLValue + Send + Sync +pub trait GraphQLValueAsync: GraphQLValue + Sync where - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, + Self::TypeInfo: Sync, + Self::Context: Sync, S: ScalarValue + Send + Sync, { /// Resolves the value of a single field on this [`GraphQLValueAsync`]. @@ -119,34 +119,34 @@ crate::sa::assert_obj_safe!(GraphQLValueAsync); /// doesn't require manual or code-generated implementation. pub trait GraphQLTypeAsync: GraphQLValueAsync + GraphQLType where - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, + Self::Context: Sync, + Self::TypeInfo: Sync, S: ScalarValue + Send + Sync, { } impl GraphQLTypeAsync for T where - T: GraphQLValueAsync + GraphQLType, - T::Context: Send + Sync, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync + GraphQLType + ?Sized, + T::Context: Sync, + T::TypeInfo: Sync, S: ScalarValue + Send + Sync, { } // Wrapper function around resolve_selection_set_into_async_recursive. // This wrapper is necessary because async fns can not be recursive. -fn resolve_selection_set_into_async<'a, 'e, T, CtxT, S>( +fn resolve_selection_set_into_async<'a, 'e, T, S>( instance: &'a T, info: &'a T::TypeInfo, selection_set: &'e [Selection<'e, S>], - executor: &'e Executor<'e, 'e, CtxT, S>, + executor: &'e Executor<'e, 'e, T::Context, S>, ) -> BoxFuture<'a, Value> where - T: GraphQLValueAsync + ?Sized, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, 'e: 'a, { Box::pin(resolve_selection_set_into_async_recursive( @@ -167,23 +167,31 @@ enum AsyncValue { Nested(Value), } -pub(crate) async fn resolve_selection_set_into_async_recursive<'a, T, CtxT, S>( +pub(crate) async fn resolve_selection_set_into_async_recursive<'a, T, S>( instance: &'a T, info: &'a T::TypeInfo, selection_set: &'a [Selection<'a, S>], - executor: &'a Executor<'a, 'a, CtxT, S>, + executor: &'a Executor<'a, 'a, T::Context, S>, ) -> Value where - T: GraphQLValueAsync + Send + Sync + ?Sized, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { use futures::stream::{FuturesOrdered, StreamExt as _}; + #[derive(futures_enum::Future)] + enum AsyncValueFuture { + Field(A), + FragmentSpread(B), + InlineFragment1(C), + InlineFragment2(D), + } + let mut object = Object::with_capacity(selection_set.len()); - let mut async_values = FuturesOrdered::>>::new(); + let mut async_values = FuturesOrdered::>::new(); let meta_type = executor .schema() @@ -246,7 +254,7 @@ where let is_non_null = meta_field.field_type.is_non_null(); let response_name = response_name.to_string(); - let field_future = async move { + async_values.push(AsyncValueFuture::Field(async move { // TODO: implement custom future type instead of // two-level boxing. let res = instance @@ -270,18 +278,16 @@ where name: response_name, value, }) - }; - async_values.push(Box::pin(field_future)); + })); } + Selection::FragmentSpread(Spanning { item: ref spread, .. }) => { if is_excluded(&spread.directives, executor.variables()) { continue; } - - // TODO: prevent duplicate boxing. - let f = async move { + async_values.push(AsyncValueFuture::FragmentSpread(async move { let fragment = &executor .fragment_by_name(spread.name.item) .expect("Fragment could not be found"); @@ -293,9 +299,9 @@ where ) .await; AsyncValue::Nested(value) - }; - async_values.push(Box::pin(f)); + })); } + Selection::InlineFragment(Spanning { item: ref fragment, start: ref start_pos, @@ -322,20 +328,18 @@ where if let Ok(Value::Object(obj)) = sub_result { for (k, v) in obj { - // TODO: prevent duplicate boxing. - let f = async move { + async_values.push(AsyncValueFuture::InlineFragment1(async move { AsyncValue::Field(AsyncField { name: k, value: Some(v), }) - }; - async_values.push(Box::pin(f)); + })); } } else if let Err(e) = sub_result { sub_exec.push_error_at(e, start_pos.clone()); } } else { - let f = async move { + async_values.push(AsyncValueFuture::InlineFragment2(async move { let value = resolve_selection_set_into_async( instance, info, @@ -344,8 +348,7 @@ where ) .await; AsyncValue::Nested(value) - }; - async_values.push(Box::pin(f)); + })); } } } diff --git a/juniper/src/types/base.rs b/juniper/src/types/base.rs index 567e0e2f..f646f517 100644 --- a/juniper/src/types/base.rs +++ b/juniper/src/types/base.rs @@ -401,16 +401,16 @@ where /// and then merges returned values into `result` or pushes errors to /// field's/fragment's sub executor. /// -/// Returns false if any errors occured and true otherwise. -pub(crate) fn resolve_selection_set_into( +/// Returns false if any errors occurred and true otherwise. +pub(crate) fn resolve_selection_set_into( instance: &T, info: &T::TypeInfo, selection_set: &[Selection], - executor: &Executor, + executor: &Executor, result: &mut Object, ) -> bool where - T: GraphQLValue + ?Sized, + T: GraphQLValue + ?Sized, S: ScalarValue, { let meta_type = executor diff --git a/juniper/src/types/containers.rs b/juniper/src/types/containers.rs index 330bdb19..bb9d643f 100644 --- a/juniper/src/types/containers.rs +++ b/juniper/src/types/containers.rs @@ -9,16 +9,16 @@ use crate::{ value::{ScalarValue, Value}, }; -impl GraphQLType for Option +impl GraphQLType for Option where + T: GraphQLType, S: ScalarValue, - T: GraphQLType, { - fn name(_: &T::TypeInfo) -> Option<&'static str> { + fn name(_: &Self::TypeInfo) -> Option<&'static str> { None } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -26,23 +26,23 @@ where } } -impl GraphQLValue for Option +impl GraphQLValue for Option where S: ScalarValue, - T: GraphQLValue, + T: GraphQLValue, { - type Context = CtxT; + type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name(&self, _: &T::TypeInfo) -> Option<&'static str> { + fn type_name(&self, _: &Self::TypeInfo) -> Option<&'static str> { None } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, _: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { match *self { Some(ref obj) => executor.resolve(info, obj), @@ -51,12 +51,12 @@ where } } -impl GraphQLValueAsync for Option +impl GraphQLValueAsync for Option where - T: GraphQLValueAsync, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { fn resolve_async<'a>( &'a self, @@ -65,8 +65,8 @@ where executor: &'a Executor, ) -> crate::BoxFuture<'a, ExecutionResult> { let f = async move { - let value = match *self { - Some(ref obj) => executor.resolve_into_value_async(info, obj).await, + let value = match self { + Some(obj) => executor.resolve_into_value_async(info, obj).await, None => Value::null(), }; Ok(value) @@ -101,16 +101,16 @@ where } } -impl GraphQLType for Vec +impl GraphQLType for Vec where - T: GraphQLType, + T: GraphQLType, S: ScalarValue, { - fn name(_: &T::TypeInfo) -> Option<&'static str> { + fn name(_: &Self::TypeInfo) -> Option<&'static str> { None } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -118,34 +118,34 @@ where } } -impl GraphQLValue for Vec +impl GraphQLValue for Vec where - T: GraphQLValue, + T: GraphQLValue, S: ScalarValue, { - type Context = CtxT; + type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name(&self, _: &T::TypeInfo) -> Option<&'static str> { + fn type_name(&self, _: &Self::TypeInfo) -> Option<&'static str> { None } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, _: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { resolve_into_list(executor, info, self.iter()) } } -impl GraphQLValueAsync for Vec +impl GraphQLValueAsync for Vec where - T: GraphQLValueAsync, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { fn resolve_async<'a>( &'a self, @@ -190,16 +190,16 @@ where } } -impl GraphQLType for [T] +impl GraphQLType for [T] where S: ScalarValue, - T: GraphQLType, + T: GraphQLType, { - fn name(_: &T::TypeInfo) -> Option<&'static str> { + fn name(_: &Self::TypeInfo) -> Option<&'static str> { None } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -207,34 +207,34 @@ where } } -impl GraphQLValue for [T] +impl GraphQLValue for [T] where S: ScalarValue, - T: GraphQLValue, + T: GraphQLValue, { - type Context = CtxT; + type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name(&self, _: &T::TypeInfo) -> Option<&'static str> { + fn type_name(&self, _: &Self::TypeInfo) -> Option<&'static str> { None } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, _: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { resolve_into_list(executor, info, self.iter()) } } -impl GraphQLValueAsync for [T] +impl GraphQLValueAsync for [T] where - T: GraphQLValueAsync, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { fn resolve_async<'a>( &'a self, @@ -292,11 +292,11 @@ async fn resolve_into_list_async<'a, 't, S, T, I>( items: I, ) -> ExecutionResult where - S: ScalarValue + Send + Sync, I: Iterator + ExactSizeIterator, T: GraphQLValueAsync + ?Sized + 't, - T::TypeInfo: Send + Sync, - T::Context: Send + Sync, + T::TypeInfo: Sync, + T::Context: Sync, + S: ScalarValue + Send + Sync, { use futures::stream::{FuturesOrdered, StreamExt as _}; use std::iter::FromIterator; @@ -307,7 +307,7 @@ where .expect("Current type is not a list type") .is_non_null(); - let iter = items.map(|item| async move { executor.resolve_into_value_async(info, item).await }); + let iter = items.map(|it| async move { executor.resolve_into_value_async(info, it).await }); let mut futures = FuturesOrdered::from_iter(iter); let mut values = Vec::with_capacity(futures.len()); diff --git a/juniper/src/types/pointers.rs b/juniper/src/types/pointers.rs index 6cf6ca84..a1c45f81 100644 --- a/juniper/src/types/pointers.rs +++ b/juniper/src/types/pointers.rs @@ -12,16 +12,16 @@ use crate::{ BoxFuture, }; -impl GraphQLType for Box +impl GraphQLType for Box where + T: GraphQLType + ?Sized, S: ScalarValue, - T: GraphQLType + ?Sized, { - fn name(info: &T::TypeInfo) -> Option<&str> { + fn name(info: &Self::TypeInfo) -> Option<&str> { T::name(info) } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -29,54 +29,54 @@ where } } -impl GraphQLValue for Box +impl GraphQLValue for Box where + T: GraphQLValue + ?Sized, S: ScalarValue, - T: GraphQLValue + ?Sized, { - type Context = CtxT; + type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name<'i>(&self, info: &'i T::TypeInfo) -> Option<&'i str> { + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { (**self).type_name(info) } fn resolve_into_type( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, name: &str, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_into_type(info, name, selection_set, executor) } fn resolve_field( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, field: &str, args: &Arguments, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_field(info, field, args, executor) } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve(info, selection_set, executor) } } -impl GraphQLValueAsync for Box +impl GraphQLValueAsync for Box where - T: GraphQLValueAsync + ?Sized, - T::TypeInfo: Send + Sync, + T: GraphQLValueAsync + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { fn resolve_async<'a>( &'a self, @@ -111,16 +111,16 @@ where } } -impl<'e, S, T, CtxT> GraphQLType for &'e T +impl<'e, S, T> GraphQLType for &'e T where + T: GraphQLType + ?Sized, S: ScalarValue, - T: GraphQLType + ?Sized, { - fn name(info: &T::TypeInfo) -> Option<&str> { + fn name(info: &Self::TypeInfo) -> Option<&str> { T::name(info) } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -128,43 +128,43 @@ where } } -impl<'e, S, T, CtxT> GraphQLValue for &'e T +impl<'e, S, T> GraphQLValue for &'e T where S: ScalarValue, - T: GraphQLValue + ?Sized, + T: GraphQLValue + ?Sized, { - type Context = CtxT; + type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name<'i>(&self, info: &'i T::TypeInfo) -> Option<&'i str> { + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { (**self).type_name(info) } fn resolve_into_type( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, name: &str, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_into_type(info, name, selection_set, executor) } fn resolve_field( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, field: &str, args: &Arguments, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_field(info, field, args, executor) } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve(info, selection_set, executor) } @@ -172,10 +172,10 @@ where impl<'e, S, T> GraphQLValueAsync for &'e T where - S: ScalarValue + Send + Sync, T: GraphQLValueAsync + ?Sized, - T::TypeInfo: Send + Sync, - T::Context: Send + Sync, + T::TypeInfo: Sync, + T::Context: Sync, + S: ScalarValue + Send + Sync, { fn resolve_field_async<'b>( &'b self, @@ -212,11 +212,11 @@ where S: ScalarValue, T: GraphQLType + ?Sized, { - fn name(info: &T::TypeInfo) -> Option<&str> { + fn name(info: &Self::TypeInfo) -> Option<&str> { T::name(info) } - fn meta<'r>(info: &T::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> + fn meta<'r>(info: &Self::TypeInfo, registry: &mut Registry<'r, S>) -> MetaType<'r, S> where S: 'r, { @@ -232,35 +232,35 @@ where type Context = T::Context; type TypeInfo = T::TypeInfo; - fn type_name<'i>(&self, info: &'i T::TypeInfo) -> Option<&'i str> { + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { (**self).type_name(info) } fn resolve_into_type( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, name: &str, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_into_type(info, name, selection_set, executor) } fn resolve_field( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, field: &str, args: &Arguments, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve_field(info, field, args, executor) } fn resolve( &self, - info: &T::TypeInfo, + info: &Self::TypeInfo, selection_set: Option<&[Selection]>, - executor: &Executor, + executor: &Executor, ) -> ExecutionResult { (**self).resolve(info, selection_set, executor) } @@ -268,10 +268,10 @@ where impl<'e, S, T> GraphQLValueAsync for Arc where + T: GraphQLValueAsync + Send + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - T: GraphQLValueAsync + ?Sized, - >::TypeInfo: Send + Sync, - >::Context: Send + Sync, { fn resolve_async<'a>( &'a self, diff --git a/juniper/src/types/scalars.rs b/juniper/src/types/scalars.rs index a2116a51..53ebc1ac 100644 --- a/juniper/src/types/scalars.rs +++ b/juniper/src/types/scalars.rs @@ -382,9 +382,9 @@ where impl GraphQLValueAsync for EmptyMutation where + Self::TypeInfo: Sync, + Self::Context: Sync, S: ScalarValue + Send + Sync, - Self::TypeInfo: Send + Sync, - Self::Context: Send + Sync, { } @@ -442,9 +442,9 @@ where impl GraphQLSubscriptionValue for EmptySubscription where + Self::TypeInfo: Sync, + Self::Context: Sync, S: ScalarValue + Send + Sync + 'static, - Self::TypeInfo: Send + Sync, - Self::Context: Send + Sync, { } diff --git a/juniper/src/types/subscriptions.rs b/juniper/src/types/subscriptions.rs index 362f2f1a..0efd1b38 100644 --- a/juniper/src/types/subscriptions.rs +++ b/juniper/src/types/subscriptions.rs @@ -1,3 +1,5 @@ +use futures::{future, stream}; + use crate::{ http::{GraphQLRequest, GraphQLResponse}, parser::Spanning, @@ -69,10 +71,10 @@ pub trait SubscriptionConnection<'a, S>: futures::Stream: GraphQLValue + Send + Sync +pub trait GraphQLSubscriptionValue: GraphQLValue + Sync where - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, + Self::TypeInfo: Sync, + Self::Context: Sync, S: ScalarValue + Send + Sync, { /// Resolves into `Value`. @@ -181,17 +183,17 @@ crate::sa::assert_obj_safe!(GraphQLSubscriptionValue: GraphQLSubscriptionValue + GraphQLType where - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, + Self::Context: Sync, + Self::TypeInfo: Sync, S: ScalarValue + Send + Sync, { } impl GraphQLSubscriptionType for T where - T: GraphQLSubscriptionValue + GraphQLType, - T::Context: Send + Sync, - T::TypeInfo: Send + Sync, + T: GraphQLSubscriptionValue + GraphQLType + ?Sized, + T::Context: Sync, + T::TypeInfo: Sync, S: ScalarValue + Send + Sync, { } @@ -199,10 +201,10 @@ where /// Wrapper function around `resolve_selection_set_into_stream_recursive`. /// This wrapper is necessary because async fns can not be recursive. /// Panics if executor's current selection set is None. -pub(crate) fn resolve_selection_set_into_stream<'i, 'inf, 'ref_e, 'e, 'res, 'fut, T, CtxT, S>( +pub(crate) fn resolve_selection_set_into_stream<'i, 'inf, 'ref_e, 'e, 'res, 'fut, T, S>( instance: &'i T, info: &'inf T::TypeInfo, - executor: &'ref_e Executor<'ref_e, 'e, CtxT, S>, + executor: &'ref_e Executor<'ref_e, 'e, T::Context, S>, ) -> BoxFuture<'fut, Value>> where 'inf: 'res, @@ -211,10 +213,10 @@ where 'e: 'fut, 'ref_e: 'fut, 'res: 'fut, - T: GraphQLSubscriptionValue + ?Sized, - T::TypeInfo: Send + Sync, + T: GraphQLSubscriptionValue + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, { Box::pin(resolve_selection_set_into_stream_recursive( instance, info, executor, @@ -224,16 +226,16 @@ where /// Selection set default resolver logic. /// Returns `Value::Null` if cannot keep resolving. Otherwise pushes errors to /// `Executor`. -async fn resolve_selection_set_into_stream_recursive<'i, 'inf, 'ref_e, 'e, 'res, T, CtxT, S>( +async fn resolve_selection_set_into_stream_recursive<'i, 'inf, 'ref_e, 'e, 'res, T, S>( instance: &'i T, info: &'inf T::TypeInfo, - executor: &'ref_e Executor<'ref_e, 'e, CtxT, S>, + executor: &'ref_e Executor<'ref_e, 'e, T::Context, S>, ) -> Value> where - T: GraphQLSubscriptionValue + Send + Sync + ?Sized, - T::TypeInfo: Send + Sync, + T: GraphQLSubscriptionValue + ?Sized, + T::TypeInfo: Sync, + T::Context: Sync, S: ScalarValue + Send + Sync, - CtxT: Send + Sync, 'inf: 'res, 'e: 'res, { @@ -270,7 +272,7 @@ where Value::scalar(instance.concrete_type_name(executor.context(), info)); object.add_field( response_name, - Value::Scalar(Box::pin(futures::stream::once(async { Ok(typename) }))), + Value::Scalar(Box::pin(stream::once(future::ok(typename)))), ); continue; } @@ -278,11 +280,11 @@ where let meta_field = meta_type .field_by_name(f.name.item) .unwrap_or_else(|| { - panic!(format!( + panic!( "Field {} not found on type {:?}", f.name.item, - meta_type.name() - )) + meta_type.name(), + ) }) .clone(); @@ -366,6 +368,7 @@ where Err(e) => sub_exec.push_error_at(e, start_pos.clone()), } } + Selection::InlineFragment(Spanning { item: ref fragment, start: ref start_pos, diff --git a/juniper_actix/examples/actix_server.rs b/juniper_actix/examples/actix_server.rs index daf63411..9a1355e0 100644 --- a/juniper_actix/examples/actix_server.rs +++ b/juniper_actix/examples/actix_server.rs @@ -1,6 +1,7 @@ #![deny(warnings)] -extern crate log; +use std::env; + use actix_cors::Cors; use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer}; use juniper::{ @@ -38,8 +39,9 @@ async fn graphql( #[actix_rt::main] async fn main() -> std::io::Result<()> { - ::std::env::set_var("RUST_LOG", "actix_web=info"); + env::set_var("RUST_LOG", "info"); env_logger::init(); + let server = HttpServer::new(move || { App::new() .data(schema()) diff --git a/juniper_actix/src/lib.rs b/juniper_actix/src/lib.rs index e85f536d..cdff3521 100644 --- a/juniper_actix/src/lib.rs +++ b/juniper_actix/src/lib.rs @@ -83,21 +83,21 @@ where } /// Actix Web GraphQL Handler for GET and POST requests -pub async fn graphql_handler( +pub async fn graphql_handler( schema: &juniper::RootNode<'static, Query, Mutation, Subscription, S>, - context: &Context, + context: &CtxT, req: HttpRequest, payload: actix_web::web::Payload, ) -> Result where - S: ScalarValue + Send + Sync + 'static, - Context: Send + Sync + 'static, - Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Query::TypeInfo: Send + Sync, - Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: juniper::GraphQLSubscriptionType + Send + Sync + 'static, - Subscription::TypeInfo: Send + Sync, + Query: juniper::GraphQLTypeAsync, + Query::TypeInfo: Sync, + Mutation: juniper::GraphQLTypeAsync, + Mutation::TypeInfo: Sync, + Subscription: juniper::GraphQLSubscriptionType, + Subscription::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { match *req.method() { Method::POST => post_graphql_handler(schema, context, req, payload).await, @@ -108,20 +108,20 @@ where } } /// Actix GraphQL Handler for GET requests -pub async fn get_graphql_handler( +pub async fn get_graphql_handler( schema: &juniper::RootNode<'static, Query, Mutation, Subscription, S>, - context: &Context, + context: &CtxT, req: HttpRequest, ) -> Result where - S: ScalarValue + Send + Sync + 'static, - Context: Send + Sync + 'static, - Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Query::TypeInfo: Send + Sync, - Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: juniper::GraphQLSubscriptionType + Send + Sync + 'static, - Subscription::TypeInfo: Send + Sync, + Query: juniper::GraphQLTypeAsync, + Query::TypeInfo: Sync, + Mutation: juniper::GraphQLTypeAsync, + Mutation::TypeInfo: Sync, + Subscription: juniper::GraphQLSubscriptionType, + Subscription::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { let get_req = web::Query::::from_query(req.query_string())?; let req = GraphQLRequest::from(get_req.into_inner()); @@ -137,21 +137,21 @@ where } /// Actix GraphQL Handler for POST requests -pub async fn post_graphql_handler( +pub async fn post_graphql_handler( schema: &juniper::RootNode<'static, Query, Mutation, Subscription, S>, - context: &Context, + context: &CtxT, req: HttpRequest, payload: actix_web::web::Payload, ) -> Result where - S: ScalarValue + Send + Sync + 'static, - Context: Send + Sync + 'static, - Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Query::TypeInfo: Send + Sync, - Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: juniper::GraphQLSubscriptionType + Send + Sync + 'static, - Subscription::TypeInfo: Send + Sync, + Query: juniper::GraphQLTypeAsync, + Query::TypeInfo: Sync, + Mutation: juniper::GraphQLTypeAsync, + Mutation::TypeInfo: Sync, + Subscription: juniper::GraphQLSubscriptionType, + Subscription::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { let content_type_header = req .headers() diff --git a/juniper_codegen/src/derive_scalar_value.rs b/juniper_codegen/src/derive_scalar_value.rs index f5e372b5..254fb93e 100644 --- a/juniper_codegen/src/derive_scalar_value.rs +++ b/juniper_codegen/src/derive_scalar_value.rs @@ -101,12 +101,12 @@ fn impl_scalar_struct( }; let _async = quote!( - impl <__S> ::juniper::GraphQLValueAsync<__S> for #ident + impl<__S> ::juniper::GraphQLValueAsync<__S> for #ident where + Self: Sync, + Self::TypeInfo: Sync, + Self::Context: Sync, __S: ::juniper::ScalarValue + Send + Sync, - Self: Send + Sync, - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, { fn resolve_async<'a>( &'a self, diff --git a/juniper_codegen/src/graphql_union/mod.rs b/juniper_codegen/src/graphql_union/mod.rs index aaec570a..59f066d6 100644 --- a/juniper_codegen/src/graphql_union/mod.rs +++ b/juniper_codegen/src/graphql_union/mod.rs @@ -504,9 +504,7 @@ impl ToTokens for UnionDefinition { let mut where_async = where_clause .cloned() .unwrap_or_else(|| parse_quote! { where }); - where_async - .predicates - .push(parse_quote! { Self: Send + Sync }); + where_async.predicates.push(parse_quote! { Self: Sync }); if self.scalar.is_none() { where_async .predicates diff --git a/juniper_codegen/src/impl_scalar.rs b/juniper_codegen/src/impl_scalar.rs index 8f647e48..e9ee8cd5 100644 --- a/juniper_codegen/src/impl_scalar.rs +++ b/juniper_codegen/src/impl_scalar.rs @@ -247,10 +247,10 @@ pub fn build_scalar( let _async = quote!( impl#async_generic_type_decl ::juniper::GraphQLValueAsync<#async_generic_type> for #impl_for_type where + Self: Sync, + Self::TypeInfo: Sync, + Self::Context: Sync, #async_generic_type: ::juniper::ScalarValue + Send + Sync, - Self: Send + Sync, - Self::Context: Send + Sync, - Self::TypeInfo: Send + Sync, { fn resolve_async<'a>( &'a self, diff --git a/juniper_codegen/src/util/mod.rs b/juniper_codegen/src/util/mod.rs index 893ea23d..f35e4395 100644 --- a/juniper_codegen/src/util/mod.rs +++ b/juniper_codegen/src/util/mod.rs @@ -930,7 +930,7 @@ impl GraphQLTypeDefiniton { where_async .predicates .push(parse_quote!( #scalar: Send + Sync )); - where_async.predicates.push(parse_quote!(Self: Send + Sync)); + where_async.predicates.push(parse_quote!(Self: Sync)); // FIXME: add where clause for interfaces. @@ -1431,7 +1431,7 @@ impl GraphQLTypeDefiniton { where_async .predicates .push(parse_quote!( #scalar: Send + Sync )); - where_async.predicates.push(parse_quote!(Self: Send + Sync)); + where_async.predicates.push(parse_quote!(Self: Sync)); let _async = quote!( impl#impl_generics ::juniper::GraphQLValueAsync<#scalar> for #ty @@ -1683,7 +1683,7 @@ impl GraphQLTypeDefiniton { where_async .predicates .push(parse_quote!( #scalar: Send + Sync )); - where_async.predicates.push(parse_quote!(Self: Send + Sync)); + where_async.predicates.push(parse_quote!(Self: Sync)); let async_type = quote!( impl#impl_generics ::juniper::GraphQLValueAsync<#scalar> for #ty #type_generics_tokens diff --git a/juniper_hyper/examples/hyper_server.rs b/juniper_hyper/examples/hyper_server.rs index 0d638d8e..6da13ed5 100644 --- a/juniper_hyper/examples/hyper_server.rs +++ b/juniper_hyper/examples/hyper_server.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use hyper::{ service::{make_service_fn, service_fn}, Body, Method, Response, Server, StatusCode, @@ -6,7 +8,6 @@ use juniper::{ tests::{model::Database, schema::Query}, EmptyMutation, EmptySubscription, RootNode, }; -use std::sync::Arc; #[tokio::main] async fn main() { diff --git a/juniper_hyper/src/lib.rs b/juniper_hyper/src/lib.rs index 5d6d90a1..014fd806 100644 --- a/juniper_hyper/src/lib.rs +++ b/juniper_hyper/src/lib.rs @@ -21,14 +21,14 @@ pub async fn graphql_sync( req: Request, ) -> Result, hyper::Error> where - S: ScalarValue + Send + Sync + 'static, - CtxT: Send + Sync + 'static, - QueryT: GraphQLType + Send + Sync + 'static, - MutationT: GraphQLType + Send + Sync + 'static, - SubscriptionT: GraphQLType + Send + Sync + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, + QueryT: GraphQLType, + QueryT::TypeInfo: Sync, + MutationT: GraphQLType, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType, + SubscriptionT::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { Ok(match parse_req(req).await { Ok(req) => execute_request_sync(root_node, context, req).await, @@ -42,14 +42,14 @@ pub async fn graphql( req: Request, ) -> Result, hyper::Error> where - S: ScalarValue + Send + Sync + 'static, - CtxT: Send + Sync + 'static, - QueryT: GraphQLTypeAsync + Send + Sync + 'static, - MutationT: GraphQLTypeAsync + Send + Sync + 'static, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { Ok(match parse_req(req).await { Ok(req) => execute_request(root_node, context, req).await, @@ -158,14 +158,14 @@ async fn execute_request_sync( request: GraphQLBatchRequest, ) -> Response where - S: ScalarValue + Send + Sync + 'static, - CtxT: Send + Sync + 'static, - QueryT: GraphQLType + Send + Sync + 'static, - MutationT: GraphQLType + Send + Sync + 'static, - SubscriptionT: GraphQLType + Send + Sync + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, + QueryT: GraphQLType, + QueryT::TypeInfo: Sync, + MutationT: GraphQLType, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLType, + SubscriptionT::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { let res = request.execute_sync(&*root_node, &context); let body = Body::from(serde_json::to_string_pretty(&res).unwrap()); @@ -189,14 +189,14 @@ async fn execute_request( request: GraphQLBatchRequest, ) -> Response where - S: ScalarValue + Send + Sync + 'static, - CtxT: Send + Sync + 'static, - QueryT: GraphQLTypeAsync + Send + Sync + 'static, - MutationT: GraphQLTypeAsync + Send + Sync + 'static, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { let res = request.execute(&*root_node, &context).await; let body = Body::from(serde_json::to_string_pretty(&res).unwrap()); diff --git a/juniper_rocket/src/lib.rs b/juniper_rocket/src/lib.rs index e6d29ca2..dcac0e09 100644 --- a/juniper_rocket/src/lib.rs +++ b/juniper_rocket/src/lib.rs @@ -41,6 +41,10 @@ Check the LICENSE file for details. use std::io::{Cursor, Read}; +use juniper::{ + http::{self, GraphQLBatchRequest}, + DefaultScalarValue, FieldError, GraphQLType, InputValue, RootNode, ScalarValue, +}; use rocket::{ data::{FromDataSimple, Outcome as FromDataOutcome}, http::{ContentType, RawStr, Status}, @@ -51,12 +55,6 @@ use rocket::{ Request, }; -use juniper::{http, InputValue}; - -use juniper::{ - http::GraphQLBatchRequest, DefaultScalarValue, FieldError, GraphQLType, RootNode, ScalarValue, -}; - /// Simple wrapper around an incoming GraphQL request /// /// See the `http` module for more information. This type can be constructed @@ -422,7 +420,11 @@ mod fromform_tests { #[cfg(test)] mod tests { - + use juniper::{ + http::tests as http_tests, + tests::{model::Database, schema::Query}, + EmptyMutation, EmptySubscription, RootNode, + }; use rocket::{ self, get, http::ContentType, @@ -432,12 +434,6 @@ mod tests { routes, Rocket, State, }; - use juniper::{ - http::tests as http_tests, - tests::{model::Database, schema::Query}, - EmptyMutation, EmptySubscription, RootNode, - }; - type Schema = RootNode<'static, Query, EmptyMutation, EmptySubscription>; #[get("/?")] diff --git a/juniper_rocket_async/examples/rocket_server.rs b/juniper_rocket_async/examples/rocket_server.rs index 734d901c..a2720fc0 100644 --- a/juniper_rocket_async/examples/rocket_server.rs +++ b/juniper_rocket_async/examples/rocket_server.rs @@ -1,11 +1,10 @@ #![feature(decl_macro, proc_macro_hygiene)] -use rocket::{response::content, State}; - use juniper::{ tests::{model::Database, schema::Query}, EmptyMutation, EmptySubscription, RootNode, }; +use rocket::{response::content, State}; type Schema = RootNode<'static, Query, EmptyMutation, EmptySubscription>; diff --git a/juniper_rocket_async/src/lib.rs b/juniper_rocket_async/src/lib.rs index 9c91640b..41cd12d4 100644 --- a/juniper_rocket_async/src/lib.rs +++ b/juniper_rocket_async/src/lib.rs @@ -119,13 +119,13 @@ where context: &CtxT, ) -> GraphQLResponse where - QueryT: GraphQLTypeAsync + Send + Sync, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, + QueryT: GraphQLTypeAsync, + QueryT::TypeInfo: Sync, + MutationT: GraphQLTypeAsync, + MutationT::TypeInfo: Sync, + SubscriptionT: GraphQLSubscriptionType, + SubscriptionT::TypeInfo: Sync, + CtxT: Sync, S: Send + Sync, { let response = self.0.execute(root_node, context).await; @@ -204,7 +204,7 @@ impl GraphQLResponse { impl<'f, S> FromForm<'f> for GraphQLRequest where - S: ScalarValue + Send + Sync, + S: ScalarValue, { type Error = String; @@ -275,7 +275,7 @@ where impl<'v, S> FromFormValue<'v> for GraphQLRequest where - S: ScalarValue + Send + Sync, + S: ScalarValue, { type Error = String; @@ -290,7 +290,7 @@ const BODY_LIMIT: u64 = 1024 * 100; impl FromDataSimple for GraphQLRequest where - S: ScalarValue + Send + Sync, + S: ScalarValue, { type Error = String; diff --git a/juniper_subscriptions/src/lib.rs b/juniper_subscriptions/src/lib.rs index 89a32f45..756a4f87 100644 --- a/juniper_subscriptions/src/lib.rs +++ b/juniper_subscriptions/src/lib.rs @@ -11,9 +11,13 @@ #![deny(warnings)] #![doc(html_root_url = "https://docs.rs/juniper_subscriptions/0.14.2")] -use std::{iter::FromIterator, pin::Pin}; +use std::{ + iter::FromIterator, + pin::Pin, + task::{self, Poll}, +}; -use futures::{task::Poll, Stream}; +use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _}; use juniper::{ http::{GraphQLRequest, GraphQLResponse}, BoxFuture, ExecutionError, GraphQLError, GraphQLSubscriptionType, GraphQLTypeAsync, Object, @@ -25,14 +29,14 @@ use juniper::{ /// - handles subscription start pub struct Coordinator<'a, QueryT, MutationT, SubscriptionT, CtxT, S> where - S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, + QueryT: GraphQLTypeAsync + Send, QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, + MutationT: GraphQLTypeAsync + Send, MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, + SubscriptionT: GraphQLSubscriptionType + Send, SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { root_node: juniper::RootNode<'a, QueryT, MutationT, SubscriptionT, S>, } @@ -40,14 +44,14 @@ where impl<'a, QueryT, MutationT, SubscriptionT, CtxT, S> Coordinator<'a, QueryT, MutationT, SubscriptionT, CtxT, S> where - S: ScalarValue + Send + Sync, - QueryT: GraphQLTypeAsync + Send + Sync, + QueryT: GraphQLTypeAsync + Send, QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, + MutationT: GraphQLTypeAsync + Send, MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, + SubscriptionT: GraphQLSubscriptionType + Send, SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync, { /// Builds new [`Coordinator`] with specified `root_node` pub fn new(root_node: juniper::RootNode<'a, QueryT, MutationT, SubscriptionT, S>) -> Self { @@ -58,14 +62,14 @@ where impl<'a, QueryT, MutationT, SubscriptionT, CtxT, S> SubscriptionCoordinator<'a, CtxT, S> for Coordinator<'a, QueryT, MutationT, SubscriptionT, CtxT, S> where - S: ScalarValue + Send + Sync + 'a, - QueryT: GraphQLTypeAsync + Send + Sync, + QueryT: GraphQLTypeAsync + Send, QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + Sync, + MutationT: GraphQLTypeAsync + Send, MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + Sync, + SubscriptionT: GraphQLSubscriptionType + Send, SubscriptionT::TypeInfo: Send + Sync, - CtxT: Send + Sync, + CtxT: Sync, + S: ScalarValue + Send + Sync + 'a, { type Connection = Connection<'a, S>; @@ -76,13 +80,9 @@ where req: &'a GraphQLRequest, context: &'a CtxT, ) -> BoxFuture<'a, Result> { - let rn = &self.root_node; - - Box::pin(async move { - let (stream, errors) = juniper::http::resolve_into_stream(req, rn, context).await?; - - Ok(Connection::from_stream(stream, errors)) - }) + juniper::http::resolve_into_stream(req, &self.root_node, context) + .map_ok(|(stream, errors)| Connection::from_stream(stream, errors)) + .boxed() } } @@ -98,7 +98,7 @@ where /// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object /// `Value::Object>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects pub struct Connection<'a, S> { - stream: Pin> + Send + 'a>>, + stream: Pin> + Send + 'a>>, } impl<'a, S> Connection<'a, S> @@ -118,16 +118,13 @@ impl<'a, S> SubscriptionConnection<'a, S> for Connection<'a, S> where { } -impl<'a, S> futures::Stream for Connection<'a, S> +impl<'a, S> Stream for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a, { type Item = GraphQLResponse<'a, S>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut futures::task::Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { // this is safe as stream is only mutated here and is not moved anywhere let Connection { stream } = unsafe { self.get_unchecked_mut() }; let stream = unsafe { Pin::new_unchecked(stream) }; @@ -146,22 +143,20 @@ where fn whole_responses_stream<'a, S>( stream: Value>, errors: Vec>, -) -> Pin> + Send + 'a>> +) -> Pin> + Send + 'a>> where S: ScalarValue + Send + Sync + 'a, { - use futures::stream::{self, StreamExt as _}; - if !errors.is_empty() { - return Box::pin(stream::once(async move { - GraphQLResponse::from_result(Ok((Value::Null, errors))) - })); + return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( + Ok((Value::Null, errors)), + )))); } match stream { - Value::Null => Box::pin(stream::once(async move { - GraphQLResponse::from_result(Ok((Value::Null, vec![]))) - })), + Value::Null => Box::pin(stream::once(future::ready(GraphQLResponse::from_result( + Ok((Value::Null, vec![])), + )))), Value::Scalar(s) => Box::pin(s.map(|res| match res { Ok(val) => GraphQLResponse::from_result(Ok((val, vec![]))), Err(err) => GraphQLResponse::from_result(Ok((Value::Null, vec![err]))), @@ -176,9 +171,9 @@ where Value::Object(mut object) => { let obj_len = object.field_count(); if obj_len == 0 { - return Box::pin(stream::once(async move { - GraphQLResponse::from_result(Ok((Value::Null, vec![]))) - })); + return Box::pin(stream::once(future::ready(GraphQLResponse::from_result( + Ok((Value::Null, vec![])), + )))); } let mut filled_count = 0; @@ -187,66 +182,65 @@ where ready_vec.push(None); } - let stream = - futures::stream::poll_fn(move |mut ctx| -> Poll>> { - let mut obj_iterator = object.iter_mut(); + let stream = stream::poll_fn(move |mut ctx| -> Poll>> { + let mut obj_iterator = object.iter_mut(); - // Due to having to modify `ready_vec` contents (by-move pattern) - // and only being able to iterate over `object`'s mutable references (by-ref pattern) - // `ready_vec` and `object` cannot be iterated simultaneously. - // TODO: iterate over i and (ref field_name, ref val) once - // [this RFC](https://github.com/rust-lang/rust/issues/68354) - // is implemented - for ready in ready_vec.iter_mut().take(obj_len) { - let (field_name, val) = match obj_iterator.next() { - Some(v) => v, - None => break, - }; + // Due to having to modify `ready_vec` contents (by-move pattern) + // and only being able to iterate over `object`'s mutable references (by-ref pattern) + // `ready_vec` and `object` cannot be iterated simultaneously. + // TODO: iterate over i and (ref field_name, ref val) once + // [this RFC](https://github.com/rust-lang/rust/issues/68354) + // is implemented + for ready in ready_vec.iter_mut().take(obj_len) { + let (field_name, val) = match obj_iterator.next() { + Some(v) => v, + None => break, + }; - if ready.is_some() { - continue; - } + if ready.is_some() { + continue; + } - match val { - Value::Scalar(stream) => { - match Pin::new(stream).poll_next(&mut ctx) { - Poll::Ready(None) => return Poll::Ready(None), - Poll::Ready(Some(value)) => { - *ready = Some((field_name.clone(), value)); - filled_count += 1; - } - Poll::Pending => { /* check back later */ } + match val { + Value::Scalar(stream) => { + match Pin::new(stream).poll_next(&mut ctx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(value)) => { + *ready = Some((field_name.clone(), value)); + filled_count += 1; } - } - _ => { - // For now only `Object` is supported - *ready = Some((field_name.clone(), Ok(Value::Null))); - filled_count += 1; + Poll::Pending => { /* check back later */ } } } + _ => { + // For now only `Object` is supported + *ready = Some((field_name.clone(), Ok(Value::Null))); + filled_count += 1; + } } + } - if filled_count == obj_len { - filled_count = 0; - let new_vec = (0..obj_len).map(|_| None).collect::>(); - let ready_vec = std::mem::replace(&mut ready_vec, new_vec); - let ready_vec_iterator = ready_vec.into_iter().map(|el| { - let (name, val) = el.unwrap(); - if let Ok(value) = val { - (name, value) - } else { - (name, Value::Null) - } - }); - let obj = Object::from_iter(ready_vec_iterator); - Poll::Ready(Some(GraphQLResponse::from_result(Ok(( - Value::Object(obj), - vec![], - ))))) - } else { - Poll::Pending - } - }); + if filled_count == obj_len { + filled_count = 0; + let new_vec = (0..obj_len).map(|_| None).collect::>(); + let ready_vec = std::mem::replace(&mut ready_vec, new_vec); + let ready_vec_iterator = ready_vec.into_iter().map(|el| { + let (name, val) = el.unwrap(); + if let Ok(value) = val { + (name, value) + } else { + (name, Value::Null) + } + }); + let obj = Object::from_iter(ready_vec_iterator); + Poll::Ready(Some(GraphQLResponse::from_result(Ok(( + Value::Object(obj), + vec![], + ))))) + } else { + Poll::Pending + } + }); Box::pin(stream) } @@ -255,10 +249,11 @@ where #[cfg(test)] mod whole_responses_stream { - use super::*; use futures::{stream, StreamExt as _}; use juniper::{DefaultScalarValue, ExecutionError, FieldError}; + use super::*; + #[tokio::test] async fn with_error() { let expected = vec![GraphQLResponse::::error( diff --git a/juniper_warp/examples/warp_server.rs b/juniper_warp/examples/warp_server.rs index 7939c6ee..af70fedc 100644 --- a/juniper_warp/examples/warp_server.rs +++ b/juniper_warp/examples/warp_server.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -extern crate log; +use std::env; use juniper::{ tests::{model::Database, schema::Query}, @@ -20,7 +20,7 @@ fn schema() -> Schema { #[tokio::main] async fn main() { - ::std::env::set_var("RUST_LOG", "warp_server"); + env::set_var("RUST_LOG", "warp_server"); env_logger::init(); let log = warp::log("warp_server"); diff --git a/juniper_warp/src/lib.rs b/juniper_warp/src/lib.rs index d7c4a932..eea18fef 100644 --- a/juniper_warp/src/lib.rs +++ b/juniper_warp/src/lib.rs @@ -111,25 +111,25 @@ use warp::{body, filters::BoxedFilter, header, http, query, Filter}; /// .and(warp::post()) /// .and(graphql_filter); /// ``` -pub fn make_graphql_filter( +pub fn make_graphql_filter( schema: juniper::RootNode<'static, Query, Mutation, Subscription, S>, - context_extractor: BoxedFilter<(Context,)>, + context_extractor: BoxedFilter<(CtxT,)>, ) -> BoxedFilter<(http::Response>,)> where - S: ScalarValue + Send + Sync + 'static, - Context: Send + Sync + 'static, - Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Query: juniper::GraphQLTypeAsync + Send + 'static, Query::TypeInfo: Send + Sync, - Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Mutation: juniper::GraphQLTypeAsync + Send + 'static, Mutation::TypeInfo: Send + Sync, - Subscription: juniper::GraphQLSubscriptionType + Send + Sync + 'static, + Subscription: juniper::GraphQLSubscriptionType + Send + 'static, Subscription::TypeInfo: Send + Sync, + CtxT: Send + Sync + 'static, + S: ScalarValue + Send + Sync + 'static, { let schema = Arc::new(schema); let post_json_schema = schema.clone(); let post_graphql_schema = schema.clone(); - let handle_post_json_request = move |context: Context, req: GraphQLBatchRequest| { + let handle_post_json_request = move |context: CtxT, req: GraphQLBatchRequest| { let schema = post_json_schema.clone(); async move { let resp = req.execute(&schema, &context).await; @@ -150,7 +150,7 @@ where .and(body::json()) .and_then(handle_post_json_request); - let handle_post_graphql_request = move |context: Context, body: Bytes| { + let handle_post_graphql_request = move |context: CtxT, body: Bytes| { let schema = post_graphql_schema.clone(); async move { let query = str::from_utf8(body.as_ref()).map_err(|e| { @@ -173,7 +173,7 @@ where .and(body::bytes()) .and_then(handle_post_graphql_request); - let handle_get_request = move |context: Context, mut qry: HashMap| { + let handle_get_request = move |context: CtxT, mut qry: HashMap| { let schema = schema.clone(); async move { let req = GraphQLRequest::new( @@ -206,22 +206,22 @@ where } /// Make a synchronous filter for graphql endpoint. -pub fn make_graphql_filter_sync( +pub fn make_graphql_filter_sync( schema: juniper::RootNode<'static, Query, Mutation, Subscription, S>, - context_extractor: BoxedFilter<(Context,)>, + context_extractor: BoxedFilter<(CtxT,)>, ) -> BoxedFilter<(http::Response>,)> where + Query: juniper::GraphQLType + Send + Sync + 'static, + Mutation: juniper::GraphQLType + Send + Sync + 'static, + Subscription: juniper::GraphQLType + Send + Sync + 'static, + CtxT: Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static, - Context: Send + Sync + 'static, - Query: juniper::GraphQLType + Send + Sync + 'static, - Mutation: juniper::GraphQLType + Send + Sync + 'static, - Subscription: juniper::GraphQLType + Send + Sync + 'static, { let schema = Arc::new(schema); let post_json_schema = schema.clone(); let post_graphql_schema = schema.clone(); - let handle_post_json_request = move |context: Context, req: GraphQLBatchRequest| { + let handle_post_json_request = move |context: CtxT, req: GraphQLBatchRequest| { let schema = post_json_schema.clone(); async move { let res = task::spawn_blocking(move || { @@ -243,7 +243,7 @@ where .and(body::json()) .and_then(handle_post_json_request); - let handle_post_graphql_request = move |context: Context, body: Bytes| { + let handle_post_graphql_request = move |context: CtxT, body: Bytes| { let schema = post_graphql_schema.clone(); async move { let res = task::spawn_blocking(move || { @@ -270,7 +270,7 @@ where .and(body::bytes()) .and_then(handle_post_graphql_request); - let handle_get_request = move |context: Context, mut qry: HashMap| { + let handle_get_request = move |context: CtxT, mut qry: HashMap| { let schema = schema.clone(); async move { let res = task::spawn_blocking(move || { @@ -430,21 +430,20 @@ pub mod subscriptions { /// - execute subscription and return values from stream /// - stop stream and close ws connection #[allow(dead_code)] - pub fn graphql_subscriptions( + pub fn graphql_subscriptions( websocket: warp::ws::WebSocket, - coordinator: Arc>, - context: Context, + coordinator: Arc>, + context: CtxT, ) -> impl Future> + Send where - S: ScalarValue + Send + Sync + 'static, - Context: Clone + Send + Sync + 'static, - Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Query: juniper::GraphQLTypeAsync + Send + 'static, Query::TypeInfo: Send + Sync, - Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Mutation: juniper::GraphQLTypeAsync + Send + 'static, Mutation::TypeInfo: Send + Sync, - Subscription: - juniper::GraphQLSubscriptionType + Send + Sync + 'static, + Subscription: juniper::GraphQLSubscriptionType + Send + 'static, Subscription::TypeInfo: Send + Sync, + CtxT: Clone + Send + Sync + 'static, + S: ScalarValue + Send + Sync + 'static, { let (sink_tx, sink_rx) = websocket.split(); let (ws_tx, ws_rx) = mpsc::unbounded();