diff --git a/Cargo.toml b/Cargo.toml index 705cd8f1..35fbad11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "juniper_codegen", "juniper", "integration_tests/juniper_tests", + "integration_tests/async_await", "juniper_hyper", "juniper_iron", "juniper_rocket", diff --git a/integration_tests/async_await/Cargo.toml b/integration_tests/async_await/Cargo.toml new file mode 100644 index 00000000..3bbc0436 --- /dev/null +++ b/integration_tests/async_await/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "async_await" +version = "0.1.0" +authors = ["Christoph Herzog "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +juniper = { path = "../../juniper", features = ["async"] } +futures-preview = "0.3.0-alpha.18" +tokio = "0.2.0-alpha.2" diff --git a/integration_tests/async_await/src/main.rs b/integration_tests/async_await/src/main.rs new file mode 100644 index 00000000..cbdfc4e3 --- /dev/null +++ b/integration_tests/async_await/src/main.rs @@ -0,0 +1,126 @@ +#![feature(async_await, async_closure)] + +use juniper::{graphql_value, RootNode, Value}; + +#[derive(juniper::GraphQLEnum)] +enum UserKind { + Admin, + User, + Guest, +} + +struct User { + id: u64, + name: String, + kind: UserKind, +} + +#[juniper::object] +impl User { + async fn name(&self) -> &str { + &self.name + } + + async fn friends(&self) -> Vec { + let friends = (0..10) + .map(|index| User { + id: index, + name: format!("user{}", index), + kind: UserKind::User, + }) + .collect(); + friends + } + + async fn kind(&self) -> &UserKind { + &self.kind + } + + async fn delayed() -> bool { + let when = tokio::clock::now() + std::time::Duration::from_millis(100); + tokio::timer::Delay::new(when).await; + true + } +} + +struct Query; + +#[juniper::object] +impl Query { + fn field_sync(&self) -> &'static str { + "field_sync" + } + + async fn field_async_plain() -> String { + "field_async_plain".to_string() + } + + fn user(id: String) -> User { + User { + id: 1, + name: id, + kind: UserKind::User, + } + } + + async fn delayed() -> bool { + let when = tokio::clock::now() + std::time::Duration::from_millis(100); + tokio::timer::Delay::new(when).await; + true + } +} + +struct Mutation; + +#[juniper::object] +impl Mutation {} + +fn run(f: impl std::future::Future) -> O { + tokio::runtime::current_thread::Runtime::new() + .unwrap() + .block_on(f) +} + +#[test] +fn async_simple() { + let schema = RootNode::new(Query, Mutation); + let doc = r#" + query { + fieldSync + fieldAsyncPlain + delayed + user(id: "user1") { + kind + name + delayed + } + } + "#; + + let vars = Default::default(); + let f = juniper::execute_async(doc, None, &schema, &vars, &()); + + let (res, errs) = run(f).unwrap(); + + assert!(errs.is_empty()); + + let mut obj = res.into_object().unwrap(); + obj.sort_by_field(); + let value = Value::Object(obj); + + assert_eq!( + value, + graphql_value!({ + "delayed": true, + "fieldAsyncPlain": "field_async_plain", + "fieldSync": "field_sync", + "user": { + "delayed": true, + "kind": "USER", + "name": "user1", + }, + }), + ); +} + +fn main() {} diff --git a/juniper/Cargo.toml b/juniper/Cargo.toml index b8176c46..7a57180d 100644 --- a/juniper/Cargo.toml +++ b/juniper/Cargo.toml @@ -24,6 +24,7 @@ harness = false path = "benches/bench.rs" [features] +async = ["juniper_codegen/async", "futures-preview"] expose-test-schema = [] default = [ "chrono", @@ -44,6 +45,9 @@ serde_json = { version="1.0.2", optional = true } url = { version = "2", optional = true } uuid = { version = "0.7", optional = true } +futures-preview = { version = "0.3.0-alpha.18", optional = true, features = ["nightly", "async-await"] } + [dev-dependencies] bencher = "0.1.2" serde_json = { version = "1.0.2" } +tokio = "0.2.0-alpha.2" diff --git a/juniper/src/executor/mod.rs b/juniper/src/executor/mod.rs index d7489393..8b96b9ac 100644 --- a/juniper/src/executor/mod.rs +++ b/juniper/src/executor/mod.rs @@ -372,6 +372,37 @@ where Ok(value.resolve(info, self.current_selection_set, self)) } + /// Resolve a single arbitrary value into an `ExecutionResult` + #[cfg(feature = "async")] + pub async fn resolve_async(&self, info: &T::TypeInfo, value: &T) -> ExecutionResult + where + T: crate::GraphQLTypeAsync + Send + Sync, + T::TypeInfo: Send + Sync, + CtxT: Send + Sync, + S: Send + Sync, + { + Ok(value + .resolve_async(info, self.current_selection_set, self) + .await) + } + + /// Resolve a single arbitrary value, mapping the context to a new type + #[cfg(feature = "async")] + pub async fn resolve_with_ctx_async( + &self, + info: &T::TypeInfo, + value: &T, + ) -> ExecutionResult + where + T: crate::GraphQLTypeAsync + Send + Sync, + T::TypeInfo: Send + Sync, + S: Send + Sync, + NewCtxT: FromContext + Send + Sync, + { + let e = self.replaced_context(>::from(self.context)); + e.resolve_async(info, value).await + } + /// Resolve a single arbitrary value into a return value /// /// If the field fails to resolve, `null` will be returned. @@ -388,6 +419,26 @@ where } } + /// Resolve a single arbitrary value into a return value + /// + /// If the field fails to resolve, `null` will be returned. + #[cfg(feature = "async")] + pub async fn resolve_into_value_async(&self, info: &T::TypeInfo, value: &T) -> Value + where + T: crate::GraphQLTypeAsync + Send + Sync, + T::TypeInfo: Send + Sync, + CtxT: Send + Sync, + S: Send + Sync, + { + match self.resolve_async(info, value).await { + Ok(v) => v, + Err(e) => { + self.push_error(e); + Value::null() + } + } + } + /// Derive a new executor by replacing the context /// /// This can be used to connect different types, e.g. from different Rust @@ -480,7 +531,7 @@ where } #[doc(hidden)] - pub fn fragment_by_name(&self, name: &str) -> Option<&'a Fragment> { + pub fn fragment_by_name(&'a self, name: &str) -> Option<&'a Fragment<'a, S>> { self.fragments.get(name).cloned() } @@ -720,6 +771,121 @@ where Ok((value, errors)) } +#[cfg(feature = "async")] +pub async fn execute_validated_query_async<'a, QueryT, MutationT, CtxT, S>( + document: Document<'a, S>, + operation_name: Option<&str>, + root_node: &RootNode<'a, QueryT, MutationT, S>, + variables: &Variables, + context: &CtxT, +) -> Result<(Value, Vec>), GraphQLError<'a>> +where + S: ScalarValue + Send + Sync, + QueryT: crate::GraphQLTypeAsync + Send + Sync, + QueryT::TypeInfo: Send + Sync, + MutationT: crate::GraphQLTypeAsync + Send + Sync, + MutationT::TypeInfo: Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + let mut fragments = vec![]; + let mut operation = None; + + for def in document { + match def { + Definition::Operation(op) => { + if operation_name.is_none() && operation.is_some() { + return Err(GraphQLError::MultipleOperationsProvided); + } + + let move_op = operation_name.is_none() + || op.item.name.as_ref().map(|s| s.item) == operation_name; + + if move_op { + operation = Some(op); + } + } + Definition::Fragment(f) => fragments.push(f), + }; + } + + let op = match operation { + Some(op) => op, + None => return Err(GraphQLError::UnknownOperationName), + }; + + let default_variable_values = op.item.variable_definitions.map(|defs| { + defs.item + .items + .iter() + .filter_map(|&(ref name, ref def)| { + def.default_value + .as_ref() + .map(|i| (name.item.to_owned(), i.item.clone())) + }) + .collect::>>() + }); + + let errors = RwLock::new(Vec::new()); + let value; + + { + let mut all_vars; + let mut final_vars = variables; + + if let Some(defaults) = default_variable_values { + all_vars = variables.clone(); + + for (name, value) in defaults { + all_vars.entry(name).or_insert(value); + } + + final_vars = &all_vars; + } + + let root_type = match op.item.operation_type { + OperationType::Query => root_node.schema.query_type(), + OperationType::Mutation => root_node + .schema + .mutation_type() + .expect("No mutation type found"), + }; + + let executor = Executor { + fragments: &fragments + .iter() + .map(|f| (f.item.name.item, &f.item)) + .collect(), + variables: final_vars, + current_selection_set: Some(&op.item.selection_set[..]), + parent_selection_set: None, + current_type: root_type, + schema: &root_node.schema, + context, + errors: &errors, + field_path: FieldPath::Root(op.start), + }; + + value = match op.item.operation_type { + OperationType::Query => { + executor + .resolve_into_value_async(&root_node.query_info, &root_node) + .await + } + OperationType::Mutation => { + executor + .resolve_into_value_async(&root_node.mutation_info, &root_node.mutation_type) + .await + } + }; + } + + let mut errors = errors.into_inner().unwrap(); + errors.sort(); + + Ok((value, errors)) +} + impl<'r, S> Registry<'r, S> where S: ScalarValue + 'r, diff --git a/juniper/src/executor_tests/async_await/mod.rs b/juniper/src/executor_tests/async_await/mod.rs new file mode 100644 index 00000000..cd4771c5 --- /dev/null +++ b/juniper/src/executor_tests/async_await/mod.rs @@ -0,0 +1,120 @@ +use crate::{RootNode, Value}; + +#[derive(crate::GraphQLEnumInternal)] +enum UserKind { + Admin, + User, + Guest, +} + +struct User { + id: u64, + name: String, + kind: UserKind, +} + +#[crate::object_internal] +impl User { + async fn name(&self) -> &str { + &self.name + } + + async fn friends(&self) -> Vec { + let friends = (0..10) + .map(|index| User { + id: index, + name: format!("user{}", index), + kind: UserKind::User, + }) + .collect(); + friends + } + + async fn kind(&self) -> &UserKind { + &self.kind + } + + async fn delayed() -> bool { + let when = tokio::clock::now() + std::time::Duration::from_millis(100); + tokio::timer::Delay::new(when).await; + true + } +} + +struct Query; + +#[crate::object_internal] +impl Query { + fn field_sync(&self) -> &'static str { + "field_sync" + } + + async fn field_async_plain() -> String { + "field_async_plain".to_string() + } + + fn user(id: String) -> User { + User { + id: 1, + name: id, + kind: UserKind::User, + } + } + + async fn delayed() -> bool { + let when = tokio::clock::now() + std::time::Duration::from_millis(100); + tokio::timer::Delay::new(when).await; + true + } +} + +struct Mutation; + +#[crate::object_internal] +impl Mutation {} + +fn run(f: impl std::future::Future) -> O { + tokio::runtime::current_thread::Runtime::new() + .unwrap() + .block_on(f) +} + +#[test] +fn async_simple() { + let schema = RootNode::new(Query, Mutation); + let doc = r#" + query { + fieldSync + fieldAsyncPlain + delayed + user(id: "user1") { + name + } + } + "#; + + let vars = Default::default(); + let f = crate::execute_async(doc, None, &schema, &vars, &()); + + let (res, errs) = run(f).unwrap(); + + assert!(errs.is_empty()); + + let mut obj = res.into_object().unwrap(); + obj.sort_by_field(); + let value = Value::Object(obj); + + assert_eq!( + value, + crate::graphql_value!({ + "delayed": true, + "fieldAsyncPlain": "field_async_plain", + "fieldSync": "field_sync", + "user": { + "kind": "USER", + // "name": "user1", + // "delayed": true, + }, + }), + ); +} diff --git a/juniper/src/executor_tests/mod.rs b/juniper/src/executor_tests/mod.rs index 01097618..aefac036 100644 --- a/juniper/src/executor_tests/mod.rs +++ b/juniper/src/executor_tests/mod.rs @@ -1,6 +1,12 @@ mod directives; mod enums; mod executor; -mod interfaces_unions; mod introspection; mod variables; + +// FIXME: re-enable +#[cfg(not(feature = "async"))] +mod interfaces_unions; + +#[cfg(feature = "async")] +mod async_await; diff --git a/juniper/src/lib.rs b/juniper/src/lib.rs index 434d7763..95ce987b 100644 --- a/juniper/src/lib.rs +++ b/juniper/src/lib.rs @@ -90,6 +90,7 @@ Juniper has not reached 1.0 yet, thus some API instability should be expected. */ #![doc(html_root_url = "https://docs.rs/juniper/0.14.0")] #![warn(missing_docs)] +#![cfg_attr(feature = "async", feature(async_await, async_closure))] #[doc(hidden)] pub extern crate serde; @@ -150,7 +151,6 @@ mod executor_tests; pub use crate::util::to_camel_case; use crate::{ - executor::execute_validated_query, introspection::{INTROSPECTION_QUERY, INTROSPECTION_QUERY_WITHOUT_DESCRIPTIONS}, parser::{parse_document_source, ParseError, Spanning}, validation::{validate_input_values, visit_all_rules, ValidatorContext}, @@ -176,6 +176,9 @@ pub use crate::{ }, }; +#[cfg(feature = "async")] +pub use crate::types::async_await::GraphQLTypeAsync; + /// An error that prevented query execution #[derive(Debug, PartialEq)] #[allow(missing_docs)] @@ -221,7 +224,48 @@ where } } - execute_validated_query(document, operation_name, root_node, variables, context) + executor::execute_validated_query(document, operation_name, root_node, variables, context) +} + +/// Execute a query in a provided schema +#[cfg(feature = "async")] +pub async fn execute_async<'a, S, CtxT, QueryT, MutationT>( + document_source: &'a str, + operation_name: Option<&str>, + root_node: &'a RootNode<'a, QueryT, MutationT, S>, + variables: &Variables, + context: &CtxT, +) -> Result<(Value, Vec>), GraphQLError<'a>> +where + S: ScalarValue + Send + Sync, + QueryT: GraphQLTypeAsync + Send + Sync, + QueryT::TypeInfo: Send + Sync, + MutationT: GraphQLTypeAsync + Send + Sync, + MutationT::TypeInfo: Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + let document = parse_document_source(document_source, &root_node.schema)?; + { + let errors = validate_input_values(variables, &document, &root_node.schema); + + if !errors.is_empty() { + return Err(GraphQLError::ValidationError(errors)); + } + } + + { + let mut ctx = ValidatorContext::new(&root_node.schema, &document); + visit_all_rules(&mut ctx, &document); + + let errors = ctx.into_errors(); + if !errors.is_empty() { + return Err(GraphQLError::ValidationError(errors)); + } + } + + executor::execute_validated_query_async(document, operation_name, root_node, variables, context) + .await } /// Execute the reference introspection query in the provided schema diff --git a/juniper/src/macros/common.rs b/juniper/src/macros/common.rs index 416686a2..9a6ddef3 100644 --- a/juniper/src/macros/common.rs +++ b/juniper/src/macros/common.rs @@ -7,29 +7,73 @@ macro_rules! __juniper_impl_trait { } ) => { impl<$($other,)*> $crate::$impl_trait<$crate::DefaultScalarValue> for $name { - $($body)+ + $($body)* } }; ( - impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty { + impl< < DefaultScalarValue > $(, $other: tt)* > $impl_trait:tt for $name:ty + where ( $($where:tt)* ) + { $($body:tt)+ } + ) => { + impl<$($other,)*> $crate::$impl_trait<$crate::DefaultScalarValue> for $name + where $($where)* + { + $($body)* + } + }; + + ( + impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty { + $($body:tt)* + } ) => { impl<$($other,)* $generic $(: $bound)*> $crate::$impl_trait<$generic> for $name where $generic: $crate::ScalarValue, for<'__b> &'__b $generic: $crate::ScalarRefValue<'__b>, { - $($body)+ + $($body)* } }; + ( + impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty + where ( $($where:tt)* ) + { + $($body:tt)* + } + ) => { + impl<$($other,)* $generic $(: $bound)*> $crate::$impl_trait<$generic> for $name + where + $($where)* + $generic: $crate::ScalarValue, + for<'__b> &'__b $generic: $crate::ScalarRefValue<'__b>, + { + $($body)* + } + }; + ( impl<$scalar:ty $(, $other: tt )*> $impl_trait:tt for $name:ty { - $($body:tt)+ + $($body:tt)* } ) => { impl<$($other, )*> $crate::$impl_trait<$scalar> for $name { - $($body)+ + $($body)* + } + }; + ( + impl<$scalar:ty $(, $other: tt )*> $impl_trait:tt for $name:ty + where ( $($where:tt)* ) + { + $($body:tt)* + } + ) => { + impl<$($other, )*> $crate::$impl_trait<$scalar> for $name + where $($where)* + { + $($body)* } }; } @@ -52,6 +96,25 @@ macro_rules! __juniper_insert_generic { }; } +// TODO: remove me. +#[doc(hidden)] +#[macro_export] +macro_rules! __juniper_extract_generic { + (<$name:ident>) => { + $name + }; + ( + <$generic:tt $(: $bound: tt)*> + ) => { + $generic + }; + ( + $scalar: ty + ) => { + $scalar + }; +} + #[doc(hidden)] #[macro_export] macro_rules! __juniper_parse_object_header { diff --git a/juniper/src/macros/scalar.rs b/juniper/src/macros/scalar.rs index 56537072..50753f3b 100644 --- a/juniper/src/macros/scalar.rs +++ b/juniper/src/macros/scalar.rs @@ -45,9 +45,12 @@ In addition to implementing `GraphQLType` for the type in question, usable as arguments and default values. */ + +#[cfg(not(feature = "async"))] #[macro_export] macro_rules! graphql_scalar { ( @as_expr $e:expr) => { $e }; + ( @generate, meta = { @@ -341,3 +344,328 @@ macro_rules! graphql_scalar { ); } } + +// FIXME: prevent duplicating the whole macro for async. +#[cfg(feature = "async")] +#[macro_export] +macro_rules! graphql_scalar { + ( @as_expr $e:expr) => { $e }; + + ( + @generate, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + $(description = $descr:tt,)* + }, + resolve = { + self_var = $resolve_self_var:ident, + body = $resolve_body: block, + return_type = $resolve_retun_type: ty, + }, + from_input_value = { + arg = $from_input_value_arg: ident, + result = $from_input_value_result: ty, + body = $from_input_value_body: block, + }, + from_str = { + value_arg = $from_str_arg: ident, + result = $from_str_result: ty, + body = $from_str_body: block, + lifetime = $from_str_lt: tt, + }, + + ) => { + $crate::__juniper_impl_trait!( + impl <$($scalar)+> GraphQLType for $name { + type Context = (); + type TypeInfo = (); + + fn name(_: &Self::TypeInfo) -> Option<&str> { + Some($crate::graphql_scalar!(@as_expr $($outname)+)) + } + + fn meta<'r>( + info: &Self::TypeInfo, + registry: &mut $crate::Registry<'r, $crate::__juniper_insert_generic!($($scalar)+)> + ) -> $crate::meta::MetaType<'r, $crate::__juniper_insert_generic!($($scalar)+)> + where for<'__b> &'__b $crate::__juniper_insert_generic!($($scalar)+): $crate::ScalarRefValue<'__b>, + $crate::__juniper_insert_generic!($($scalar)+): 'r + { + let meta = registry.build_scalar_type::(info); + $( + let meta = meta.description($descr); + )* + meta.into_meta() + } + + fn resolve( + &$resolve_self_var, + _: &(), + _: Option<&[$crate::Selection<$crate::__juniper_insert_generic!($($scalar)+)>]>, + _: &$crate::Executor< + Self::Context, + $crate::__juniper_insert_generic!($($scalar)+) + >) -> $crate::Value<$crate::__juniper_insert_generic!($($scalar)+)> { + $resolve_body + } + } + ); + + $crate::__juniper_impl_trait!( + impl <$($scalar)+> GraphQLTypeAsync for $name + where ( + $crate::__juniper_insert_generic!($($scalar)+): Send + Sync, + Self: $crate::GraphQLType<$crate::__juniper_insert_generic!($($scalar)+)> + Send + Sync, + Self::Context: Send + Sync, + Self::TypeInfo: Send + Sync, + ) + { + + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [$crate::Selection<$crate::__juniper_insert_generic!($($scalar)+)>]>, + executor: &'a $crate::Executor, + ) -> futures::future::BoxFuture<'a, $crate::Value<$crate::__juniper_insert_generic!($($scalar)+)>> { + use $crate::GraphQLType; + use futures::future; + let v = self.resolve(info, selection_set, executor); + future::FutureExt::boxed(future::ready(v)) + } + } + ); + + $crate::__juniper_impl_trait!( + impl<$($scalar)+> ToInputValue for $name { + fn to_input_value(&$resolve_self_var) -> $crate::InputValue<$crate::__juniper_insert_generic!($($scalar)+)> { + let v = $resolve_body; + $crate::ToInputValue::to_input_value(&v) + } + } + ); + + $crate::__juniper_impl_trait!( + impl<$($scalar)+> FromInputValue for $name { + fn from_input_value( + $from_input_value_arg: &$crate::InputValue<$crate::__juniper_insert_generic!($($scalar)+)> + ) -> $from_input_value_result { + $from_input_value_body + } + } + ); + + $crate::__juniper_impl_trait!( + impl<$($scalar)+> ParseScalarValue for $name { + fn from_str<$from_str_lt>($from_str_arg: $crate::parser::ScalarToken<$from_str_lt>) -> $from_str_result { + $from_str_body + } + } + ); + }; + + // No more items to parse + ( + @parse_functions, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + $(description = $descr:tt,)* + }, + resolve = {$($resolve_body:tt)+}, + from_input_value = {$($from_input_value_body:tt)+}, + from_str = {$($from_str_body:tt)+}, + rest = + ) => { + $crate::graphql_scalar!( + @generate, + meta = { + name = $name, + outname = {$($outname)+}, + scalar = {$($scalar)+}, + $(description = $descr,)* + }, + resolve = {$($resolve_body)+}, + from_input_value = {$($from_input_value_body)+}, + from_str = {$($from_str_body)+}, + ); + }; + + ( + @parse_functions, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + $(description = $descr:tt,)* + }, + $(from_input_value = {$($from_input_value_body:tt)+})*, + $(from_str = {$($from_str_body:tt)+})*, + rest = + ) => { + compile_error!("Missing resolve function"); + }; + + ( + @parse_functions, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + $(description = $descr:tt,)* + }, + resolve = {$($resolve_body:tt)+}, + $(from_str = {$($from_str_body:tt)+})*, + rest = + ) => { + compile_error!("Missing from_input_value function"); + }; + + ( + @parse_functions, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + $(description = $descr:tt,)* + }, + resolve = {$($resolve_body:tt)+}, + from_input_value = {$($from_input_value_body:tt)+}, + rest = + ) =>{ + compile_error!("Missing from_str function"); + }; + + + // resolve(&self) -> Value { ... } + ( + @parse_functions, + meta = {$($meta:tt)*}, + $(resolve = {$($resolve_body:tt)+},)* + $(from_input_value = {$($from_input_value_body:tt)+},)* + $(from_str = {$($from_str_body:tt)+},)* + rest = resolve(&$selfvar:ident) -> $return_ty:ty $body:block $($rest:tt)* + ) => { + $crate::graphql_scalar!( + @parse_functions, + meta = {$($meta)*}, + resolve = { + self_var = $selfvar, + body = $body, + return_type = $return_ty, + }, + $(from_input_value = {$($from_input_value_body)+},)* + $(from_str = {$($from_str_body)+},)* + rest = $($rest)* + ); + }; + + // from_input_value(arg: &InputValue) -> ... { ... } + ( + @parse_functions, + meta = { $($meta:tt)* }, + $(resolve = {$($resolve_body:tt)+})*, + $(from_input_value = {$($from_input_value_body:tt)+},)* + $(from_str = {$($from_str_body:tt)+},)* + rest = from_input_value($arg:ident: &InputValue) -> $result:ty $body:block $($rest:tt)* + ) => { + $crate::graphql_scalar!( + @parse_functions, + meta = { $($meta)* }, + $(resolve = {$($resolve_body)+},)* + from_input_value = { + arg = $arg, + result = $result, + body = $body, + }, + $(from_str = {$($from_str_body)+},)* + rest = $($rest)* + ); + }; + + // from_str(value: &str) -> Result + ( + @parse_functions, + meta = { $($meta:tt)* }, + $(resolve = {$($resolve_body:tt)+},)* + $(from_input_value = {$($from_input_value_body:tt)+},)* + $(from_str = {$($from_str_body:tt)+},)* + rest = from_str<$from_str_lt: tt>($value_arg:ident: ScalarToken<$ignored_lt2: tt>) -> $result:ty $body:block $($rest:tt)* + ) => { + $crate::graphql_scalar!( + @parse_functions, + meta = { $($meta)* }, + $(resolve = {$($resolve_body)+},)* + $(from_input_value = {$($from_input_value_body)+},)* + from_str = { + value_arg = $value_arg, + result = $result, + body = $body, + lifetime = $from_str_lt, + }, + rest = $($rest)* + ); + }; + + // description: + ( + @parse_functions, + meta = { + name = $name:ty, + outname = {$($outname:tt)+}, + scalar = {$($scalar:tt)+}, + }, + $(resolve = {$($resolve_body:tt)+},)* + $(from_input_value = {$($from_input_value_body:tt)+},)* + $(from_str = {$($from_str_body:tt)+},)* + rest = description: $descr:tt $($rest:tt)* + ) => { + $crate::graphql_scalar!( + @parse_functions, + meta = { + name = $name, + outname = {$($outname)+}, + scalar = {$($scalar)+}, + description = $descr, + }, + $(resolve = {$($resolve_body)+},)* + $(from_input_value = {$($from_input_value_body)+},)* + $(from_str = {$($from_str_body)+},)* + rest = $($rest)* + ); + }; + + ( + @parse, + meta = { + lifetimes = [], + name = $name: ty, + outname = {$($outname:tt)*}, + scalar = {$($scalar:tt)*}, + }, + rest = $($rest:tt)* + ) => { + $crate::graphql_scalar!( + @parse_functions, + meta = { + name = $name, + outname = {$($outname)*}, + scalar = {$($scalar)*}, + }, + rest = $($rest)* + ); + }; + + (@$($stuff:tt)*) => { + compile_error!("Invalid syntax for `graphql_scalar!`"); + }; + + ($($rest:tt)*) => { + $crate::__juniper_parse_object_header!( + callback = graphql_scalar, + rest = $($rest)* + ); + } +} diff --git a/juniper/src/schema/schema.rs b/juniper/src/schema/schema.rs index a273fa7e..49b6940d 100644 --- a/juniper/src/schema/schema.rs +++ b/juniper/src/schema/schema.rs @@ -76,10 +76,44 @@ where } } +#[cfg(feature = "async")] +impl<'a, CtxT, S, QueryT, MutationT> crate::GraphQLTypeAsync + for RootNode<'a, QueryT, MutationT, S> +where + S: ScalarValue + Send + Sync, + QueryT: crate::GraphQLTypeAsync, + QueryT::TypeInfo: Send + Sync, + MutationT: crate::GraphQLTypeAsync, + MutationT::TypeInfo: Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_field_async<'b>( + &'b self, + info: &'b Self::TypeInfo, + field_name: &'b str, + arguments: &'b Arguments, + executor: &'b Executor, + ) -> futures::future::BoxFuture<'b, ExecutionResult> { + use futures::future::{ready, FutureExt}; + match field_name { + "__schema" | "__type" => { + let v = self.resolve_field(info, field_name, arguments, executor); + ready(v).boxed() + } + _ => self + .query_type + .resolve_field_async(info, field_name, arguments, executor), + } + } +} + #[crate::object_internal( name = "__Schema" Context = SchemaType<'a, S>, Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> SchemaType<'a, S> where @@ -117,6 +151,8 @@ where name = "__Type" Context = SchemaType<'a, S>, Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> TypeType<'a, S> where @@ -248,6 +284,8 @@ where name = "__Field", Context = SchemaType<'a, S>, Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> Field<'a, S> where @@ -285,6 +323,8 @@ where name = "__InputValue", Context = SchemaType<'a, S>, Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> Argument<'a, S> where @@ -311,6 +351,8 @@ where #[crate::object_internal( name = "__EnumValue", Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> EnumValue where @@ -337,6 +379,8 @@ where name = "__Directive", Context = SchemaType<'a, S>, Scalar = S, + // FIXME: make this redundant. + noasync, )] impl<'a, S> DirectiveType<'a, S> where diff --git a/juniper/src/types/async_await.rs b/juniper/src/types/async_await.rs new file mode 100644 index 00000000..05fb502c --- /dev/null +++ b/juniper/src/types/async_await.rs @@ -0,0 +1,281 @@ +use futures::future::BoxFuture; + +use crate::ast::{Directive, FromInputValue, InputValue, Selection}; +use crate::value::{Object, ScalarRefValue, ScalarValue, Value}; + +use crate::executor::{ExecutionResult, Executor}; +use crate::parser::Spanning; + +use super::base::{is_excluded, merge_key_into, Arguments, GraphQLType}; + +pub trait GraphQLTypeAsync: GraphQLType + Send + Sync +where + Self::Context: Send + Sync, + Self::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_field_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + field_name: &'a str, + arguments: &'a Arguments, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, ExecutionResult> { + panic!("resolve_field must be implemented by object types"); + } + + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, Value> { + if let Some(selection_set) = selection_set { + resolve_selection_set_into_async(self, info, selection_set, executor) + } else { + panic!("resolve() must be implemented by non-object output types"); + } + } +} + +// Wrapper function around resolve_selection_set_into_async_recursive. +// This wrapper is necessary because async fns can not be recursive. +#[cfg(feature = "async")] +pub(crate) fn resolve_selection_set_into_async<'a, 'e, T, CtxT, S>( + instance: &'a T, + info: &'a T::TypeInfo, + selection_set: &'e [Selection<'e, S>], + executor: &'e Executor<'e, CtxT, S>, +) -> futures::future::BoxFuture<'a, Value> +where + T: GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + CtxT: Send + Sync, + 'e: 'a, + for<'b> &'b S: ScalarRefValue<'b>, +{ + use futures::future::FutureExt; + + resolve_selection_set_into_async_recursive(instance, info, selection_set, executor).boxed() +} + +struct AsyncField { + name: String, + value: Option>, +} + +enum AsyncValue { + Field(AsyncField), + Nested(Value), +} + +// type ResolveFuture<'a, S> = BoxFuture<'a, AsyncResolve>; + +#[cfg(feature = "async")] +pub(crate) async fn resolve_selection_set_into_async_recursive<'a, T, CtxT, S>( + instance: &'a T, + info: &'a T::TypeInfo, + selection_set: &'a [Selection<'a, S>], + executor: &'a Executor<'a, CtxT, S>, +) -> Value +where + T: GraphQLTypeAsync + Send + Sync, + T::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + use futures::{ + future::FutureExt, + stream::{FuturesOrdered, StreamExt}, + }; + + let mut object = Object::with_capacity(selection_set.len()); + + let mut async_values = FuturesOrdered::>>::new(); + + let meta_type = executor + .schema() + .concrete_type_by_name( + T::name(info) + .expect("Resolving named type's selection set") + .as_ref(), + ) + .expect("Type not found in schema"); + + for selection in selection_set { + match *selection { + Selection::Field(Spanning { + item: ref f, + start: ref start_pos, + .. + }) => { + if is_excluded(&f.directives, executor.variables()) { + continue; + } + + let response_name = f.alias.as_ref().unwrap_or(&f.name).item; + + if f.name.item == "__typename" { + object.add_field( + response_name, + Value::scalar(instance.concrete_type_name(executor.context(), info)), + ); + continue; + } + + let meta_field = meta_type.field_by_name(f.name.item).unwrap_or_else(|| { + panic!(format!( + "Field {} not found on type {:?}", + f.name.item, + meta_type.name() + )) + }); + + let exec_vars = executor.variables(); + + let sub_exec = executor.field_sub_executor( + &response_name, + f.name.item, + start_pos.clone(), + f.selection_set.as_ref().map(|v| &v[..]), + ); + let args = Arguments::new( + f.arguments.as_ref().map(|m| { + m.item + .iter() + .map(|&(ref k, ref v)| (k.item, v.item.clone().into_const(exec_vars))) + .collect() + }), + &meta_field.arguments, + ); + + let pos = start_pos.clone(); + let is_non_null = meta_field.field_type.is_non_null(); + + let response_name = response_name.to_string(); + let field_future = async move { + // TODO: implement custom future type instead of + // two-level boxing. + let res = instance + .resolve_field_async(info, f.name.item, &args, &sub_exec) + .await; + + let value = match res { + Ok(Value::Null) if is_non_null => None, + Ok(v) => Some(v), + Err(e) => { + sub_exec.push_error_at(e, pos); + + if is_non_null { + None + } else { + Some(Value::null()) + } + } + }; + AsyncValue::Field(AsyncField { + name: response_name, + value, + }) + }; + async_values.push(field_future.boxed()); + } + Selection::FragmentSpread(Spanning { + item: ref spread, .. + }) => { + if is_excluded(&spread.directives, executor.variables()) { + continue; + } + + // TODO: prevent duplicate boxing. + let f = async move { + let fragment = &executor + .fragment_by_name(spread.name.item) + .expect("Fragment could not be found"); + let value = resolve_selection_set_into_async( + instance, + info, + &fragment.selection_set[..], + executor, + ) + .await; + AsyncValue::Nested(value) + }; + async_values.push(f.boxed()); + } + Selection::InlineFragment(Spanning { + item: ref fragment, + start: ref start_pos, + .. + }) => { + if is_excluded(&fragment.directives, executor.variables()) { + continue; + } + + let sub_exec = executor.type_sub_executor( + fragment.type_condition.as_ref().map(|c| c.item), + Some(&fragment.selection_set[..]), + ); + + if let Some(ref type_condition) = fragment.type_condition { + // FIXME: implement async version. + + let sub_result = instance.resolve_into_type( + info, + type_condition.item, + Some(&fragment.selection_set[..]), + &sub_exec, + ); + + if let Ok(Value::Object(obj)) = sub_result { + for (k, v) in obj { + merge_key_into(&mut object, &k, v); + } + } else if let Err(e) = sub_result { + sub_exec.push_error_at(e, start_pos.clone()); + } + } else { + let f = async move { + let value = resolve_selection_set_into_async( + instance, + info, + &fragment.selection_set[..], + &sub_exec, + ) + .await; + AsyncValue::Nested(value) + }; + async_values.push(f.boxed()); + } + } + } + } + + while let Some(item) = async_values.next().await { + match item { + AsyncValue::Field(AsyncField { name, value }) => { + if let Some(value) = value { + object.add_field(&name, value); + } else { + return Value::null(); + } + } + AsyncValue::Nested(obj) => match obj { + v @ Value::Null => { + return v; + } + Value::Object(obj) => { + for (k, v) in obj { + merge_key_into(&mut object, &k, v); + } + } + _ => unreachable!(), + }, + } + } + + Value::Object(object) +} diff --git a/juniper/src/types/base.rs b/juniper/src/types/base.rs index d17f47a9..4a2e2315 100644 --- a/juniper/src/types/base.rs +++ b/juniper/src/types/base.rs @@ -343,7 +343,7 @@ where } } -pub(crate) fn resolve_selection_set_into( +pub fn resolve_selection_set_into( instance: &T, info: &T::TypeInfo, selection_set: &[Selection], @@ -499,7 +499,10 @@ where true } -fn is_excluded(directives: &Option>>>, vars: &Variables) -> bool +pub(super) fn is_excluded( + directives: &Option>>>, + vars: &Variables, +) -> bool where S: ScalarValue, for<'b> &'b S: ScalarRefValue<'b>, @@ -528,7 +531,7 @@ where false } -fn merge_key_into(result: &mut Object, response_name: &str, value: Value) { +pub(crate) fn merge_key_into(result: &mut Object, response_name: &str, value: Value) { if let Some(&mut (_, ref mut e)) = result .iter_mut() .find(|&&mut (ref key, _)| key == response_name) diff --git a/juniper/src/types/containers.rs b/juniper/src/types/containers.rs index 6a93e631..caa0e453 100644 --- a/juniper/src/types/containers.rs +++ b/juniper/src/types/containers.rs @@ -217,3 +217,106 @@ where Value::list(result) } + +#[cfg(feature = "async")] +async fn resolve_into_list_async<'a, S, T, I>( + executor: &'a Executor<'a, T::Context, S>, + info: &'a T::TypeInfo, + items: I, +) -> Value +where + S: ScalarValue + Send + Sync, + I: Iterator + ExactSizeIterator, + T: crate::GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + T::Context: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + use futures::stream::{FuturesOrdered, StreamExt}; + use std::iter::FromIterator; + + let stop_on_null = executor + .current_type() + .list_contents() + .expect("Current type is not a list type") + .is_non_null(); + + let iter = + items.map(|item| async move { executor.resolve_into_value_async(info, &item).await }); + let mut futures = FuturesOrdered::from_iter(iter); + + let mut values = Vec::with_capacity(futures.len()); + while let Some(value) = futures.next().await { + if stop_on_null && value.is_null() { + return value; + } + values.push(value); + } + + Value::list(values) +} + +#[cfg(feature = "async")] +impl crate::GraphQLTypeAsync for Vec +where + T: crate::GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, Value> { + let f = resolve_into_list_async(executor, info, self.iter()); + futures::future::FutureExt::boxed(f) + } +} + +#[cfg(feature = "async")] +impl crate::GraphQLTypeAsync for &[T] +where + T: crate::GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, Value> { + let f = resolve_into_list_async(executor, info, self.iter()); + futures::future::FutureExt::boxed(f) + } +} + +#[cfg(feature = "async")] +impl crate::GraphQLTypeAsync for Option +where + T: crate::GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + S: ScalarValue + Send + Sync, + CtxT: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, Value> { + let f = async move { + match *self { + Some(ref obj) => executor.resolve_into_value_async(info, obj).await, + None => Value::null(), + } + }; + futures::future::FutureExt::boxed(f) + } +} diff --git a/juniper/src/types/mod.rs b/juniper/src/types/mod.rs index 3780e184..a394161e 100644 --- a/juniper/src/types/mod.rs +++ b/juniper/src/types/mod.rs @@ -4,3 +4,6 @@ pub mod name; pub mod pointers; pub mod scalars; pub mod utilities; + +#[cfg(feature = "async")] +pub mod async_await; diff --git a/juniper/src/types/pointers.rs b/juniper/src/types/pointers.rs index 12f905ba..118f9fcc 100644 --- a/juniper/src/types/pointers.rs +++ b/juniper/src/types/pointers.rs @@ -85,7 +85,7 @@ where } } -impl<'a, S, T, CtxT> GraphQLType for &'a T +impl<'e, S, T, CtxT> GraphQLType for &'e T where S: ScalarValue, T: GraphQLType, @@ -136,6 +136,35 @@ where } } +#[cfg(feature = "async")] +impl<'e, S, T> crate::GraphQLTypeAsync for &'e T +where + S: ScalarValue + Send + Sync, + T: crate::GraphQLTypeAsync, + T::TypeInfo: Send + Sync, + T::Context: Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_field_async<'b>( + &'b self, + info: &'b Self::TypeInfo, + field_name: &'b str, + arguments: &'b Arguments, + executor: &'b Executor, + ) -> futures::future::BoxFuture<'b, ExecutionResult> { + crate::GraphQLTypeAsync::resolve_field_async(&**self, info, field_name, arguments, executor) + } + + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, Value> { + crate::GraphQLTypeAsync::resolve_async(&**self, info, selection_set, executor) + } +} + impl<'a, T, S> ToInputValue for &'a T where S: Debug, diff --git a/juniper/src/types/scalars.rs b/juniper/src/types/scalars.rs index 8ed92ea4..dc30c39a 100644 --- a/juniper/src/types/scalars.rs +++ b/juniper/src/types/scalars.rs @@ -196,6 +196,23 @@ where } } +#[cfg(feature = "async")] +impl<'e, S> crate::GraphQLTypeAsync for &'e str +where + S: ScalarValue + Send + Sync, + for<'b> &'b S: ScalarRefValue<'b>, +{ + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [Selection]>, + executor: &'a Executor, + ) -> futures::future::BoxFuture<'a, crate::Value> { + use futures::future; + future::FutureExt::boxed(future::ready(self.resolve(info, selection_set, executor))) + } +} + impl<'a, S> ToInputValue for &'a str where S: ScalarValue, diff --git a/juniper/src/value/mod.rs b/juniper/src/value/mod.rs index 3a61ccc7..6faa8904 100644 --- a/juniper/src/value/mod.rs +++ b/juniper/src/value/mod.rs @@ -120,6 +120,13 @@ where } } + pub fn into_object(self) -> Option> { + match self { + Value::Object(o) => Some(o), + _ => None, + } + } + /// Mutable view into the underlying object value, if present. pub fn as_mut_object_value(&mut self) -> Option<&mut Object> { match *self { diff --git a/juniper/src/value/object.rs b/juniper/src/value/object.rs index 36f8432e..1e8ee25d 100644 --- a/juniper/src/value/object.rs +++ b/juniper/src/value/object.rs @@ -76,6 +76,19 @@ impl Object { .find(|&&(ref k, _)| (k as &str) == key) .map(|&(_, ref value)| value) } + + pub fn sort_by_field(&mut self) { + self.key_value_list + .sort_by(|(key1, _), (key2, _)| key1.cmp(key2)); + for (_, ref mut value) in &mut self.key_value_list { + match value { + Value::Object(ref mut o) => { + o.sort_by_field(); + } + _ => {} + } + } + } } impl IntoIterator for Object { diff --git a/juniper/src/value/scalar.rs b/juniper/src/value/scalar.rs index 78042591..d6a384ce 100644 --- a/juniper/src/value/scalar.rs +++ b/juniper/src/value/scalar.rs @@ -260,6 +260,8 @@ pub enum DefaultScalarValue { Boolean(bool), } +trait S: Send + Sync {} + impl ScalarValue for DefaultScalarValue { type Visitor = DefaultScalarValueVisitor; diff --git a/juniper_codegen/Cargo.toml b/juniper_codegen/Cargo.toml index fe4ed6ae..d8c83e1b 100644 --- a/juniper_codegen/Cargo.toml +++ b/juniper_codegen/Cargo.toml @@ -14,6 +14,9 @@ edition = "2018" [lib] proc-macro = true +[features] +async = [] + [dependencies] proc-macro2 = "1.0.1" syn = { version = "1.0.3", features = ["full", "extra-traits", "parsing"] } diff --git a/juniper_codegen/src/derive_enum.rs b/juniper_codegen/src/derive_enum.rs index 6246d0ff..d1262e7d 100644 --- a/juniper_codegen/src/derive_enum.rs +++ b/juniper_codegen/src/derive_enum.rs @@ -206,9 +206,34 @@ pub fn impl_enum(ast: &syn::DeriveInput, is_internal: bool) -> TokenStream { }); } + #[cfg(feature = "async")] + let _async = quote!( + impl<__S> #juniper_path::GraphQLTypeAsync<__S> for #ident + where + __S: #juniper_path::ScalarValue + Send + Sync, + for<'__b> &'__b __S: #juniper_path::ScalarRefValue<'__b> + { + fn resolve_async<'a>( + &'a self, + info: &'a Self::TypeInfo, + selection_set: Option<&'a [#juniper_path::Selection<__S>]>, + executor: &'a #juniper_path::Executor, + ) -> futures::future::BoxFuture<'a, #juniper_path::Value<__S>> { + use #juniper_path::GraphQLType; + use futures::future; + let v = self.resolve(info, selection_set, executor); + future::FutureExt::boxed(future::ready(v)) + } + } + ); + + #[cfg(not(feature = "async"))] + let _async = quote!(); + let body = quote! { impl<__S> #juniper_path::GraphQLType<__S> for #ident - where __S: #juniper_path::ScalarValue, + where __S: + #juniper_path::ScalarValue, for<'__b> &'__b __S: #juniper_path::ScalarRefValue<'__b> { type Context = (); @@ -261,6 +286,8 @@ pub fn impl_enum(ast: &syn::DeriveInput, is_internal: bool) -> TokenStream { } } } + + #_async }; body } diff --git a/juniper_codegen/src/derive_object.rs b/juniper_codegen/src/derive_object.rs index f76b2505..c6042aa7 100644 --- a/juniper_codegen/src/derive_object.rs +++ b/juniper_codegen/src/derive_object.rs @@ -59,6 +59,7 @@ pub fn build_derive_object(ast: syn::DeriveInput, is_internal: bool) -> TokenStr description: field_attrs.description, deprecation: field_attrs.deprecation, resolver_code, + resolver_code_async: None, }) } }); @@ -74,6 +75,7 @@ pub fn build_derive_object(ast: syn::DeriveInput, is_internal: bool) -> TokenStr interfaces: None, include_type_generics: true, generic_scalar: true, + no_async: attrs.no_async, }; let juniper_crate_name = if is_internal { "crate" } else { "juniper" }; diff --git a/juniper_codegen/src/impl_object.rs b/juniper_codegen/src/impl_object.rs index 12af7f41..992e3570 100644 --- a/juniper_codegen/src/impl_object.rs +++ b/juniper_codegen/src/impl_object.rs @@ -86,6 +86,7 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) -> }, include_type_generics: false, generic_scalar: false, + no_async: impl_attrs.no_async, }; for item in _impl.items { @@ -101,6 +102,8 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) -> } }; + let is_async = method.sig.asyncness.is_some(); + let attrs = match util::FieldAttributes::from_attrs( method.attrs, util::FieldAttributeParseMode::Impl, @@ -195,12 +198,28 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) -> let body = &method.block; let return_ty = &method.sig.output; - let resolver_code = quote!( - (|| #return_ty { - #( #resolve_parts )* - #body - })() - ); + + let (resolver_code, resolver_code_async) = if is_async { + ( + quote!(), + Some(quote!( + (async move || #return_ty { + #( #resolve_parts )* + #body + })() + )), + ) + } else { + ( + quote!( + (|| #return_ty { + #( #resolve_parts )* + #body + })() + ), + None, + ) + }; let ident = &method.sig.ident; let name = attrs @@ -214,6 +233,7 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) -> description: attrs.description, deprecation: attrs.deprecation, resolver_code, + resolver_code_async, }); } _ => { diff --git a/juniper_codegen/src/util.rs b/juniper_codegen/src/util.rs index d11f8788..3c3b7d6c 100644 --- a/juniper_codegen/src/util.rs +++ b/juniper_codegen/src/util.rs @@ -297,6 +297,7 @@ pub struct ObjectAttributes { pub context: Option, pub scalar: Option, pub interfaces: Vec, + pub no_async: bool, } impl syn::parse::Parse for ObjectAttributes { @@ -307,6 +308,7 @@ impl syn::parse::Parse for ObjectAttributes { context: None, scalar: None, interfaces: Vec::new(), + no_async: false, }; while !input.is_empty() { @@ -350,6 +352,10 @@ impl syn::parse::Parse for ObjectAttributes { .into_iter() .collect(); } + // FIXME: make this unneccessary. + "noasync" => { + output.no_async = true; + } other => { return Err(input.error(format!("Unknown attribute: {}", other))); } @@ -591,6 +597,14 @@ pub struct GraphQLTypeDefinitionField { pub deprecation: Option, pub args: Vec, pub resolver_code: proc_macro2::TokenStream, + pub resolver_code_async: Option, +} + +impl GraphQLTypeDefinitionField { + #[inline] + fn is_async(&self) -> bool { + self.resolver_code_async.is_some() + } } /// Definition of a graphql type based on information extracted @@ -618,9 +632,15 @@ pub struct GraphQLTypeDefiniton { // If false, the scalar is only generic if a generic parameter // is specified manually. pub generic_scalar: bool, + // FIXME: make this redundant. + pub no_async: bool, } impl GraphQLTypeDefiniton { + fn has_async_field(&self) -> bool { + self.fields.iter().any(|field| field.is_async()) + } + pub fn into_tokens(self, juniper_crate_name: &str) -> proc_macro2::TokenStream { let juniper_crate_name = syn::parse_str::(juniper_crate_name).unwrap(); @@ -691,21 +711,30 @@ impl GraphQLTypeDefiniton { let name = &field.name; let code = &field.resolver_code; - quote!( - #name => { - let res = { #code }; - #juniper_crate_name::IntoResolvable::into( - res, - executor.context() - ) - .and_then(|res| { - match res { - Some((ctx, r)) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r), - None => Ok(#juniper_crate_name::Value::null()), - } - }) - }, - ) + if field.is_async() { + // TODO: better error message with field/type name. + quote!( + #name => { + panic!("Tried to resolve async field with a sync resolver"); + }, + ) + } else { + quote!( + #name => { + let res = { #code }; + #juniper_crate_name::IntoResolvable::into( + res, + executor.context() + ) + .and_then(|res| { + match res { + Some((ctx, r)) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r), + None => Ok(#juniper_crate_name::Value::null()), + } + }) + }, + ) + } }); let description = self @@ -778,6 +807,114 @@ impl GraphQLTypeDefiniton { }; let (impl_generics, _, where_clause) = generics.split_for_impl(); + #[cfg(feature = "async")] + let resolve_field_async = { + let resolve_matches_async = self.fields.iter().map(|field| { + let name = &field.name; + + if let Some(code) = field.resolver_code_async.as_ref() { + quote!( + #name => { + let f = async move { + let res = { #code }.await; + + let inner_res = #juniper_crate_name::IntoResolvable::into( + res, + executor.context() + ); + match inner_res { + Ok(Some((ctx, r))) => { + let subexec = executor + .replaced_context(ctx); + subexec.resolve_with_ctx_async(&(), &r) + .await + }, + Ok(None) => Ok(#juniper_crate_name::Value::null()), + Err(e) => Err(e), + } + }; + future::FutureExt::boxed(f) + }, + ) + } else { + let code = &field.resolver_code; + + let inner = if !self.no_async { + quote!( + let f = async move { + match res2 { + Ok(Some((ctx, r))) => { + let sub = executor.replaced_context(ctx); + sub.resolve_with_ctx_async(&(), &r).await + }, + Ok(None) => Ok(#juniper_crate_name::Value::null()), + Err(e) => Err(e), + } + }; + future::FutureExt::boxed(f) + ) + } else { + quote!( + let v = match res2 { + Ok(Some((ctx, r))) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r), + Ok(None) => Ok(#juniper_crate_name::Value::null()), + Err(e) => Err(e), + }; + future::FutureExt::boxed(future::ready(v)) + ) + }; + + quote!( + #name => { + let res = { #code }; + let res2 = #juniper_crate_name::IntoResolvable::into( + res, + executor.context() + ); + #inner + }, + ) + } + }); + + let mut where_async = where_clause.cloned().unwrap_or_else(|| parse_quote!(where));; + + where_async + .predicates + .push(parse_quote!( #scalar: Send + Sync )); + where_async.predicates.push(parse_quote!(Self: Send + Sync)); + + // FIXME: add where clause for interfaces. + + quote!( + impl#impl_generics #juniper_crate_name::GraphQLTypeAsync<#scalar> for #ty #type_generics_tokens + #where_async + { + fn resolve_field_async<'b>( + &'b self, + info: &'b Self::TypeInfo, + field: &'b str, + args: &'b #juniper_crate_name::Arguments<#scalar>, + executor: &'b #juniper_crate_name::Executor, + ) -> futures::future::BoxFuture<'b, #juniper_crate_name::ExecutionResult<#scalar>> + where #scalar: Send + Sync, + { + use futures::future; + use #juniper_crate_name::GraphQLType; + match field { + #( #resolve_matches_async )* + _ => { + panic!("Field {} not found on type {}", field, "Mutation"); + } + } + } + } + ) + }; + + #[cfg(not(feature = "async"))] + let resolve_field_async = quote!(); + let output = quote!( impl#impl_generics #juniper_crate_name::GraphQLType<#scalar> for #ty #type_generics_tokens #where_clause @@ -822,11 +959,14 @@ impl GraphQLTypeDefiniton { } } + fn concrete_type_name(&self, _: &Self::Context, _: &Self::TypeInfo) -> String { #name.to_string() } - } + } + + #resolve_field_async ); output } diff --git a/juniper_warp/Cargo.toml b/juniper_warp/Cargo.toml index 75b28bb9..0af899da 100644 --- a/juniper_warp/Cargo.toml +++ b/juniper_warp/Cargo.toml @@ -8,6 +8,9 @@ documentation = "https://docs.rs/juniper_warp" repository = "https://github.com/graphql-rust/juniper" edition = "2018" +[features] +async = [ "juniper/async", "futures03" ] + [dependencies] warp = "0.1.8" juniper = { version = "0.14.0", path = "../juniper", default-features = false } @@ -18,6 +21,8 @@ futures = "0.1.23" serde = "1.0.75" tokio-threadpool = "0.1.7" +futures03 = { version = "0.3.0-alpha.18", optional = true, package = "futures-preview" } + [dev-dependencies] juniper = { version = "0.14.0", path = "../juniper", features = ["expose-test-schema", "serde_json"] } env_logger = "0.5.11" diff --git a/juniper_warp/src/lib.rs b/juniper_warp/src/lib.rs index a9f37d08..e0bda8c2 100644 --- a/juniper_warp/src/lib.rs +++ b/juniper_warp/src/lib.rs @@ -41,11 +41,12 @@ Check the LICENSE file for details. #![doc(html_root_url = "https://docs.rs/juniper_warp/0.2.0")] use futures::{future::poll_fn, Future}; -use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue}; use serde::Deserialize; use std::sync::Arc; use warp::{filters::BoxedFilter, Filter}; +use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue}; + #[derive(Debug, serde_derive::Deserialize, PartialEq)] #[serde(untagged)] #[serde(bound = "InputValue: Deserialize<'de>")] @@ -240,6 +241,80 @@ where get_filter.or(post_filter).unify().boxed() } +/// FIXME: docs +pub fn make_graphql_filter_async( + schema: juniper::RootNode<'static, Query, Mutation, S>, + context_extractor: BoxedFilter<(Context,)>, +) -> BoxedFilter<(warp::http::Response>,)> +where + S: ScalarValue + Send + Sync + 'static, + for<'b> &'b S: ScalarRefValue<'b>, + Context: Send + Sync + 'static, + Query: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Query::TypeInfo: Send + Sync, + Mutation: juniper::GraphQLTypeAsync + Send + Sync + 'static, + Mutation::TypeInfo: Send + Sync, +{ + let schema = Arc::new(schema); + let post_schema = schema.clone(); + + let handle_post_request = + move |context: Context, request: GraphQLBatchRequest| -> Response { + let schema = post_schema.clone(); + Box::new( + poll_fn(move || { + tokio_threadpool::blocking(|| { + let response = request.execute(&schema, &context); + Ok((serde_json::to_vec(&response)?, response.is_ok())) + }) + }) + .and_then(|result| ::futures::future::done(Ok(build_response(result)))) + .map_err(|e: tokio_threadpool::BlockingError| warp::reject::custom(e)), + ) + }; + + let post_filter = warp::post2() + .and(context_extractor.clone()) + .and(warp::body::json()) + .and_then(handle_post_request); + + let handle_get_request = move |context: Context, + mut request: std::collections::HashMap| + -> Response { + let schema = schema.clone(); + Box::new( + poll_fn(move || { + tokio_threadpool::blocking(|| { + let variables = match request.remove("variables") { + None => None, + Some(vs) => serde_json::from_str(&vs)?, + }; + + let graphql_request = juniper::http::GraphQLRequest::new( + request.remove("query").ok_or_else(|| { + failure::format_err!("Missing GraphQL query string in query parameters") + })?, + request.get("operation_name").map(|s| s.to_owned()), + variables, + ); + + let response = graphql_request.execute(&schema, &context); + Ok((serde_json::to_vec(&response)?, response.is_ok())) + }) + }) + .and_then(|result| ::futures::future::done(Ok(build_response(result)))) + .map_err(|e: tokio_threadpool::BlockingError| warp::reject::custom(e)), + ) + }; + + let get_filter = warp::get2() + .and(context_extractor.clone()) + .and(warp::filters::query::query()) + .and_then(handle_get_request); + + get_filter.or(post_filter).unify().boxed() +} + fn build_response( response: Result<(Vec, bool), failure::Error>, ) -> warp::http::Response> {