From 813c0d12102f010c23962f54609cdfc61332ed9a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 17 Nov 2023 13:04:37 +0000 Subject: [PATCH] Upgrade `juniper_hyper` to 1.0 `hyper` (#1217) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Kai Ren --- juniper_hyper/CHANGELOG.md | 3 + juniper_hyper/Cargo.toml | 12 +- juniper_hyper/examples/hyper_server.rs | 81 ++++++++------ juniper_hyper/src/lib.rs | 146 ++++++++++++++----------- 4 files changed, 137 insertions(+), 105 deletions(-) diff --git a/juniper_hyper/CHANGELOG.md b/juniper_hyper/CHANGELOG.md index 435c134e..f30500a2 100644 --- a/juniper_hyper/CHANGELOG.md +++ b/juniper_hyper/CHANGELOG.md @@ -11,10 +11,12 @@ All user visible changes to `juniper_hyper` crate will be documented in this fil ### BC Breaks - Switched to 0.16 version of [`juniper` crate]. +- Switched to 1 version of [`hyper` crate]. ([#1217]) - Changed return type of all functions from `Response` to `Response`. ([#1101], [#1096]) [#1096]: /../../issues/1096 [#1101]: /../../pull/1101 +[#1217]: /../../pull/1217 @@ -27,4 +29,5 @@ See [old CHANGELOG](/../../blob/juniper_hyper-v0.8.0/juniper_hyper/CHANGELOG.md) [`juniper` crate]: https://docs.rs/juniper +[`hyper` crate]: https://docs.rs/hyper [Semantic Versioning 2.0.0]: https://semver.org diff --git a/juniper_hyper/Cargo.toml b/juniper_hyper/Cargo.toml index 60effdcf..221c1efd 100644 --- a/juniper_hyper/Cargo.toml +++ b/juniper_hyper/Cargo.toml @@ -16,18 +16,18 @@ exclude = ["/examples/", "/release.toml"] [dependencies] futures = "0.3.22" -hyper = { version = "0.14.7", features = ["server", "runtime"] } +http-body-util = "0.1" +hyper = { version = "1.0", features = ["server"] } juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } serde_json = "1.0.18" tokio = "1.0" url = "2.0" -# Fixes for `minimal-versions` check. -# TODO: Try remove on upgrade of `hyper` crate. -http-body = "0.4.5" - [dev-dependencies] +hyper = { version = "1.0", features = ["http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] } +log = "0.4" pretty_env_logger = "0.5" reqwest = { version = "0.11", features = ["blocking", "rustls-tls"], default-features = false } -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.0", features = ["macros", "net", "rt-multi-thread"] } diff --git a/juniper_hyper/examples/hyper_server.rs b/juniper_hyper/examples/hyper_server.rs index 069e0c6f..d0f46022 100644 --- a/juniper_hyper/examples/hyper_server.rs +++ b/juniper_hyper/examples/hyper_server.rs @@ -1,21 +1,19 @@ -use std::{convert::Infallible, sync::Arc}; +use std::{convert::Infallible, env, error::Error, net::SocketAddr, sync::Arc}; -use hyper::{ - server::Server, - service::{make_service_fn, service_fn}, - Method, Response, StatusCode, -}; +use hyper::{server::conn::http1, service::service_fn, Method, Response, StatusCode}; +use hyper_util::rt::TokioIo; use juniper::{ tests::fixtures::starwars::schema::{Database, Query}, EmptyMutation, EmptySubscription, RootNode, }; +use juniper_hyper::{graphiql, graphql, playground}; +use tokio::net::TcpListener; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { + env::set_var("RUST_LOG", "info"); pretty_env_logger::init(); - let addr = ([127, 0, 0, 1], 3000).into(); - let db = Arc::new(Database::new()); let root_node = Arc::new(RootNode::new( Query, @@ -23,35 +21,46 @@ async fn main() { EmptySubscription::::new(), )); - let new_service = make_service_fn(move |_| { + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = TcpListener::bind(addr).await?; + log::info!("Listening on http://{addr}"); + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + let root_node = root_node.clone(); - let ctx = db.clone(); + let db = db.clone(); - async { - Ok::<_, hyper::Error>(service_fn(move |req| { - let root_node = root_node.clone(); - let ctx = ctx.clone(); - async { - Ok::<_, Infallible>(match (req.method(), req.uri().path()) { - (&Method::GET, "/") => juniper_hyper::graphiql("/graphql", None).await, - (&Method::GET, "/graphql") | (&Method::POST, "/graphql") => { - juniper_hyper::graphql(root_node, ctx, req).await + tokio::spawn(async move { + let root_node = root_node.clone(); + let db = db.clone(); + + if let Err(e) = http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| { + let root_node = root_node.clone(); + let db = db.clone(); + async { + Ok::<_, Infallible>(match (req.method(), req.uri().path()) { + (&Method::GET, "/graphql") | (&Method::POST, "/graphql") => { + graphql(root_node, db, req).await + } + (&Method::GET, "/graphiql") => graphiql("/graphql", None).await, + (&Method::GET, "/playground") => playground("/graphql", None).await, + _ => { + let mut resp = Response::new(String::new()); + *resp.status_mut() = StatusCode::NOT_FOUND; + resp + } + }) } - _ => { - let mut response = Response::new(String::new()); - *response.status_mut() = StatusCode::NOT_FOUND; - response - } - }) - } - })) - } - }); - - let server = Server::bind(&addr).serve(new_service); - println!("Listening on http://{addr}"); - - if let Err(e) = server.await { - eprintln!("server error: {e}") + }), + ) + .await + { + log::error!("Error serving connection: {e}"); + } + }); } } diff --git a/juniper_hyper/src/lib.rs b/juniper_hyper/src/lib.rs index 074cf850..d3969630 100644 --- a/juniper_hyper/src/lib.rs +++ b/juniper_hyper/src/lib.rs @@ -2,9 +2,11 @@ use std::{error::Error, fmt, string::FromUtf8Error, sync::Arc}; +use http_body_util::BodyExt as _; use hyper::{ + body, header::{self, HeaderValue}, - Body, Method, Request, Response, StatusCode, + Method, Request, Response, StatusCode, }; use juniper::{ http::{GraphQLBatchRequest, GraphQLRequest as JuniperGraphQLRequest, GraphQLRequest}, @@ -16,7 +18,7 @@ use url::form_urlencoded; pub async fn graphql_sync( root_node: Arc>, context: Arc, - req: Request, + req: Request, ) -> Response where QueryT: GraphQLType, @@ -37,7 +39,7 @@ where pub async fn graphql( root_node: Arc>, context: Arc, - req: Request, + req: Request, ) -> Response where QueryT: GraphQLTypeAsync, @@ -56,7 +58,7 @@ where } async fn parse_req( - req: Request, + req: Request, ) -> Result, Response> { match *req.method() { Method::GET => parse_get_req(req), @@ -77,7 +79,7 @@ async fn parse_req( } fn parse_get_req( - req: Request, + req: Request, ) -> Result, GraphQLRequestError> { req.uri() .query() @@ -90,13 +92,14 @@ fn parse_get_req( } async fn parse_post_json_req( - body: Body, + body: body::Incoming, ) -> Result, GraphQLRequestError> { - let chunk = hyper::body::to_bytes(body) + let chunk = body + .collect() .await .map_err(GraphQLRequestError::BodyHyper)?; - let input = String::from_utf8(chunk.iter().cloned().collect()) + let input = String::from_utf8(chunk.to_bytes().iter().cloned().collect()) .map_err(GraphQLRequestError::BodyUtf8)?; serde_json::from_str::>(&input) @@ -104,13 +107,14 @@ async fn parse_post_json_req( } async fn parse_post_graphql_req( - body: Body, + body: body::Incoming, ) -> Result, GraphQLRequestError> { - let chunk = hyper::body::to_bytes(body) + let chunk = body + .collect() .await .map_err(GraphQLRequestError::BodyHyper)?; - let query = String::from_utf8(chunk.iter().cloned().collect()) + let query = String::from_utf8(chunk.to_bytes().iter().cloned().collect()) .map_err(GraphQLRequestError::BodyUtf8)?; Ok(GraphQLBatchRequest::Single(GraphQLRequest::new( @@ -306,18 +310,19 @@ impl Error for GraphQLRequestError { #[cfg(test)] mod tests { - use hyper::{ - server::Server, - service::{make_service_fn, service_fn}, - Method, Response, StatusCode, + use std::{ + convert::Infallible, error::Error, net::SocketAddr, panic, sync::Arc, time::Duration, }; + + use hyper::{server::conn::http1, service::service_fn, Method, Response, StatusCode}; + use hyper_util::rt::TokioIo; use juniper::{ http::tests as http_tests, tests::fixtures::starwars::schema::{Database, Query}, EmptyMutation, EmptySubscription, RootNode, }; - use reqwest::{self, blocking::Response as ReqwestResponse}; - use std::{convert::Infallible, net::SocketAddr, sync::Arc, thread, time::Duration}; + use reqwest::blocking::Response as ReqwestResponse; + use tokio::{net::TcpListener, task, time::sleep}; struct TestHyperIntegration { port: u16, @@ -373,7 +378,7 @@ mod tests { async fn run_hyper_integration(is_sync: bool) { let port = if is_sync { 3002 } else { 3001 }; - let addr: SocketAddr = ([127, 0, 0, 1], port).into(); + let addr = SocketAddr::from(([127, 0, 0, 1], port)); let db = Arc::new(Database::new()); let root_node = Arc::new(RootNode::new( @@ -382,59 +387,74 @@ mod tests { EmptySubscription::::new(), )); - let new_service = make_service_fn(move |_| { - let root_node = root_node.clone(); - let ctx = db.clone(); + let server: task::JoinHandle>> = + task::spawn(async move { + let listener = TcpListener::bind(addr).await?; + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); - async move { - Ok::<_, hyper::Error>(service_fn(move |req| { let root_node = root_node.clone(); - let ctx = ctx.clone(); - let matches = { - let path = req.uri().path(); - match req.method() { - &Method::POST | &Method::GET => { - path == "/graphql" || path == "/graphql/" - } - _ => false, + let db = db.clone(); + + _ = task::spawn(async move { + let root_node = root_node.clone(); + let db = db.clone(); + + if let Err(e) = http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| { + let root_node = root_node.clone(); + let db = db.clone(); + let matches = { + let path = req.uri().path(); + match req.method() { + &Method::POST | &Method::GET => { + path == "/graphql" || path == "/graphql/" + } + _ => false, + } + }; + async move { + Ok::<_, Infallible>(if matches { + if is_sync { + super::graphql_sync(root_node, db, req).await + } else { + super::graphql(root_node, db, req).await + } + } else { + let mut resp = Response::new(String::new()); + *resp.status_mut() = StatusCode::NOT_FOUND; + resp + }) + } + }), + ) + .await + { + eprintln!("server error: {e}"); } - }; - async move { - Ok::<_, Infallible>(if matches { - if is_sync { - super::graphql_sync(root_node, ctx, req).await - } else { - super::graphql(root_node, ctx, req).await - } - } else { - let mut resp = Response::new(String::new()); - *resp.status_mut() = StatusCode::NOT_FOUND; - resp - }) - } - })) - } - }); - - let (shutdown_fut, shutdown) = futures::future::abortable(async { - tokio::time::sleep(Duration::from_secs(60)).await; - }); - - let server = Server::bind(&addr) - .serve(new_service) - .with_graceful_shutdown(async { - shutdown_fut.await.unwrap_err(); + }); + } }); - tokio::task::spawn_blocking(move || { - thread::sleep(Duration::from_millis(10)); // wait 10ms for server to bind + sleep(Duration::from_secs(10)).await; // wait 10ms for `server` to bind + + match task::spawn_blocking(move || { let integration = TestHyperIntegration { port }; http_tests::run_http_test_suite(&integration); - shutdown.abort(); - }); + }) + .await + { + Err(f) if f.is_panic() => panic::resume_unwind(f.into_panic()), + Ok(()) | Err(_) => {} + } - if let Err(e) = server.await { - eprintln!("server error: {e}"); + server.abort(); + if let Ok(Err(e)) = server.await { + panic!("server failed: {e}"); } }