Get rid of futures_cpupool in juniper_warp (fixes #258)
This commit is contained in:
parent
3266c237e9
commit
edecb8c99f
2 changed files with 36 additions and 54 deletions
|
@ -13,9 +13,9 @@ juniper = { path = "../juniper", version = ">=0.9, 0.10.0", default-features = f
|
|||
serde_json = "1.0.24"
|
||||
serde_derive = "1.0.75"
|
||||
failure = "0.1.2"
|
||||
futures-cpupool = "0.1.8"
|
||||
futures = "0.1.23"
|
||||
serde = "1.0.75"
|
||||
tokio-threadpool = "0.1.7"
|
||||
|
||||
[dev-dependencies]
|
||||
juniper = { path = "../juniper", version = "0.10.0", features = ["expose-test-schema", "serde_json"] }
|
||||
|
|
|
@ -42,20 +42,19 @@ Check the LICENSE file for details.
|
|||
#[macro_use]
|
||||
extern crate failure;
|
||||
extern crate futures;
|
||||
extern crate futures_cpupool;
|
||||
extern crate juniper;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
extern crate tokio_threadpool;
|
||||
extern crate warp;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate percent_encoding;
|
||||
|
||||
use futures::Future;
|
||||
use futures_cpupool::CpuPool;
|
||||
use juniper::{DefaultScalarValue, ScalarRefValue, ScalarValue, InputValue};
|
||||
use futures::{future::poll_fn, Future};
|
||||
use juniper::{DefaultScalarValue, InputValue, ScalarRefValue, ScalarValue};
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
use warp::{filters::BoxedFilter, Filter};
|
||||
|
@ -127,9 +126,7 @@ where
|
|||
///
|
||||
/// The `context_extractor` argument should be a filter that provides the GraphQL context required by the schema.
|
||||
///
|
||||
/// In order to avoid blocking, this helper will create a [CpuPool](../futures_cpupool/struct.CpuPool.html) to resolve GraphQL requests.
|
||||
///
|
||||
/// If you want to pass your own threadpool, use [make_graphql_filter_with_thread_pool](fn.make_graphql_filter_with_thread_pool.html) instead.
|
||||
/// In order to avoid blocking, this helper will use the `tokio_threadpool` threadpool created by hyper to resolve GraphQL requests.
|
||||
///
|
||||
/// Example:
|
||||
///
|
||||
|
@ -181,27 +178,9 @@ where
|
|||
/// .and(graphql_filter);
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn make_graphql_filter<Query, Mutation, Context>(
|
||||
schema: juniper::RootNode<'static, Query, Mutation>,
|
||||
context_extractor: BoxedFilter<(Context,)>,
|
||||
) -> BoxedFilter<(warp::http::Response<Vec<u8>>,)>
|
||||
where
|
||||
Context: Send + 'static,
|
||||
Query: juniper::GraphQLType<Context = Context, TypeInfo = ()> + Send + Sync + 'static,
|
||||
Mutation: juniper::GraphQLType<Context = Context, TypeInfo = ()> + Send + Sync + 'static,
|
||||
{
|
||||
let pool = CpuPool::new_num_cpus();
|
||||
make_graphql_filter_with_thread_pool(schema, context_extractor, pool)
|
||||
}
|
||||
|
||||
type Response =
|
||||
Box<Future<Item = warp::http::Response<Vec<u8>>, Error = warp::reject::Rejection> + Send>;
|
||||
|
||||
/// Same as [make_graphql_filter](./fn.make_graphql_filter.html), but use the provided [CpuPool](../futures_cpupool/struct.CpuPool.html) instead.
|
||||
pub fn make_graphql_filter_with_thread_pool<Query, Mutation, Context, S>(
|
||||
pub fn make_graphql_filter<Query, Mutation, Context, S>(
|
||||
schema: juniper::RootNode<'static, Query, Mutation, S>,
|
||||
context_extractor: BoxedFilter<(Context,)>,
|
||||
thread_pool: futures_cpupool::CpuPool,
|
||||
) -> BoxedFilter<(warp::http::Response<Vec<u8>>,)>
|
||||
where
|
||||
S: ScalarValue + Send + Sync + 'static,
|
||||
|
@ -212,57 +191,57 @@ where
|
|||
{
|
||||
let schema = Arc::new(schema);
|
||||
let post_schema = schema.clone();
|
||||
let pool_filter = warp::any().map(move || thread_pool.clone());
|
||||
|
||||
let handle_post_request =
|
||||
move |context: Context, request: GraphQLBatchRequest<S>, pool: CpuPool| -> Response {
|
||||
move |context: Context, request: GraphQLBatchRequest<S>| -> Response {
|
||||
let schema = post_schema.clone();
|
||||
Box::new(
|
||||
pool.spawn_fn(move || {
|
||||
let response = request.execute(&schema, &context);
|
||||
Ok((serde_json::to_vec(&response)?, response.is_ok()))
|
||||
}).then(|result| ::futures::future::done(Ok(build_response(result))))
|
||||
.map_err(|_: failure::Error| warp::reject::server_error()),
|
||||
poll_fn(move || {
|
||||
tokio_threadpool::blocking(|| {
|
||||
let response = request.execute(&schema, &context);
|
||||
Ok((serde_json::to_vec(&response)?, response.is_ok()))
|
||||
})
|
||||
}).and_then(|result| ::futures::future::done(Ok(build_response(result))))
|
||||
.map_err(|_: tokio_threadpool::BlockingError| warp::reject::server_error()),
|
||||
)
|
||||
};
|
||||
|
||||
let post_filter = warp::post2()
|
||||
.and(context_extractor.clone())
|
||||
.and(warp::body::json())
|
||||
.and(pool_filter.clone())
|
||||
.and_then(handle_post_request);
|
||||
|
||||
let handle_get_request = move |context: Context,
|
||||
mut request: std::collections::HashMap<String, String>,
|
||||
pool: CpuPool|
|
||||
mut request: std::collections::HashMap<String, String>|
|
||||
-> Response {
|
||||
let schema = schema.clone();
|
||||
Box::new(
|
||||
pool.spawn_fn(move || {
|
||||
let variables = match request.remove("variables") {
|
||||
None => None,
|
||||
Some(vs) => serde_json::from_str(&vs)?,
|
||||
};
|
||||
poll_fn(move || {
|
||||
tokio_threadpool::blocking(|| {
|
||||
let variables = match request.remove("variables") {
|
||||
None => None,
|
||||
Some(vs) => serde_json::from_str(&vs)?,
|
||||
};
|
||||
|
||||
let graphql_request = juniper::http::GraphQLRequest::new(
|
||||
request.remove("query").ok_or_else(|| {
|
||||
format_err!("Missing GraphQL query string in query parameters")
|
||||
})?,
|
||||
request.get("operation_name").map(|s| s.to_owned()),
|
||||
variables,
|
||||
);
|
||||
let graphql_request = juniper::http::GraphQLRequest::new(
|
||||
request.remove("query").ok_or_else(|| {
|
||||
format_err!("Missing GraphQL query string in query parameters")
|
||||
})?,
|
||||
request.get("operation_name").map(|s| s.to_owned()),
|
||||
variables,
|
||||
);
|
||||
|
||||
let response = graphql_request.execute(&schema, &context);
|
||||
Ok((serde_json::to_vec(&response)?, response.is_ok()))
|
||||
}).then(|result| ::futures::future::done(Ok(build_response(result))))
|
||||
.map_err(|_: failure::Error| warp::reject::server_error()),
|
||||
let response = graphql_request.execute(&schema, &context);
|
||||
Ok((serde_json::to_vec(&response)?, response.is_ok()))
|
||||
})
|
||||
}).and_then(|result| ::futures::future::done(Ok(build_response(result))))
|
||||
.map_err(|_: tokio_threadpool::BlockingError| warp::reject::server_error()),
|
||||
)
|
||||
};
|
||||
|
||||
let get_filter = warp::get2()
|
||||
.and(context_extractor.clone())
|
||||
.and(warp::filters::query::query())
|
||||
.and(pool_filter)
|
||||
.and_then(handle_get_request);
|
||||
|
||||
get_filter.or(post_filter).unify().boxed()
|
||||
|
@ -284,6 +263,9 @@ fn build_response(
|
|||
}
|
||||
}
|
||||
|
||||
type Response =
|
||||
Box<Future<Item = warp::http::Response<Vec<u8>>, Error = warp::reject::Rejection> + Send>;
|
||||
|
||||
/// Create a filter that replies with an HTML page containing GraphiQL. This does not handle routing, so you can mount it on any endpoint.
|
||||
///
|
||||
/// For example:
|
||||
|
|
Loading…
Reference in a new issue