Update juniper_hyper to hyper 0.13 and add async resolution (#505)

This involves updating to futures 0.3, tokio 0.2 stable
This commit is contained in:
Genna Wingert 2020-02-13 07:48:28 +01:00 committed by GitHub
parent 7ee67ed6e8
commit 4ccb129fa2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 279 additions and 170 deletions

View file

@ -9,8 +9,7 @@ repository = "https://github.com/graphql-rust/juniper"
edition = "2018" edition = "2018"
[features] [features]
# Fake feature to help CI. async = ["juniper/async", "futures"]
async = []
[dependencies] [dependencies]
serde = "1.0" serde = "1.0"
@ -18,11 +17,9 @@ serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
url = "2" url = "2"
juniper = { version = "0.14.2", default-features = false, path = "../juniper"} juniper = { version = "0.14.2", default-features = false, path = "../juniper"}
tokio = "0.2"
futures = "0.1" hyper = "0.13"
tokio = "0.1.8" futures = { version = "0.3", optional = true }
hyper = "0.12"
tokio-threadpool = "0.1.7"
[dev-dependencies] [dev-dependencies]
pretty_env_logger = "0.2" pretty_env_logger = "0.2"
@ -32,3 +29,7 @@ reqwest = "0.9"
version = "0.14.2" version = "0.14.2"
features = ["expose-test-schema", "serde_json"] features = ["expose-test-schema", "serde_json"]
path = "../juniper" path = "../juniper"
[dev-dependencies.tokio]
version = "0.2"
features = ["macros"]

View file

@ -1,13 +1,10 @@
extern crate futures;
extern crate hyper; extern crate hyper;
extern crate juniper; extern crate juniper;
extern crate juniper_hyper; extern crate juniper_hyper;
extern crate pretty_env_logger; extern crate pretty_env_logger;
use futures::future;
use hyper::{ use hyper::{
rt::{self, Future}, service::{make_service_fn, service_fn},
service::service_fn,
Body, Method, Response, Server, StatusCode, Body, Method, Response, Server, StatusCode,
}; };
use juniper::{ use juniper::{
@ -16,7 +13,8 @@ use juniper::{
}; };
use std::sync::Arc; use std::sync::Arc;
fn main() { #[tokio::main]
async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
let addr = ([127, 0, 0, 1], 3000).into(); let addr = ([127, 0, 0, 1], 3000).into();
@ -24,30 +22,35 @@ fn main() {
let db = Arc::new(Database::new()); let db = Arc::new(Database::new());
let root_node = Arc::new(RootNode::new(Query, EmptyMutation::<Database>::new())); let root_node = Arc::new(RootNode::new(Query, EmptyMutation::<Database>::new()));
let new_service = move || { let new_service = make_service_fn(move |_| {
let root_node = root_node.clone(); let root_node = root_node.clone();
let ctx = db.clone(); let ctx = db.clone();
service_fn(move |req| -> Box<dyn Future<Item = _, Error = _> + Send> {
let root_node = root_node.clone(); async move {
let ctx = ctx.clone(); Ok::<_, hyper::Error>(service_fn(move |req| {
match (req.method(), req.uri().path()) { let root_node = root_node.clone();
(&Method::GET, "/") => Box::new(juniper_hyper::graphiql("/graphql")), let ctx = ctx.clone();
(&Method::GET, "/graphql") => Box::new(juniper_hyper::graphql(root_node, ctx, req)), async move {
(&Method::POST, "/graphql") => { match (req.method(), req.uri().path()) {
Box::new(juniper_hyper::graphql(root_node, ctx, req)) (&Method::GET, "/") => juniper_hyper::graphiql("/graphql").await,
(&Method::GET, "/graphql") | (&Method::POST, "/graphql") => {
juniper_hyper::graphql(root_node, ctx, req).await
}
_ => {
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::NOT_FOUND;
Ok(response)
}
}
} }
_ => { }))
let mut response = Response::new(Body::empty()); }
*response.status_mut() = StatusCode::NOT_FOUND; });
Box::new(future::ok(response))
} let server = Server::bind(&addr).serve(new_service);
}
})
};
let server = Server::bind(&addr)
.serve(new_service)
.map_err(|e| eprintln!("server error: {}", e));
println!("Listening on http://{}", addr); println!("Listening on http://{}", addr);
rt::run(server); if let Err(e) = server.await {
eprintln!("server error: {}", e)
}
} }

View file

@ -3,26 +3,27 @@
#[cfg(test)] #[cfg(test)]
extern crate reqwest; extern crate reqwest;
use futures::future::Either; #[cfg(feature = "async")]
use futures;
use hyper::{ use hyper::{
header::{self, HeaderValue}, header::{self, HeaderValue},
rt::Stream,
Body, Method, Request, Response, StatusCode, Body, Method, Request, Response, StatusCode,
}; };
#[cfg(feature = "async")]
use juniper::GraphQLTypeAsync;
use juniper::{ use juniper::{
http::GraphQLRequest as JuniperGraphQLRequest, serde::Deserialize, DefaultScalarValue, http::GraphQLRequest as JuniperGraphQLRequest, serde::Deserialize, DefaultScalarValue,
GraphQLType, InputValue, RootNode, ScalarValue, GraphQLType, InputValue, RootNode, ScalarValue,
}; };
use serde_json::error::Error as SerdeError; use serde_json::error::Error as SerdeError;
use std::{error::Error, fmt, string::FromUtf8Error, sync::Arc}; use std::{error::Error, fmt, string::FromUtf8Error, sync::Arc};
use tokio::prelude::*;
use url::form_urlencoded; use url::form_urlencoded;
pub fn graphql<CtxT, QueryT, MutationT, S>( pub async fn graphql<CtxT, QueryT, MutationT, S>(
root_node: Arc<RootNode<'static, QueryT, MutationT, S>>, root_node: Arc<RootNode<'static, QueryT, MutationT, S>>,
context: Arc<CtxT>, context: Arc<CtxT>,
request: Request<Body>, request: Request<Body>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> ) -> Result<Response<Body>, hyper::Error>
where where
S: ScalarValue + Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static,
CtxT: Send + Sync + 'static, CtxT: Send + Sync + 'static,
@ -32,68 +33,100 @@ where
MutationT::TypeInfo: Send + Sync, MutationT::TypeInfo: Send + Sync,
{ {
match request.method() { match request.method() {
&Method::GET => Either::A(Either::A( &Method::GET => {
future::done( let gql_req = parse_get_req(request);
request
.uri() match gql_req {
.query() Ok(gql_req) => Ok(execute_request(root_node, context, gql_req).await),
.map(|q| gql_request_from_get(q).map(GraphQLRequest::Single)) Err(err) => Ok(render_error(err)),
.unwrap_or_else(|| { }
Err(GraphQLRequestError::Invalid( }
"'query' parameter is missing".to_string(), &Method::POST => {
)) let gql_req = parse_post_req(request.into_body()).await;
}),
) match gql_req {
.and_then(move |gql_req| { Ok(gql_req) => Ok(execute_request(root_node, context, gql_req).await),
execute_request(root_node, context, gql_req).map_err(|_| { Err(err) => Ok(render_error(err)),
unreachable!("thread pool has shut down?!"); }
}) }
}) _ => Ok(new_response(StatusCode::METHOD_NOT_ALLOWED)),
.or_else(|err| future::ok(render_error(err))),
)),
&Method::POST => Either::A(Either::B(
request
.into_body()
.concat2()
.or_else(|err| future::done(Err(GraphQLRequestError::BodyHyper(err))))
.and_then(move |chunk| {
future::done({
String::from_utf8(chunk.iter().cloned().collect::<Vec<u8>>())
.map_err(GraphQLRequestError::BodyUtf8)
.and_then(|input| {
serde_json::from_str::<GraphQLRequest<S>>(&input)
.map_err(GraphQLRequestError::BodyJSONError)
})
})
})
.and_then(move |gql_req| {
execute_request(root_node, context, gql_req).map_err(|_| {
unreachable!("thread pool has shut down?!");
})
})
.or_else(|err| future::ok(render_error(err))),
)),
_ => return Either::B(future::ok(new_response(StatusCode::METHOD_NOT_ALLOWED))),
} }
} }
pub fn graphiql( #[cfg(feature = "async")]
graphql_endpoint: &str, pub async fn graphql_async<CtxT, QueryT, MutationT, S>(
) -> impl Future<Item = Response<Body>, Error = hyper::Error> { root_node: Arc<RootNode<'static, QueryT, MutationT, S>>,
context: Arc<CtxT>,
request: Request<Body>,
) -> Result<Response<Body>, hyper::Error>
where
S: ScalarValue + Send + Sync + 'static,
CtxT: Send + Sync + 'static,
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync + 'static,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT::TypeInfo: Send + Sync,
{
match request.method() {
&Method::GET => {
let gql_req = parse_get_req(request);
match gql_req {
Ok(gql_req) => Ok(execute_request_async(root_node, context, gql_req).await),
Err(err) => Ok(render_error(err)),
}
}
&Method::POST => {
let gql_req = parse_post_req(request.into_body()).await;
match gql_req {
Ok(gql_req) => Ok(execute_request_async(root_node, context, gql_req).await),
Err(err) => Ok(render_error(err)),
}
}
_ => Ok(new_response(StatusCode::METHOD_NOT_ALLOWED)),
}
}
fn parse_get_req<S: ScalarValue>(
req: Request<Body>,
) -> Result<GraphQLRequest<S>, GraphQLRequestError> {
req.uri()
.query()
.map(|q| gql_request_from_get(q).map(GraphQLRequest::Single))
.unwrap_or_else(|| {
Err(GraphQLRequestError::Invalid(
"'query' parameter is missing".to_string(),
))
})
}
async fn parse_post_req<S: ScalarValue>(
body: Body,
) -> Result<GraphQLRequest<S>, GraphQLRequestError> {
let chunk = hyper::body::to_bytes(body)
.await
.map_err(|err| GraphQLRequestError::BodyHyper(err))?;
let input = String::from_utf8(chunk.iter().cloned().collect())
.map_err(GraphQLRequestError::BodyUtf8)?;
serde_json::from_str::<GraphQLRequest<S>>(&input).map_err(GraphQLRequestError::BodyJSONError)
}
pub async fn graphiql(graphql_endpoint: &str) -> Result<Response<Body>, hyper::Error> {
let mut resp = new_html_response(StatusCode::OK); let mut resp = new_html_response(StatusCode::OK);
// XXX: is the call to graphiql_source blocking? // XXX: is the call to graphiql_source blocking?
*resp.body_mut() = Body::from(juniper::graphiql::graphiql_source(graphql_endpoint)); *resp.body_mut() = Body::from(juniper::graphiql::graphiql_source(graphql_endpoint));
future::ok(resp) Ok(resp)
} }
pub fn playground( pub async fn playground(graphql_endpoint: &str) -> Result<Response<Body>, hyper::Error> {
graphql_endpoint: &str,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
let mut resp = new_html_response(StatusCode::OK); let mut resp = new_html_response(StatusCode::OK);
*resp.body_mut() = Body::from(juniper::http::playground::playground_source( *resp.body_mut() = Body::from(juniper::http::playground::playground_source(
graphql_endpoint, graphql_endpoint,
)); ));
future::ok(resp) Ok(resp)
} }
fn render_error(err: GraphQLRequestError) -> Response<Body> { fn render_error(err: GraphQLRequestError) -> Response<Body> {
@ -103,11 +136,11 @@ fn render_error(err: GraphQLRequestError) -> Response<Body> {
resp resp
} }
fn execute_request<CtxT, QueryT, MutationT, S>( async fn execute_request<CtxT, QueryT, MutationT, S>(
root_node: Arc<RootNode<'static, QueryT, MutationT, S>>, root_node: Arc<RootNode<'static, QueryT, MutationT, S>>,
context: Arc<CtxT>, context: Arc<CtxT>,
request: GraphQLRequest<S>, request: GraphQLRequest<S>,
) -> impl Future<Item = Response<Body>, Error = tokio_threadpool::BlockingError> ) -> Response<Body>
where where
S: ScalarValue + Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static,
CtxT: Send + Sync + 'static, CtxT: Send + Sync + 'static,
@ -116,20 +149,48 @@ where
QueryT::TypeInfo: Send + Sync, QueryT::TypeInfo: Send + Sync,
MutationT::TypeInfo: Send + Sync, MutationT::TypeInfo: Send + Sync,
{ {
request.execute(root_node, context).map(|(is_ok, body)| { let (is_ok, body) = request.execute(root_node, context);
let code = if is_ok { let code = if is_ok {
StatusCode::OK StatusCode::OK
} else { } else {
StatusCode::BAD_REQUEST StatusCode::BAD_REQUEST
}; };
let mut resp = new_response(code); let mut resp = new_response(code);
resp.headers_mut().insert( resp.headers_mut().insert(
header::CONTENT_TYPE, header::CONTENT_TYPE,
HeaderValue::from_static("application/json"), HeaderValue::from_static("application/json"),
); );
*resp.body_mut() = body; *resp.body_mut() = body;
resp resp
}) }
#[cfg(feature = "async")]
async fn execute_request_async<CtxT, QueryT, MutationT, S>(
root_node: Arc<RootNode<'static, QueryT, MutationT, S>>,
context: Arc<CtxT>,
request: GraphQLRequest<S>,
) -> Response<Body>
where
S: ScalarValue + Send + Sync + 'static,
CtxT: Send + Sync + 'static,
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync + 'static,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT::TypeInfo: Send + Sync,
{
let (is_ok, body) = request.execute_async(root_node, context).await;
let code = if is_ok {
StatusCode::OK
} else {
StatusCode::BAD_REQUEST
};
let mut resp = new_response(code);
resp.headers_mut().insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
*resp.body_mut() = body;
resp
} }
fn gql_request_from_get<S>(input: &str) -> Result<JuniperGraphQLRequest<S>, GraphQLRequestError> fn gql_request_from_get<S>(input: &str) -> Result<JuniperGraphQLRequest<S>, GraphQLRequestError>
@ -215,45 +276,74 @@ where
self, self,
root_node: Arc<RootNode<'a, QueryT, MutationT, S>>, root_node: Arc<RootNode<'a, QueryT, MutationT, S>>,
context: Arc<CtxT>, context: Arc<CtxT>,
) -> impl Future<Item = (bool, hyper::Body), Error = tokio_threadpool::BlockingError> + 'a ) -> (bool, hyper::Body)
where where
S: 'a, S: 'a + Send + Sync,
QueryT: GraphQLType<S, Context = CtxT> + 'a, QueryT: GraphQLType<S, Context = CtxT> + 'a,
MutationT: GraphQLType<S, Context = CtxT> + 'a, MutationT: GraphQLType<S, Context = CtxT> + 'a,
{ {
match self { match self {
GraphQLRequest::Single(request) => Either::A(future::poll_fn(move || { GraphQLRequest::Single(request) => {
let res = futures::try_ready!(tokio_threadpool::blocking( let res = request.execute(&root_node, &context);
|| request.execute(&root_node, &context)
));
let is_ok = res.is_ok(); let is_ok = res.is_ok();
let body = Body::from(serde_json::to_string_pretty(&res).unwrap()); let body = Body::from(serde_json::to_string_pretty(&res).unwrap());
Ok(Async::Ready((is_ok, body))) (is_ok, body)
})), }
GraphQLRequest::Batch(requests) => { GraphQLRequest::Batch(requests) => {
Either::B( let results: Vec<_> = requests
future::join_all(requests.into_iter().map(move |request| { .into_iter()
// TODO: these clones are sad .map(move |request| {
let root_node = root_node.clone(); let root_node = root_node.clone();
let context = context.clone(); let res = request.execute(&root_node, &context);
future::poll_fn(move || { let is_ok = res.is_ok();
let res = futures::try_ready!(tokio_threadpool::blocking( let body = serde_json::to_string_pretty(&res).unwrap();
|| request.execute(&root_node, &context)
));
let is_ok = res.is_ok();
let body = serde_json::to_string_pretty(&res).unwrap();
Ok(Async::Ready((is_ok, body)))
})
}))
.map(|results| {
let is_ok = results.iter().all(|&(is_ok, _)| is_ok);
// concatenate json bodies as array
// TODO: maybe use Body chunks instead?
let bodies: Vec<_> = results.into_iter().map(|(_, body)| body).collect();
let body = hyper::Body::from(format!("[{}]", bodies.join(",")));
(is_ok, body) (is_ok, body)
}), })
) .collect();
let is_ok = !results.iter().any(|&(is_ok, _)| !is_ok);
let bodies: Vec<_> = results.into_iter().map(|(_, body)| body).collect();
let body = hyper::Body::from(format!("[{}]", bodies.join(",")));
(is_ok, body)
}
}
}
#[cfg(feature = "async")]
async fn execute_async<'a, CtxT: 'a, QueryT, MutationT>(
self,
root_node: Arc<RootNode<'a, QueryT, MutationT, S>>,
context: Arc<CtxT>,
) -> (bool, hyper::Body)
where
S: Send + Sync,
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + Sync,
QueryT::TypeInfo: Send + Sync,
MutationT::TypeInfo: Send + Sync,
CtxT: Send + Sync,
{
match self {
GraphQLRequest::Single(request) => {
let res = request.execute_async(&root_node, &context).await;
let is_ok = res.is_ok();
let body = Body::from(serde_json::to_string_pretty(&res).unwrap());
(is_ok, body)
}
GraphQLRequest::Batch(requests) => {
let futures = requests
.iter()
.map(|request| request.execute_async(&root_node, &context))
.collect::<Vec<_>>();
let results = futures::future::join_all(futures).await;
let is_ok = results.iter().all(|res| res.is_ok());
let bodies: Vec<_> = results
.into_iter()
.map(|res| serde_json::to_string_pretty(&res).unwrap())
.collect();
let body = hyper::Body::from(format!("[{}]", bodies.join(",")));
(is_ok, body)
} }
} }
} }
@ -301,22 +391,21 @@ impl Error for GraphQLRequestError {
} }
} }
} }
#[cfg(feature = "async")]
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::{ use futures;
future::{self, Either}, use hyper::{
Future, service::{make_service_fn, service_fn},
Body, Method, Response, Server, StatusCode,
}; };
use hyper::{service::service_fn, Body, Method, Response, Server, StatusCode};
use juniper::{ use juniper::{
http::tests as http_tests, http::tests as http_tests,
tests::{model::Database, schema::Query}, tests::{model::Database, schema::Query},
EmptyMutation, RootNode, EmptyMutation, RootNode,
}; };
use reqwest::{self, Response as ReqwestResponse}; use reqwest::{self, Response as ReqwestResponse};
use std::{sync::Arc, thread, time}; use std::{net::SocketAddr, sync::Arc, thread, time::Duration};
use tokio::runtime::Runtime;
struct TestHyperIntegration; struct TestHyperIntegration;
@ -355,46 +444,62 @@ mod tests {
} }
} }
#[test] #[tokio::test]
fn test_hyper_integration() { async fn test_hyper_integration() {
let addr = ([127, 0, 0, 1], 3001).into(); let addr: SocketAddr = ([127, 0, 0, 1], 3001).into();
let db = Arc::new(Database::new()); let db = Arc::new(Database::new());
let root_node = Arc::new(RootNode::new(Query, EmptyMutation::<Database>::new())); let root_node = Arc::new(RootNode::new(Query, EmptyMutation::<Database>::new()));
let new_service = move || { let new_service = make_service_fn(move |_| {
let root_node = root_node.clone(); let root_node = root_node.clone();
let ctx = db.clone(); let ctx = db.clone();
service_fn(move |req| {
let root_node = root_node.clone(); async move {
let ctx = ctx.clone(); Ok::<_, hyper::Error>(service_fn(move |req| {
let matches = { let root_node = root_node.clone();
let path = req.uri().path(); let ctx = ctx.clone();
match req.method() { let matches = {
&Method::POST | &Method::GET => path == "/graphql" || path == "/graphql/", let path = req.uri().path();
_ => false, match req.method() {
&Method::POST | &Method::GET => {
path == "/graphql" || path == "/graphql/"
}
_ => false,
}
};
async move {
if matches {
super::graphql(root_node, ctx, req).await
} else {
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::NOT_FOUND;
Ok(response)
}
} }
}; }))
if matches { }
Either::A(super::graphql(root_node, ctx, req)) });
} else {
let mut response = Response::new(Body::empty()); let (shutdown_fut, shutdown) = futures::future::abortable(async {
*response.status_mut() = StatusCode::NOT_FOUND; tokio::time::delay_for(Duration::from_secs(60)).await;
Either::B(future::ok(response)) });
}
})
};
let server = Server::bind(&addr) let server = Server::bind(&addr)
.serve(new_service) .serve(new_service)
.map_err(|e| eprintln!("server error: {}", e)); .with_graceful_shutdown(async {
shutdown_fut.await.unwrap_err();
});
let mut runtime = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || {
runtime.spawn(server); thread::sleep(Duration::from_millis(10)); // wait 10ms for server to bind
thread::sleep(time::Duration::from_millis(10)); // wait 10ms for server to bind let integration = TestHyperIntegration;
http_tests::run_http_test_suite(&integration);
shutdown.abort();
});
let integration = TestHyperIntegration; if let Err(e) = server.await {
http_tests::run_http_test_suite(&integration); eprintln!("server error: {}", e);
}
runtime.shutdown_now().wait().unwrap();
} }
} }