WIP async/await implementation

This commit is contained in:
Christoph Herzog 2019-08-18 21:36:44 +02:00 committed by Christian Legnitto
parent 61c0543523
commit 56a4f2558a
27 changed files with 1681 additions and 37 deletions

View file

@ -4,6 +4,7 @@ members = [
"juniper_codegen",
"juniper",
"integration_tests/juniper_tests",
"integration_tests/async_await",
"juniper_hyper",
"juniper_iron",
"juniper_rocket",

View file

@ -0,0 +1,12 @@
[package]
name = "async_await"
version = "0.1.0"
authors = ["Christoph Herzog <chris@theduke.at>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
juniper = { path = "../../juniper", features = ["async"] }
futures-preview = "0.3.0-alpha.18"
tokio = "0.2.0-alpha.2"

View file

@ -0,0 +1,126 @@
#![feature(async_await, async_closure)]
use juniper::{graphql_value, RootNode, Value};
#[derive(juniper::GraphQLEnum)]
enum UserKind {
Admin,
User,
Guest,
}
struct User {
id: u64,
name: String,
kind: UserKind,
}
#[juniper::object]
impl User {
async fn name(&self) -> &str {
&self.name
}
async fn friends(&self) -> Vec<User> {
let friends = (0..10)
.map(|index| User {
id: index,
name: format!("user{}", index),
kind: UserKind::User,
})
.collect();
friends
}
async fn kind(&self) -> &UserKind {
&self.kind
}
async fn delayed() -> bool {
let when = tokio::clock::now() + std::time::Duration::from_millis(100);
tokio::timer::Delay::new(when).await;
true
}
}
struct Query;
#[juniper::object]
impl Query {
fn field_sync(&self) -> &'static str {
"field_sync"
}
async fn field_async_plain() -> String {
"field_async_plain".to_string()
}
fn user(id: String) -> User {
User {
id: 1,
name: id,
kind: UserKind::User,
}
}
async fn delayed() -> bool {
let when = tokio::clock::now() + std::time::Duration::from_millis(100);
tokio::timer::Delay::new(when).await;
true
}
}
struct Mutation;
#[juniper::object]
impl Mutation {}
fn run<O>(f: impl std::future::Future<Output = O>) -> O {
tokio::runtime::current_thread::Runtime::new()
.unwrap()
.block_on(f)
}
#[test]
fn async_simple() {
let schema = RootNode::new(Query, Mutation);
let doc = r#"
query {
fieldSync
fieldAsyncPlain
delayed
user(id: "user1") {
kind
name
delayed
}
}
"#;
let vars = Default::default();
let f = juniper::execute_async(doc, None, &schema, &vars, &());
let (res, errs) = run(f).unwrap();
assert!(errs.is_empty());
let mut obj = res.into_object().unwrap();
obj.sort_by_field();
let value = Value::Object(obj);
assert_eq!(
value,
graphql_value!({
"delayed": true,
"fieldAsyncPlain": "field_async_plain",
"fieldSync": "field_sync",
"user": {
"delayed": true,
"kind": "USER",
"name": "user1",
},
}),
);
}
fn main() {}

View file

@ -24,6 +24,7 @@ harness = false
path = "benches/bench.rs"
[features]
async = ["juniper_codegen/async", "futures-preview"]
expose-test-schema = []
default = [
"chrono",
@ -44,6 +45,9 @@ serde_json = { version="1.0.2", optional = true }
url = { version = "2", optional = true }
uuid = { version = "0.7", optional = true }
futures-preview = { version = "0.3.0-alpha.18", optional = true, features = ["nightly", "async-await"] }
[dev-dependencies]
bencher = "0.1.2"
serde_json = { version = "1.0.2" }
tokio = "0.2.0-alpha.2"

View file

@ -372,6 +372,37 @@ where
Ok(value.resolve(info, self.current_selection_set, self))
}
/// Resolve a single arbitrary value into an `ExecutionResult`
#[cfg(feature = "async")]
pub async fn resolve_async<T>(&self, info: &T::TypeInfo, value: &T) -> ExecutionResult<S>
where
T: crate::GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
T::TypeInfo: Send + Sync,
CtxT: Send + Sync,
S: Send + Sync,
{
Ok(value
.resolve_async(info, self.current_selection_set, self)
.await)
}
/// Resolve a single arbitrary value, mapping the context to a new type
#[cfg(feature = "async")]
pub async fn resolve_with_ctx_async<NewCtxT, T>(
&self,
info: &T::TypeInfo,
value: &T,
) -> ExecutionResult<S>
where
T: crate::GraphQLTypeAsync<S, Context = NewCtxT> + Send + Sync,
T::TypeInfo: Send + Sync,
S: Send + Sync,
NewCtxT: FromContext<CtxT> + Send + Sync,
{
let e = self.replaced_context(<NewCtxT as FromContext<CtxT>>::from(self.context));
e.resolve_async(info, value).await
}
/// Resolve a single arbitrary value into a return value
///
/// If the field fails to resolve, `null` will be returned.
@ -388,6 +419,26 @@ where
}
}
/// Resolve a single arbitrary value into a return value
///
/// If the field fails to resolve, `null` will be returned.
#[cfg(feature = "async")]
pub async fn resolve_into_value_async<T>(&self, info: &T::TypeInfo, value: &T) -> Value<S>
where
T: crate::GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
T::TypeInfo: Send + Sync,
CtxT: Send + Sync,
S: Send + Sync,
{
match self.resolve_async(info, value).await {
Ok(v) => v,
Err(e) => {
self.push_error(e);
Value::null()
}
}
}
/// Derive a new executor by replacing the context
///
/// This can be used to connect different types, e.g. from different Rust
@ -480,7 +531,7 @@ where
}
#[doc(hidden)]
pub fn fragment_by_name(&self, name: &str) -> Option<&'a Fragment<S>> {
pub fn fragment_by_name(&'a self, name: &str) -> Option<&'a Fragment<'a, S>> {
self.fragments.get(name).cloned()
}
@ -720,6 +771,121 @@ where
Ok((value, errors))
}
#[cfg(feature = "async")]
pub async fn execute_validated_query_async<'a, QueryT, MutationT, CtxT, S>(
document: Document<'a, S>,
operation_name: Option<&str>,
root_node: &RootNode<'a, QueryT, MutationT, S>,
variables: &Variables<S>,
context: &CtxT,
) -> Result<(Value<S>, Vec<ExecutionError<S>>), GraphQLError<'a>>
where
S: ScalarValue + Send + Sync,
QueryT: crate::GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
QueryT::TypeInfo: Send + Sync,
MutationT: crate::GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
MutationT::TypeInfo: Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
let mut fragments = vec![];
let mut operation = None;
for def in document {
match def {
Definition::Operation(op) => {
if operation_name.is_none() && operation.is_some() {
return Err(GraphQLError::MultipleOperationsProvided);
}
let move_op = operation_name.is_none()
|| op.item.name.as_ref().map(|s| s.item) == operation_name;
if move_op {
operation = Some(op);
}
}
Definition::Fragment(f) => fragments.push(f),
};
}
let op = match operation {
Some(op) => op,
None => return Err(GraphQLError::UnknownOperationName),
};
let default_variable_values = op.item.variable_definitions.map(|defs| {
defs.item
.items
.iter()
.filter_map(|&(ref name, ref def)| {
def.default_value
.as_ref()
.map(|i| (name.item.to_owned(), i.item.clone()))
})
.collect::<HashMap<String, InputValue<S>>>()
});
let errors = RwLock::new(Vec::new());
let value;
{
let mut all_vars;
let mut final_vars = variables;
if let Some(defaults) = default_variable_values {
all_vars = variables.clone();
for (name, value) in defaults {
all_vars.entry(name).or_insert(value);
}
final_vars = &all_vars;
}
let root_type = match op.item.operation_type {
OperationType::Query => root_node.schema.query_type(),
OperationType::Mutation => root_node
.schema
.mutation_type()
.expect("No mutation type found"),
};
let executor = Executor {
fragments: &fragments
.iter()
.map(|f| (f.item.name.item, &f.item))
.collect(),
variables: final_vars,
current_selection_set: Some(&op.item.selection_set[..]),
parent_selection_set: None,
current_type: root_type,
schema: &root_node.schema,
context,
errors: &errors,
field_path: FieldPath::Root(op.start),
};
value = match op.item.operation_type {
OperationType::Query => {
executor
.resolve_into_value_async(&root_node.query_info, &root_node)
.await
}
OperationType::Mutation => {
executor
.resolve_into_value_async(&root_node.mutation_info, &root_node.mutation_type)
.await
}
};
}
let mut errors = errors.into_inner().unwrap();
errors.sort();
Ok((value, errors))
}
impl<'r, S> Registry<'r, S>
where
S: ScalarValue + 'r,

View file

@ -0,0 +1,120 @@
use crate::{RootNode, Value};
#[derive(crate::GraphQLEnumInternal)]
enum UserKind {
Admin,
User,
Guest,
}
struct User {
id: u64,
name: String,
kind: UserKind,
}
#[crate::object_internal]
impl User {
async fn name(&self) -> &str {
&self.name
}
async fn friends(&self) -> Vec<User> {
let friends = (0..10)
.map(|index| User {
id: index,
name: format!("user{}", index),
kind: UserKind::User,
})
.collect();
friends
}
async fn kind(&self) -> &UserKind {
&self.kind
}
async fn delayed() -> bool {
let when = tokio::clock::now() + std::time::Duration::from_millis(100);
tokio::timer::Delay::new(when).await;
true
}
}
struct Query;
#[crate::object_internal]
impl Query {
fn field_sync(&self) -> &'static str {
"field_sync"
}
async fn field_async_plain() -> String {
"field_async_plain".to_string()
}
fn user(id: String) -> User {
User {
id: 1,
name: id,
kind: UserKind::User,
}
}
async fn delayed() -> bool {
let when = tokio::clock::now() + std::time::Duration::from_millis(100);
tokio::timer::Delay::new(when).await;
true
}
}
struct Mutation;
#[crate::object_internal]
impl Mutation {}
fn run<O>(f: impl std::future::Future<Output = O>) -> O {
tokio::runtime::current_thread::Runtime::new()
.unwrap()
.block_on(f)
}
#[test]
fn async_simple() {
let schema = RootNode::new(Query, Mutation);
let doc = r#"
query {
fieldSync
fieldAsyncPlain
delayed
user(id: "user1") {
name
}
}
"#;
let vars = Default::default();
let f = crate::execute_async(doc, None, &schema, &vars, &());
let (res, errs) = run(f).unwrap();
assert!(errs.is_empty());
let mut obj = res.into_object().unwrap();
obj.sort_by_field();
let value = Value::Object(obj);
assert_eq!(
value,
crate::graphql_value!({
"delayed": true,
"fieldAsyncPlain": "field_async_plain",
"fieldSync": "field_sync",
"user": {
"kind": "USER",
// "name": "user1",
// "delayed": true,
},
}),
);
}

View file

@ -1,6 +1,12 @@
mod directives;
mod enums;
mod executor;
mod interfaces_unions;
mod introspection;
mod variables;
// FIXME: re-enable
#[cfg(not(feature = "async"))]
mod interfaces_unions;
#[cfg(feature = "async")]
mod async_await;

View file

@ -90,6 +90,7 @@ Juniper has not reached 1.0 yet, thus some API instability should be expected.
*/
#![doc(html_root_url = "https://docs.rs/juniper/0.14.0")]
#![warn(missing_docs)]
#![cfg_attr(feature = "async", feature(async_await, async_closure))]
#[doc(hidden)]
pub extern crate serde;
@ -150,7 +151,6 @@ mod executor_tests;
pub use crate::util::to_camel_case;
use crate::{
executor::execute_validated_query,
introspection::{INTROSPECTION_QUERY, INTROSPECTION_QUERY_WITHOUT_DESCRIPTIONS},
parser::{parse_document_source, ParseError, Spanning},
validation::{validate_input_values, visit_all_rules, ValidatorContext},
@ -176,6 +176,9 @@ pub use crate::{
},
};
#[cfg(feature = "async")]
pub use crate::types::async_await::GraphQLTypeAsync;
/// An error that prevented query execution
#[derive(Debug, PartialEq)]
#[allow(missing_docs)]
@ -221,7 +224,48 @@ where
}
}
execute_validated_query(document, operation_name, root_node, variables, context)
executor::execute_validated_query(document, operation_name, root_node, variables, context)
}
/// Execute a query in a provided schema
#[cfg(feature = "async")]
pub async fn execute_async<'a, S, CtxT, QueryT, MutationT>(
document_source: &'a str,
operation_name: Option<&str>,
root_node: &'a RootNode<'a, QueryT, MutationT, S>,
variables: &Variables<S>,
context: &CtxT,
) -> Result<(Value<S>, Vec<ExecutionError<S>>), GraphQLError<'a>>
where
S: ScalarValue + Send + Sync,
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
MutationT::TypeInfo: Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
let document = parse_document_source(document_source, &root_node.schema)?;
{
let errors = validate_input_values(variables, &document, &root_node.schema);
if !errors.is_empty() {
return Err(GraphQLError::ValidationError(errors));
}
}
{
let mut ctx = ValidatorContext::new(&root_node.schema, &document);
visit_all_rules(&mut ctx, &document);
let errors = ctx.into_errors();
if !errors.is_empty() {
return Err(GraphQLError::ValidationError(errors));
}
}
executor::execute_validated_query_async(document, operation_name, root_node, variables, context)
.await
}
/// Execute the reference introspection query in the provided schema

View file

@ -7,29 +7,73 @@ macro_rules! __juniper_impl_trait {
}
) => {
impl<$($other,)*> $crate::$impl_trait<$crate::DefaultScalarValue> for $name {
$($body)+
$($body)*
}
};
(
impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty {
impl< < DefaultScalarValue > $(, $other: tt)* > $impl_trait:tt for $name:ty
where ( $($where:tt)* )
{
$($body:tt)+
}
) => {
impl<$($other,)*> $crate::$impl_trait<$crate::DefaultScalarValue> for $name
where $($where)*
{
$($body)*
}
};
(
impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty {
$($body:tt)*
}
) => {
impl<$($other,)* $generic $(: $bound)*> $crate::$impl_trait<$generic> for $name
where
$generic: $crate::ScalarValue,
for<'__b> &'__b $generic: $crate::ScalarRefValue<'__b>,
{
$($body)+
$($body)*
}
};
(
impl< <$generic:tt $(: $bound: tt)*> $(, $other: tt)* > $impl_trait:tt for $name:ty
where ( $($where:tt)* )
{
$($body:tt)*
}
) => {
impl<$($other,)* $generic $(: $bound)*> $crate::$impl_trait<$generic> for $name
where
$($where)*
$generic: $crate::ScalarValue,
for<'__b> &'__b $generic: $crate::ScalarRefValue<'__b>,
{
$($body)*
}
};
(
impl<$scalar:ty $(, $other: tt )*> $impl_trait:tt for $name:ty {
$($body:tt)+
$($body:tt)*
}
) => {
impl<$($other, )*> $crate::$impl_trait<$scalar> for $name {
$($body)+
$($body)*
}
};
(
impl<$scalar:ty $(, $other: tt )*> $impl_trait:tt for $name:ty
where ( $($where:tt)* )
{
$($body:tt)*
}
) => {
impl<$($other, )*> $crate::$impl_trait<$scalar> for $name
where $($where)*
{
$($body)*
}
};
}
@ -52,6 +96,25 @@ macro_rules! __juniper_insert_generic {
};
}
// TODO: remove me.
#[doc(hidden)]
#[macro_export]
macro_rules! __juniper_extract_generic {
(<$name:ident>) => {
$name
};
(
<$generic:tt $(: $bound: tt)*>
) => {
$generic
};
(
$scalar: ty
) => {
$scalar
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! __juniper_parse_object_header {

View file

@ -45,9 +45,12 @@ In addition to implementing `GraphQLType` for the type in question,
usable as arguments and default values.
*/
#[cfg(not(feature = "async"))]
#[macro_export]
macro_rules! graphql_scalar {
( @as_expr $e:expr) => { $e };
(
@generate,
meta = {
@ -341,3 +344,328 @@ macro_rules! graphql_scalar {
);
}
}
// FIXME: prevent duplicating the whole macro for async.
#[cfg(feature = "async")]
#[macro_export]
macro_rules! graphql_scalar {
( @as_expr $e:expr) => { $e };
(
@generate,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
$(description = $descr:tt,)*
},
resolve = {
self_var = $resolve_self_var:ident,
body = $resolve_body: block,
return_type = $resolve_retun_type: ty,
},
from_input_value = {
arg = $from_input_value_arg: ident,
result = $from_input_value_result: ty,
body = $from_input_value_body: block,
},
from_str = {
value_arg = $from_str_arg: ident,
result = $from_str_result: ty,
body = $from_str_body: block,
lifetime = $from_str_lt: tt,
},
) => {
$crate::__juniper_impl_trait!(
impl <$($scalar)+> GraphQLType for $name {
type Context = ();
type TypeInfo = ();
fn name(_: &Self::TypeInfo) -> Option<&str> {
Some($crate::graphql_scalar!(@as_expr $($outname)+))
}
fn meta<'r>(
info: &Self::TypeInfo,
registry: &mut $crate::Registry<'r, $crate::__juniper_insert_generic!($($scalar)+)>
) -> $crate::meta::MetaType<'r, $crate::__juniper_insert_generic!($($scalar)+)>
where for<'__b> &'__b $crate::__juniper_insert_generic!($($scalar)+): $crate::ScalarRefValue<'__b>,
$crate::__juniper_insert_generic!($($scalar)+): 'r
{
let meta = registry.build_scalar_type::<Self>(info);
$(
let meta = meta.description($descr);
)*
meta.into_meta()
}
fn resolve(
&$resolve_self_var,
_: &(),
_: Option<&[$crate::Selection<$crate::__juniper_insert_generic!($($scalar)+)>]>,
_: &$crate::Executor<
Self::Context,
$crate::__juniper_insert_generic!($($scalar)+)
>) -> $crate::Value<$crate::__juniper_insert_generic!($($scalar)+)> {
$resolve_body
}
}
);
$crate::__juniper_impl_trait!(
impl <$($scalar)+> GraphQLTypeAsync for $name
where (
$crate::__juniper_insert_generic!($($scalar)+): Send + Sync,
Self: $crate::GraphQLType<$crate::__juniper_insert_generic!($($scalar)+)> + Send + Sync,
Self::Context: Send + Sync,
Self::TypeInfo: Send + Sync,
)
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [$crate::Selection<$crate::__juniper_insert_generic!($($scalar)+)>]>,
executor: &'a $crate::Executor<Self::Context, $crate::__juniper_insert_generic!($($scalar)+)>,
) -> futures::future::BoxFuture<'a, $crate::Value<$crate::__juniper_insert_generic!($($scalar)+)>> {
use $crate::GraphQLType;
use futures::future;
let v = self.resolve(info, selection_set, executor);
future::FutureExt::boxed(future::ready(v))
}
}
);
$crate::__juniper_impl_trait!(
impl<$($scalar)+> ToInputValue for $name {
fn to_input_value(&$resolve_self_var) -> $crate::InputValue<$crate::__juniper_insert_generic!($($scalar)+)> {
let v = $resolve_body;
$crate::ToInputValue::to_input_value(&v)
}
}
);
$crate::__juniper_impl_trait!(
impl<$($scalar)+> FromInputValue for $name {
fn from_input_value(
$from_input_value_arg: &$crate::InputValue<$crate::__juniper_insert_generic!($($scalar)+)>
) -> $from_input_value_result {
$from_input_value_body
}
}
);
$crate::__juniper_impl_trait!(
impl<$($scalar)+> ParseScalarValue for $name {
fn from_str<$from_str_lt>($from_str_arg: $crate::parser::ScalarToken<$from_str_lt>) -> $from_str_result {
$from_str_body
}
}
);
};
// No more items to parse
(
@parse_functions,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
$(description = $descr:tt,)*
},
resolve = {$($resolve_body:tt)+},
from_input_value = {$($from_input_value_body:tt)+},
from_str = {$($from_str_body:tt)+},
rest =
) => {
$crate::graphql_scalar!(
@generate,
meta = {
name = $name,
outname = {$($outname)+},
scalar = {$($scalar)+},
$(description = $descr,)*
},
resolve = {$($resolve_body)+},
from_input_value = {$($from_input_value_body)+},
from_str = {$($from_str_body)+},
);
};
(
@parse_functions,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
$(description = $descr:tt,)*
},
$(from_input_value = {$($from_input_value_body:tt)+})*,
$(from_str = {$($from_str_body:tt)+})*,
rest =
) => {
compile_error!("Missing resolve function");
};
(
@parse_functions,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
$(description = $descr:tt,)*
},
resolve = {$($resolve_body:tt)+},
$(from_str = {$($from_str_body:tt)+})*,
rest =
) => {
compile_error!("Missing from_input_value function");
};
(
@parse_functions,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
$(description = $descr:tt,)*
},
resolve = {$($resolve_body:tt)+},
from_input_value = {$($from_input_value_body:tt)+},
rest =
) =>{
compile_error!("Missing from_str function");
};
// resolve(&self) -> Value { ... }
(
@parse_functions,
meta = {$($meta:tt)*},
$(resolve = {$($resolve_body:tt)+},)*
$(from_input_value = {$($from_input_value_body:tt)+},)*
$(from_str = {$($from_str_body:tt)+},)*
rest = resolve(&$selfvar:ident) -> $return_ty:ty $body:block $($rest:tt)*
) => {
$crate::graphql_scalar!(
@parse_functions,
meta = {$($meta)*},
resolve = {
self_var = $selfvar,
body = $body,
return_type = $return_ty,
},
$(from_input_value = {$($from_input_value_body)+},)*
$(from_str = {$($from_str_body)+},)*
rest = $($rest)*
);
};
// from_input_value(arg: &InputValue) -> ... { ... }
(
@parse_functions,
meta = { $($meta:tt)* },
$(resolve = {$($resolve_body:tt)+})*,
$(from_input_value = {$($from_input_value_body:tt)+},)*
$(from_str = {$($from_str_body:tt)+},)*
rest = from_input_value($arg:ident: &InputValue) -> $result:ty $body:block $($rest:tt)*
) => {
$crate::graphql_scalar!(
@parse_functions,
meta = { $($meta)* },
$(resolve = {$($resolve_body)+},)*
from_input_value = {
arg = $arg,
result = $result,
body = $body,
},
$(from_str = {$($from_str_body)+},)*
rest = $($rest)*
);
};
// from_str(value: &str) -> Result<S, ParseError>
(
@parse_functions,
meta = { $($meta:tt)* },
$(resolve = {$($resolve_body:tt)+},)*
$(from_input_value = {$($from_input_value_body:tt)+},)*
$(from_str = {$($from_str_body:tt)+},)*
rest = from_str<$from_str_lt: tt>($value_arg:ident: ScalarToken<$ignored_lt2: tt>) -> $result:ty $body:block $($rest:tt)*
) => {
$crate::graphql_scalar!(
@parse_functions,
meta = { $($meta)* },
$(resolve = {$($resolve_body)+},)*
$(from_input_value = {$($from_input_value_body)+},)*
from_str = {
value_arg = $value_arg,
result = $result,
body = $body,
lifetime = $from_str_lt,
},
rest = $($rest)*
);
};
// description: <description>
(
@parse_functions,
meta = {
name = $name:ty,
outname = {$($outname:tt)+},
scalar = {$($scalar:tt)+},
},
$(resolve = {$($resolve_body:tt)+},)*
$(from_input_value = {$($from_input_value_body:tt)+},)*
$(from_str = {$($from_str_body:tt)+},)*
rest = description: $descr:tt $($rest:tt)*
) => {
$crate::graphql_scalar!(
@parse_functions,
meta = {
name = $name,
outname = {$($outname)+},
scalar = {$($scalar)+},
description = $descr,
},
$(resolve = {$($resolve_body)+},)*
$(from_input_value = {$($from_input_value_body)+},)*
$(from_str = {$($from_str_body)+},)*
rest = $($rest)*
);
};
(
@parse,
meta = {
lifetimes = [],
name = $name: ty,
outname = {$($outname:tt)*},
scalar = {$($scalar:tt)*},
},
rest = $($rest:tt)*
) => {
$crate::graphql_scalar!(
@parse_functions,
meta = {
name = $name,
outname = {$($outname)*},
scalar = {$($scalar)*},
},
rest = $($rest)*
);
};
(@$($stuff:tt)*) => {
compile_error!("Invalid syntax for `graphql_scalar!`");
};
($($rest:tt)*) => {
$crate::__juniper_parse_object_header!(
callback = graphql_scalar,
rest = $($rest)*
);
}
}

View file

@ -76,10 +76,44 @@ where
}
}
#[cfg(feature = "async")]
impl<'a, CtxT, S, QueryT, MutationT> crate::GraphQLTypeAsync<S>
for RootNode<'a, QueryT, MutationT, S>
where
S: ScalarValue + Send + Sync,
QueryT: crate::GraphQLTypeAsync<S, Context = CtxT>,
QueryT::TypeInfo: Send + Sync,
MutationT: crate::GraphQLTypeAsync<S, Context = CtxT>,
MutationT::TypeInfo: Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_field_async<'b>(
&'b self,
info: &'b Self::TypeInfo,
field_name: &'b str,
arguments: &'b Arguments<S>,
executor: &'b Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'b, ExecutionResult<S>> {
use futures::future::{ready, FutureExt};
match field_name {
"__schema" | "__type" => {
let v = self.resolve_field(info, field_name, arguments, executor);
ready(v).boxed()
}
_ => self
.query_type
.resolve_field_async(info, field_name, arguments, executor),
}
}
}
#[crate::object_internal(
name = "__Schema"
Context = SchemaType<'a, S>,
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> SchemaType<'a, S>
where
@ -117,6 +151,8 @@ where
name = "__Type"
Context = SchemaType<'a, S>,
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> TypeType<'a, S>
where
@ -248,6 +284,8 @@ where
name = "__Field",
Context = SchemaType<'a, S>,
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> Field<'a, S>
where
@ -285,6 +323,8 @@ where
name = "__InputValue",
Context = SchemaType<'a, S>,
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> Argument<'a, S>
where
@ -311,6 +351,8 @@ where
#[crate::object_internal(
name = "__EnumValue",
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> EnumValue
where
@ -337,6 +379,8 @@ where
name = "__Directive",
Context = SchemaType<'a, S>,
Scalar = S,
// FIXME: make this redundant.
noasync,
)]
impl<'a, S> DirectiveType<'a, S>
where

View file

@ -0,0 +1,281 @@
use futures::future::BoxFuture;
use crate::ast::{Directive, FromInputValue, InputValue, Selection};
use crate::value::{Object, ScalarRefValue, ScalarValue, Value};
use crate::executor::{ExecutionResult, Executor};
use crate::parser::Spanning;
use super::base::{is_excluded, merge_key_into, Arguments, GraphQLType};
pub trait GraphQLTypeAsync<S>: GraphQLType<S> + Send + Sync
where
Self::Context: Send + Sync,
Self::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_field_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
field_name: &'a str,
arguments: &'a Arguments<S>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, ExecutionResult<S>> {
panic!("resolve_field must be implemented by object types");
}
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, Value<S>> {
if let Some(selection_set) = selection_set {
resolve_selection_set_into_async(self, info, selection_set, executor)
} else {
panic!("resolve() must be implemented by non-object output types");
}
}
}
// Wrapper function around resolve_selection_set_into_async_recursive.
// This wrapper is necessary because async fns can not be recursive.
#[cfg(feature = "async")]
pub(crate) fn resolve_selection_set_into_async<'a, 'e, T, CtxT, S>(
instance: &'a T,
info: &'a T::TypeInfo,
selection_set: &'e [Selection<'e, S>],
executor: &'e Executor<'e, CtxT, S>,
) -> futures::future::BoxFuture<'a, Value<S>>
where
T: GraphQLTypeAsync<S, Context = CtxT>,
T::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
CtxT: Send + Sync,
'e: 'a,
for<'b> &'b S: ScalarRefValue<'b>,
{
use futures::future::FutureExt;
resolve_selection_set_into_async_recursive(instance, info, selection_set, executor).boxed()
}
struct AsyncField<S> {
name: String,
value: Option<Value<S>>,
}
enum AsyncValue<S> {
Field(AsyncField<S>),
Nested(Value<S>),
}
// type ResolveFuture<'a, S> = BoxFuture<'a, AsyncResolve<S>>;
#[cfg(feature = "async")]
pub(crate) async fn resolve_selection_set_into_async_recursive<'a, T, CtxT, S>(
instance: &'a T,
info: &'a T::TypeInfo,
selection_set: &'a [Selection<'a, S>],
executor: &'a Executor<'a, CtxT, S>,
) -> Value<S>
where
T: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
T::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
use futures::{
future::FutureExt,
stream::{FuturesOrdered, StreamExt},
};
let mut object = Object::with_capacity(selection_set.len());
let mut async_values = FuturesOrdered::<BoxFuture<'a, AsyncValue<S>>>::new();
let meta_type = executor
.schema()
.concrete_type_by_name(
T::name(info)
.expect("Resolving named type's selection set")
.as_ref(),
)
.expect("Type not found in schema");
for selection in selection_set {
match *selection {
Selection::Field(Spanning {
item: ref f,
start: ref start_pos,
..
}) => {
if is_excluded(&f.directives, executor.variables()) {
continue;
}
let response_name = f.alias.as_ref().unwrap_or(&f.name).item;
if f.name.item == "__typename" {
object.add_field(
response_name,
Value::scalar(instance.concrete_type_name(executor.context(), info)),
);
continue;
}
let meta_field = meta_type.field_by_name(f.name.item).unwrap_or_else(|| {
panic!(format!(
"Field {} not found on type {:?}",
f.name.item,
meta_type.name()
))
});
let exec_vars = executor.variables();
let sub_exec = executor.field_sub_executor(
&response_name,
f.name.item,
start_pos.clone(),
f.selection_set.as_ref().map(|v| &v[..]),
);
let args = Arguments::new(
f.arguments.as_ref().map(|m| {
m.item
.iter()
.map(|&(ref k, ref v)| (k.item, v.item.clone().into_const(exec_vars)))
.collect()
}),
&meta_field.arguments,
);
let pos = start_pos.clone();
let is_non_null = meta_field.field_type.is_non_null();
let response_name = response_name.to_string();
let field_future = async move {
// TODO: implement custom future type instead of
// two-level boxing.
let res = instance
.resolve_field_async(info, f.name.item, &args, &sub_exec)
.await;
let value = match res {
Ok(Value::Null) if is_non_null => None,
Ok(v) => Some(v),
Err(e) => {
sub_exec.push_error_at(e, pos);
if is_non_null {
None
} else {
Some(Value::null())
}
}
};
AsyncValue::Field(AsyncField {
name: response_name,
value,
})
};
async_values.push(field_future.boxed());
}
Selection::FragmentSpread(Spanning {
item: ref spread, ..
}) => {
if is_excluded(&spread.directives, executor.variables()) {
continue;
}
// TODO: prevent duplicate boxing.
let f = async move {
let fragment = &executor
.fragment_by_name(spread.name.item)
.expect("Fragment could not be found");
let value = resolve_selection_set_into_async(
instance,
info,
&fragment.selection_set[..],
executor,
)
.await;
AsyncValue::Nested(value)
};
async_values.push(f.boxed());
}
Selection::InlineFragment(Spanning {
item: ref fragment,
start: ref start_pos,
..
}) => {
if is_excluded(&fragment.directives, executor.variables()) {
continue;
}
let sub_exec = executor.type_sub_executor(
fragment.type_condition.as_ref().map(|c| c.item),
Some(&fragment.selection_set[..]),
);
if let Some(ref type_condition) = fragment.type_condition {
// FIXME: implement async version.
let sub_result = instance.resolve_into_type(
info,
type_condition.item,
Some(&fragment.selection_set[..]),
&sub_exec,
);
if let Ok(Value::Object(obj)) = sub_result {
for (k, v) in obj {
merge_key_into(&mut object, &k, v);
}
} else if let Err(e) = sub_result {
sub_exec.push_error_at(e, start_pos.clone());
}
} else {
let f = async move {
let value = resolve_selection_set_into_async(
instance,
info,
&fragment.selection_set[..],
&sub_exec,
)
.await;
AsyncValue::Nested(value)
};
async_values.push(f.boxed());
}
}
}
}
while let Some(item) = async_values.next().await {
match item {
AsyncValue::Field(AsyncField { name, value }) => {
if let Some(value) = value {
object.add_field(&name, value);
} else {
return Value::null();
}
}
AsyncValue::Nested(obj) => match obj {
v @ Value::Null => {
return v;
}
Value::Object(obj) => {
for (k, v) in obj {
merge_key_into(&mut object, &k, v);
}
}
_ => unreachable!(),
},
}
}
Value::Object(object)
}

View file

@ -343,7 +343,7 @@ where
}
}
pub(crate) fn resolve_selection_set_into<T, CtxT, S>(
pub fn resolve_selection_set_into<T, CtxT, S>(
instance: &T,
info: &T::TypeInfo,
selection_set: &[Selection<S>],
@ -499,7 +499,10 @@ where
true
}
fn is_excluded<S>(directives: &Option<Vec<Spanning<Directive<S>>>>, vars: &Variables<S>) -> bool
pub(super) fn is_excluded<S>(
directives: &Option<Vec<Spanning<Directive<S>>>>,
vars: &Variables<S>,
) -> bool
where
S: ScalarValue,
for<'b> &'b S: ScalarRefValue<'b>,
@ -528,7 +531,7 @@ where
false
}
fn merge_key_into<S>(result: &mut Object<S>, response_name: &str, value: Value<S>) {
pub(crate) fn merge_key_into<S>(result: &mut Object<S>, response_name: &str, value: Value<S>) {
if let Some(&mut (_, ref mut e)) = result
.iter_mut()
.find(|&&mut (ref key, _)| key == response_name)

View file

@ -217,3 +217,106 @@ where
Value::list(result)
}
#[cfg(feature = "async")]
async fn resolve_into_list_async<'a, S, T, I>(
executor: &'a Executor<'a, T::Context, S>,
info: &'a T::TypeInfo,
items: I,
) -> Value<S>
where
S: ScalarValue + Send + Sync,
I: Iterator<Item = T> + ExactSizeIterator,
T: crate::GraphQLTypeAsync<S>,
T::TypeInfo: Send + Sync,
T::Context: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
use futures::stream::{FuturesOrdered, StreamExt};
use std::iter::FromIterator;
let stop_on_null = executor
.current_type()
.list_contents()
.expect("Current type is not a list type")
.is_non_null();
let iter =
items.map(|item| async move { executor.resolve_into_value_async(info, &item).await });
let mut futures = FuturesOrdered::from_iter(iter);
let mut values = Vec::with_capacity(futures.len());
while let Some(value) = futures.next().await {
if stop_on_null && value.is_null() {
return value;
}
values.push(value);
}
Value::list(values)
}
#[cfg(feature = "async")]
impl<S, T, CtxT> crate::GraphQLTypeAsync<S> for Vec<T>
where
T: crate::GraphQLTypeAsync<S, Context = CtxT>,
T::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, Value<S>> {
let f = resolve_into_list_async(executor, info, self.iter());
futures::future::FutureExt::boxed(f)
}
}
#[cfg(feature = "async")]
impl<S, T, CtxT> crate::GraphQLTypeAsync<S> for &[T]
where
T: crate::GraphQLTypeAsync<S, Context = CtxT>,
T::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, Value<S>> {
let f = resolve_into_list_async(executor, info, self.iter());
futures::future::FutureExt::boxed(f)
}
}
#[cfg(feature = "async")]
impl<S, T, CtxT> crate::GraphQLTypeAsync<S> for Option<T>
where
T: crate::GraphQLTypeAsync<S, Context = CtxT>,
T::TypeInfo: Send + Sync,
S: ScalarValue + Send + Sync,
CtxT: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, Value<S>> {
let f = async move {
match *self {
Some(ref obj) => executor.resolve_into_value_async(info, obj).await,
None => Value::null(),
}
};
futures::future::FutureExt::boxed(f)
}
}

View file

@ -4,3 +4,6 @@ pub mod name;
pub mod pointers;
pub mod scalars;
pub mod utilities;
#[cfg(feature = "async")]
pub mod async_await;

View file

@ -85,7 +85,7 @@ where
}
}
impl<'a, S, T, CtxT> GraphQLType<S> for &'a T
impl<'e, S, T, CtxT> GraphQLType<S> for &'e T
where
S: ScalarValue,
T: GraphQLType<S, Context = CtxT>,
@ -136,6 +136,35 @@ where
}
}
#[cfg(feature = "async")]
impl<'e, S, T> crate::GraphQLTypeAsync<S> for &'e T
where
S: ScalarValue + Send + Sync,
T: crate::GraphQLTypeAsync<S>,
T::TypeInfo: Send + Sync,
T::Context: Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_field_async<'b>(
&'b self,
info: &'b Self::TypeInfo,
field_name: &'b str,
arguments: &'b Arguments<S>,
executor: &'b Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'b, ExecutionResult<S>> {
crate::GraphQLTypeAsync::resolve_field_async(&**self, info, field_name, arguments, executor)
}
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, Value<S>> {
crate::GraphQLTypeAsync::resolve_async(&**self, info, selection_set, executor)
}
}
impl<'a, T, S> ToInputValue<S> for &'a T
where
S: Debug,

View file

@ -196,6 +196,23 @@ where
}
}
#[cfg(feature = "async")]
impl<'e, S> crate::GraphQLTypeAsync<S> for &'e str
where
S: ScalarValue + Send + Sync,
for<'b> &'b S: ScalarRefValue<'b>,
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [Selection<S>]>,
executor: &'a Executor<Self::Context, S>,
) -> futures::future::BoxFuture<'a, crate::Value<S>> {
use futures::future;
future::FutureExt::boxed(future::ready(self.resolve(info, selection_set, executor)))
}
}
impl<'a, S> ToInputValue<S> for &'a str
where
S: ScalarValue,

View file

@ -120,6 +120,13 @@ where
}
}
pub fn into_object(self) -> Option<Object<S>> {
match self {
Value::Object(o) => Some(o),
_ => None,
}
}
/// Mutable view into the underlying object value, if present.
pub fn as_mut_object_value(&mut self) -> Option<&mut Object<S>> {
match *self {

View file

@ -76,6 +76,19 @@ impl<S> Object<S> {
.find(|&&(ref k, _)| (k as &str) == key)
.map(|&(_, ref value)| value)
}
pub fn sort_by_field(&mut self) {
self.key_value_list
.sort_by(|(key1, _), (key2, _)| key1.cmp(key2));
for (_, ref mut value) in &mut self.key_value_list {
match value {
Value::Object(ref mut o) => {
o.sort_by_field();
}
_ => {}
}
}
}
}
impl<S> IntoIterator for Object<S> {

View file

@ -260,6 +260,8 @@ pub enum DefaultScalarValue {
Boolean(bool),
}
trait S: Send + Sync {}
impl ScalarValue for DefaultScalarValue {
type Visitor = DefaultScalarValueVisitor;

View file

@ -14,6 +14,9 @@ edition = "2018"
[lib]
proc-macro = true
[features]
async = []
[dependencies]
proc-macro2 = "1.0.1"
syn = { version = "1.0.3", features = ["full", "extra-traits", "parsing"] }

View file

@ -206,9 +206,34 @@ pub fn impl_enum(ast: &syn::DeriveInput, is_internal: bool) -> TokenStream {
});
}
#[cfg(feature = "async")]
let _async = quote!(
impl<__S> #juniper_path::GraphQLTypeAsync<__S> for #ident
where
__S: #juniper_path::ScalarValue + Send + Sync,
for<'__b> &'__b __S: #juniper_path::ScalarRefValue<'__b>
{
fn resolve_async<'a>(
&'a self,
info: &'a Self::TypeInfo,
selection_set: Option<&'a [#juniper_path::Selection<__S>]>,
executor: &'a #juniper_path::Executor<Self::Context, __S>,
) -> futures::future::BoxFuture<'a, #juniper_path::Value<__S>> {
use #juniper_path::GraphQLType;
use futures::future;
let v = self.resolve(info, selection_set, executor);
future::FutureExt::boxed(future::ready(v))
}
}
);
#[cfg(not(feature = "async"))]
let _async = quote!();
let body = quote! {
impl<__S> #juniper_path::GraphQLType<__S> for #ident
where __S: #juniper_path::ScalarValue,
where __S:
#juniper_path::ScalarValue,
for<'__b> &'__b __S: #juniper_path::ScalarRefValue<'__b>
{
type Context = ();
@ -261,6 +286,8 @@ pub fn impl_enum(ast: &syn::DeriveInput, is_internal: bool) -> TokenStream {
}
}
}
#_async
};
body
}

View file

@ -59,6 +59,7 @@ pub fn build_derive_object(ast: syn::DeriveInput, is_internal: bool) -> TokenStr
description: field_attrs.description,
deprecation: field_attrs.deprecation,
resolver_code,
resolver_code_async: None,
})
}
});
@ -74,6 +75,7 @@ pub fn build_derive_object(ast: syn::DeriveInput, is_internal: bool) -> TokenStr
interfaces: None,
include_type_generics: true,
generic_scalar: true,
no_async: attrs.no_async,
};
let juniper_crate_name = if is_internal { "crate" } else { "juniper" };

View file

@ -86,6 +86,7 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) ->
},
include_type_generics: false,
generic_scalar: false,
no_async: impl_attrs.no_async,
};
for item in _impl.items {
@ -101,6 +102,8 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) ->
}
};
let is_async = method.sig.asyncness.is_some();
let attrs = match util::FieldAttributes::from_attrs(
method.attrs,
util::FieldAttributeParseMode::Impl,
@ -195,12 +198,28 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) ->
let body = &method.block;
let return_ty = &method.sig.output;
let resolver_code = quote!(
(|| #return_ty {
#( #resolve_parts )*
#body
})()
);
let (resolver_code, resolver_code_async) = if is_async {
(
quote!(),
Some(quote!(
(async move || #return_ty {
#( #resolve_parts )*
#body
})()
)),
)
} else {
(
quote!(
(|| #return_ty {
#( #resolve_parts )*
#body
})()
),
None,
)
};
let ident = &method.sig.ident;
let name = attrs
@ -214,6 +233,7 @@ pub fn build_object(args: TokenStream, body: TokenStream, is_internal: bool) ->
description: attrs.description,
deprecation: attrs.deprecation,
resolver_code,
resolver_code_async,
});
}
_ => {

View file

@ -297,6 +297,7 @@ pub struct ObjectAttributes {
pub context: Option<syn::Type>,
pub scalar: Option<syn::Type>,
pub interfaces: Vec<syn::Type>,
pub no_async: bool,
}
impl syn::parse::Parse for ObjectAttributes {
@ -307,6 +308,7 @@ impl syn::parse::Parse for ObjectAttributes {
context: None,
scalar: None,
interfaces: Vec::new(),
no_async: false,
};
while !input.is_empty() {
@ -350,6 +352,10 @@ impl syn::parse::Parse for ObjectAttributes {
.into_iter()
.collect();
}
// FIXME: make this unneccessary.
"noasync" => {
output.no_async = true;
}
other => {
return Err(input.error(format!("Unknown attribute: {}", other)));
}
@ -591,6 +597,14 @@ pub struct GraphQLTypeDefinitionField {
pub deprecation: Option<DeprecationAttr>,
pub args: Vec<GraphQLTypeDefinitionFieldArg>,
pub resolver_code: proc_macro2::TokenStream,
pub resolver_code_async: Option<proc_macro2::TokenStream>,
}
impl GraphQLTypeDefinitionField {
#[inline]
fn is_async(&self) -> bool {
self.resolver_code_async.is_some()
}
}
/// Definition of a graphql type based on information extracted
@ -618,9 +632,15 @@ pub struct GraphQLTypeDefiniton {
// If false, the scalar is only generic if a generic parameter
// is specified manually.
pub generic_scalar: bool,
// FIXME: make this redundant.
pub no_async: bool,
}
impl GraphQLTypeDefiniton {
fn has_async_field(&self) -> bool {
self.fields.iter().any(|field| field.is_async())
}
pub fn into_tokens(self, juniper_crate_name: &str) -> proc_macro2::TokenStream {
let juniper_crate_name = syn::parse_str::<syn::Path>(juniper_crate_name).unwrap();
@ -691,21 +711,30 @@ impl GraphQLTypeDefiniton {
let name = &field.name;
let code = &field.resolver_code;
quote!(
#name => {
let res = { #code };
#juniper_crate_name::IntoResolvable::into(
res,
executor.context()
)
.and_then(|res| {
match res {
Some((ctx, r)) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r),
None => Ok(#juniper_crate_name::Value::null()),
}
})
},
)
if field.is_async() {
// TODO: better error message with field/type name.
quote!(
#name => {
panic!("Tried to resolve async field with a sync resolver");
},
)
} else {
quote!(
#name => {
let res = { #code };
#juniper_crate_name::IntoResolvable::into(
res,
executor.context()
)
.and_then(|res| {
match res {
Some((ctx, r)) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r),
None => Ok(#juniper_crate_name::Value::null()),
}
})
},
)
}
});
let description = self
@ -778,6 +807,114 @@ impl GraphQLTypeDefiniton {
};
let (impl_generics, _, where_clause) = generics.split_for_impl();
#[cfg(feature = "async")]
let resolve_field_async = {
let resolve_matches_async = self.fields.iter().map(|field| {
let name = &field.name;
if let Some(code) = field.resolver_code_async.as_ref() {
quote!(
#name => {
let f = async move {
let res = { #code }.await;
let inner_res = #juniper_crate_name::IntoResolvable::into(
res,
executor.context()
);
match inner_res {
Ok(Some((ctx, r))) => {
let subexec = executor
.replaced_context(ctx);
subexec.resolve_with_ctx_async(&(), &r)
.await
},
Ok(None) => Ok(#juniper_crate_name::Value::null()),
Err(e) => Err(e),
}
};
future::FutureExt::boxed(f)
},
)
} else {
let code = &field.resolver_code;
let inner = if !self.no_async {
quote!(
let f = async move {
match res2 {
Ok(Some((ctx, r))) => {
let sub = executor.replaced_context(ctx);
sub.resolve_with_ctx_async(&(), &r).await
},
Ok(None) => Ok(#juniper_crate_name::Value::null()),
Err(e) => Err(e),
}
};
future::FutureExt::boxed(f)
)
} else {
quote!(
let v = match res2 {
Ok(Some((ctx, r))) => executor.replaced_context(ctx).resolve_with_ctx(&(), &r),
Ok(None) => Ok(#juniper_crate_name::Value::null()),
Err(e) => Err(e),
};
future::FutureExt::boxed(future::ready(v))
)
};
quote!(
#name => {
let res = { #code };
let res2 = #juniper_crate_name::IntoResolvable::into(
res,
executor.context()
);
#inner
},
)
}
});
let mut where_async = where_clause.cloned().unwrap_or_else(|| parse_quote!(where));;
where_async
.predicates
.push(parse_quote!( #scalar: Send + Sync ));
where_async.predicates.push(parse_quote!(Self: Send + Sync));
// FIXME: add where clause for interfaces.
quote!(
impl#impl_generics #juniper_crate_name::GraphQLTypeAsync<#scalar> for #ty #type_generics_tokens
#where_async
{
fn resolve_field_async<'b>(
&'b self,
info: &'b Self::TypeInfo,
field: &'b str,
args: &'b #juniper_crate_name::Arguments<#scalar>,
executor: &'b #juniper_crate_name::Executor<Self::Context, #scalar>,
) -> futures::future::BoxFuture<'b, #juniper_crate_name::ExecutionResult<#scalar>>
where #scalar: Send + Sync,
{
use futures::future;
use #juniper_crate_name::GraphQLType;
match field {
#( #resolve_matches_async )*
_ => {
panic!("Field {} not found on type {}", field, "Mutation");
}
}
}
}
)
};
#[cfg(not(feature = "async"))]
let resolve_field_async = quote!();
let output = quote!(
impl#impl_generics #juniper_crate_name::GraphQLType<#scalar> for #ty #type_generics_tokens
#where_clause
@ -822,11 +959,14 @@ impl GraphQLTypeDefiniton {
}
}
fn concrete_type_name(&self, _: &Self::Context, _: &Self::TypeInfo) -> String {
#name.to_string()
}
}
}
#resolve_field_async
);
output
}

View file

@ -8,6 +8,9 @@ documentation = "https://docs.rs/juniper_warp"
repository = "https://github.com/graphql-rust/juniper"
edition = "2018"
[features]
async = [ "juniper/async", "futures03" ]
[dependencies]
warp = "0.1.8"
juniper = { version = "0.14.0", path = "../juniper", default-features = false }
@ -18,6 +21,8 @@ futures = "0.1.23"
serde = "1.0.75"
tokio-threadpool = "0.1.7"
futures03 = { version = "0.3.0-alpha.18", optional = true, package = "futures-preview" }
[dev-dependencies]
juniper = { version = "0.14.0", path = "../juniper", features = ["expose-test-schema", "serde_json"] }
env_logger = "0.5.11"

View file

@ -41,11 +41,12 @@ Check the LICENSE file for details.
#![doc(html_root_url = "https://docs.rs/juniper_warp/0.2.0")]
use futures::{future::poll_fn, Future};
use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue};
use serde::Deserialize;
use std::sync::Arc;
use warp::{filters::BoxedFilter, Filter};
use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue};
#[derive(Debug, serde_derive::Deserialize, PartialEq)]
#[serde(untagged)]
#[serde(bound = "InputValue<S>: Deserialize<'de>")]
@ -240,6 +241,80 @@ where
get_filter.or(post_filter).unify().boxed()
}
/// FIXME: docs
pub fn make_graphql_filter_async<Query, Mutation, Context, S>(
schema: juniper::RootNode<'static, Query, Mutation, S>,
context_extractor: BoxedFilter<(Context,)>,
) -> BoxedFilter<(warp::http::Response<Vec<u8>>,)>
where
S: ScalarValue + Send + Sync + 'static,
for<'b> &'b S: ScalarRefValue<'b>,
Context: Send + Sync + 'static,
Query: juniper::GraphQLTypeAsync<S, Context = Context> + Send + Sync + 'static,
Query::TypeInfo: Send + Sync,
Mutation: juniper::GraphQLTypeAsync<S, Context = Context> + Send + Sync + 'static,
Mutation::TypeInfo: Send + Sync,
{
let schema = Arc::new(schema);
let post_schema = schema.clone();
let handle_post_request =
move |context: Context, request: GraphQLBatchRequest<S>| -> Response {
let schema = post_schema.clone();
Box::new(
poll_fn(move || {
tokio_threadpool::blocking(|| {
let response = request.execute(&schema, &context);
Ok((serde_json::to_vec(&response)?, response.is_ok()))
})
})
.and_then(|result| ::futures::future::done(Ok(build_response(result))))
.map_err(|e: tokio_threadpool::BlockingError| warp::reject::custom(e)),
)
};
let post_filter = warp::post2()
.and(context_extractor.clone())
.and(warp::body::json())
.and_then(handle_post_request);
let handle_get_request = move |context: Context,
mut request: std::collections::HashMap<String, String>|
-> Response {
let schema = schema.clone();
Box::new(
poll_fn(move || {
tokio_threadpool::blocking(|| {
let variables = match request.remove("variables") {
None => None,
Some(vs) => serde_json::from_str(&vs)?,
};
let graphql_request = juniper::http::GraphQLRequest::new(
request.remove("query").ok_or_else(|| {
failure::format_err!("Missing GraphQL query string in query parameters")
})?,
request.get("operation_name").map(|s| s.to_owned()),
variables,
);
let response = graphql_request.execute(&schema, &context);
Ok((serde_json::to_vec(&response)?, response.is_ok()))
})
})
.and_then(|result| ::futures::future::done(Ok(build_response(result))))
.map_err(|e: tokio_threadpool::BlockingError| warp::reject::custom(e)),
)
};
let get_filter = warp::get2()
.and(context_extractor.clone())
.and(warp::filters::query::query())
.and_then(handle_get_request);
get_filter.or(post_filter).unify().boxed()
}
fn build_response(
response: Result<(Vec<u8>, bool), failure::Error>,
) -> warp::http::Response<Vec<u8>> {