From edecb8c99fff00aaa3a13eede8f8a90992c8687f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=20Houl=C3=A9?= <tom@tomhoule.com> Date: Sat, 20 Oct 2018 09:59:35 +0200 Subject: [PATCH] Get rid of futures_cpupool in juniper_warp (fixes #258) --- juniper_warp/Cargo.toml | 2 +- juniper_warp/src/lib.rs | 88 ++++++++++++++++------------------------- 2 files changed, 36 insertions(+), 54 deletions(-) diff --git a/juniper_warp/Cargo.toml b/juniper_warp/Cargo.toml index b8405a0a..415da325 100644 --- a/juniper_warp/Cargo.toml +++ b/juniper_warp/Cargo.toml @@ -13,9 +13,9 @@ juniper = { path = "../juniper", version = ">=0.9, 0.10.0", default-features = f serde_json = "1.0.24" serde_derive = "1.0.75" failure = "0.1.2" -futures-cpupool = "0.1.8" futures = "0.1.23" serde = "1.0.75" +tokio-threadpool = "0.1.7" [dev-dependencies] juniper = { path = "../juniper", version = "0.10.0", features = ["expose-test-schema", "serde_json"] } diff --git a/juniper_warp/src/lib.rs b/juniper_warp/src/lib.rs index 29db681e..4aa63fe2 100644 --- a/juniper_warp/src/lib.rs +++ b/juniper_warp/src/lib.rs @@ -42,20 +42,19 @@ Check the LICENSE file for details. #[macro_use] extern crate failure; extern crate futures; -extern crate futures_cpupool; extern crate juniper; #[macro_use] extern crate serde_derive; extern crate serde; extern crate serde_json; +extern crate tokio_threadpool; extern crate warp; #[cfg(test)] extern crate percent_encoding; -use futures::Future; -use futures_cpupool::CpuPool; -use juniper::{DefaultScalarValue, ScalarRefValue, ScalarValue, InputValue}; +use futures::{future::poll_fn, Future}; +use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue}; use serde::Deserialize; use std::sync::Arc; use warp::{filters::BoxedFilter, Filter}; @@ -127,9 +126,7 @@ where /// /// The `context_extractor` argument should be a filter that provides the GraphQL context required by the schema. /// -/// In order to avoid blocking, this helper will create a [CpuPool](../futures_cpupool/struct.CpuPool.html) to resolve GraphQL requests. -/// -/// If you want to pass your own threadpool, use [make_graphql_filter_with_thread_pool](fn.make_graphql_filter_with_thread_pool.html) instead. +/// In order to avoid blocking, this helper will use the `tokio_threadpool` threadpool created by hyper to resolve GraphQL requests. /// /// Example: /// @@ -181,27 +178,9 @@ where /// .and(graphql_filter); /// # } /// ``` -pub fn make_graphql_filter<Query, Mutation, Context>( - schema: juniper::RootNode<'static, Query, Mutation>, - context_extractor: BoxedFilter<(Context,)>, -) -> BoxedFilter<(warp::http::Response<Vec<u8>>,)> -where - Context: Send + 'static, - Query: juniper::GraphQLType<Context = Context, TypeInfo = ()> + Send + Sync + 'static, - Mutation: juniper::GraphQLType<Context = Context, TypeInfo = ()> + Send + Sync + 'static, -{ - let pool = CpuPool::new_num_cpus(); - make_graphql_filter_with_thread_pool(schema, context_extractor, pool) -} - -type Response = - Box<Future<Item = warp::http::Response<Vec<u8>>, Error = warp::reject::Rejection> + Send>; - -/// Same as [make_graphql_filter](./fn.make_graphql_filter.html), but use the provided [CpuPool](../futures_cpupool/struct.CpuPool.html) instead. -pub fn make_graphql_filter_with_thread_pool<Query, Mutation, Context, S>( +pub fn make_graphql_filter<Query, Mutation, Context, S>( schema: juniper::RootNode<'static, Query, Mutation, S>, context_extractor: BoxedFilter<(Context,)>, - thread_pool: futures_cpupool::CpuPool, ) -> BoxedFilter<(warp::http::Response<Vec<u8>>,)> where S: ScalarValue + Send + Sync + 'static, @@ -212,57 +191,57 @@ where { let schema = Arc::new(schema); let post_schema = schema.clone(); - let pool_filter = warp::any().map(move || thread_pool.clone()); let handle_post_request = - move |context: Context, request: GraphQLBatchRequest<S>, pool: CpuPool| -> Response { + move |context: Context, request: GraphQLBatchRequest<S>| -> Response { let schema = post_schema.clone(); Box::new( - pool.spawn_fn(move || { - let response = request.execute(&schema, &context); - Ok((serde_json::to_vec(&response)?, response.is_ok())) - }).then(|result| ::futures::future::done(Ok(build_response(result)))) - .map_err(|_: failure::Error| warp::reject::server_error()), + poll_fn(move || { + tokio_threadpool::blocking(|| { + let response = request.execute(&schema, &context); + Ok((serde_json::to_vec(&response)?, response.is_ok())) + }) + }).and_then(|result| ::futures::future::done(Ok(build_response(result)))) + .map_err(|_: tokio_threadpool::BlockingError| warp::reject::server_error()), ) }; let post_filter = warp::post2() .and(context_extractor.clone()) .and(warp::body::json()) - .and(pool_filter.clone()) .and_then(handle_post_request); let handle_get_request = move |context: Context, - mut request: std::collections::HashMap<String, String>, - pool: CpuPool| + mut request: std::collections::HashMap<String, String>| -> Response { let schema = schema.clone(); Box::new( - pool.spawn_fn(move || { - let variables = match request.remove("variables") { - None => None, - Some(vs) => serde_json::from_str(&vs)?, - }; + poll_fn(move || { + tokio_threadpool::blocking(|| { + let variables = match request.remove("variables") { + None => None, + Some(vs) => serde_json::from_str(&vs)?, + }; - let graphql_request = juniper::http::GraphQLRequest::new( - request.remove("query").ok_or_else(|| { - format_err!("Missing GraphQL query string in query parameters") - })?, - request.get("operation_name").map(|s| s.to_owned()), - variables, - ); + let graphql_request = juniper::http::GraphQLRequest::new( + request.remove("query").ok_or_else(|| { + format_err!("Missing GraphQL query string in query parameters") + })?, + request.get("operation_name").map(|s| s.to_owned()), + variables, + ); - let response = graphql_request.execute(&schema, &context); - Ok((serde_json::to_vec(&response)?, response.is_ok())) - }).then(|result| ::futures::future::done(Ok(build_response(result)))) - .map_err(|_: failure::Error| warp::reject::server_error()), + let response = graphql_request.execute(&schema, &context); + Ok((serde_json::to_vec(&response)?, response.is_ok())) + }) + }).and_then(|result| ::futures::future::done(Ok(build_response(result)))) + .map_err(|_: tokio_threadpool::BlockingError| warp::reject::server_error()), ) }; let get_filter = warp::get2() .and(context_extractor.clone()) .and(warp::filters::query::query()) - .and(pool_filter) .and_then(handle_get_request); get_filter.or(post_filter).unify().boxed() @@ -284,6 +263,9 @@ fn build_response( } } +type Response = + Box<Future<Item = warp::http::Response<Vec<u8>>, Error = warp::reject::Rejection> + Send>; + /// Create a filter that replies with an HTML page containing GraphiQL. This does not handle routing, so you can mount it on any endpoint. /// /// For example: