diff --git a/juniper_hyper/CHANGELOG.md b/juniper_hyper/CHANGELOG.md
index 435c134e..f30500a2 100644
--- a/juniper_hyper/CHANGELOG.md
+++ b/juniper_hyper/CHANGELOG.md
@@ -11,10 +11,12 @@ All user visible changes to `juniper_hyper` crate will be documented in this fil
### BC Breaks
- Switched to 0.16 version of [`juniper` crate].
+- Switched to 1 version of [`hyper` crate]. ([#1217])
- Changed return type of all functions from `Response
` to `Response`. ([#1101], [#1096])
[#1096]: /../../issues/1096
[#1101]: /../../pull/1101
+[#1217]: /../../pull/1217
@@ -27,4 +29,5 @@ See [old CHANGELOG](/../../blob/juniper_hyper-v0.8.0/juniper_hyper/CHANGELOG.md)
[`juniper` crate]: https://docs.rs/juniper
+[`hyper` crate]: https://docs.rs/hyper
[Semantic Versioning 2.0.0]: https://semver.org
diff --git a/juniper_hyper/Cargo.toml b/juniper_hyper/Cargo.toml
index 60effdcf..221c1efd 100644
--- a/juniper_hyper/Cargo.toml
+++ b/juniper_hyper/Cargo.toml
@@ -16,18 +16,18 @@ exclude = ["/examples/", "/release.toml"]
[dependencies]
futures = "0.3.22"
-hyper = { version = "0.14.7", features = ["server", "runtime"] }
+http-body-util = "0.1"
+hyper = { version = "1.0", features = ["server"] }
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
serde_json = "1.0.18"
tokio = "1.0"
url = "2.0"
-# Fixes for `minimal-versions` check.
-# TODO: Try remove on upgrade of `hyper` crate.
-http-body = "0.4.5"
-
[dev-dependencies]
+hyper = { version = "1.0", features = ["http1"] }
+hyper-util = { version = "0.1", features = ["tokio"] }
juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] }
+log = "0.4"
pretty_env_logger = "0.5"
reqwest = { version = "0.11", features = ["blocking", "rustls-tls"], default-features = false }
-tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
+tokio = { version = "1.0", features = ["macros", "net", "rt-multi-thread"] }
diff --git a/juniper_hyper/examples/hyper_server.rs b/juniper_hyper/examples/hyper_server.rs
index 069e0c6f..d0f46022 100644
--- a/juniper_hyper/examples/hyper_server.rs
+++ b/juniper_hyper/examples/hyper_server.rs
@@ -1,21 +1,19 @@
-use std::{convert::Infallible, sync::Arc};
+use std::{convert::Infallible, env, error::Error, net::SocketAddr, sync::Arc};
-use hyper::{
- server::Server,
- service::{make_service_fn, service_fn},
- Method, Response, StatusCode,
-};
+use hyper::{server::conn::http1, service::service_fn, Method, Response, StatusCode};
+use hyper_util::rt::TokioIo;
use juniper::{
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, EmptySubscription, RootNode,
};
+use juniper_hyper::{graphiql, graphql, playground};
+use tokio::net::TcpListener;
#[tokio::main]
-async fn main() {
+async fn main() -> Result<(), Box> {
+ env::set_var("RUST_LOG", "info");
pretty_env_logger::init();
- let addr = ([127, 0, 0, 1], 3000).into();
-
let db = Arc::new(Database::new());
let root_node = Arc::new(RootNode::new(
Query,
@@ -23,35 +21,46 @@ async fn main() {
EmptySubscription::::new(),
));
- let new_service = make_service_fn(move |_| {
+ let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+ let listener = TcpListener::bind(addr).await?;
+ log::info!("Listening on http://{addr}");
+ loop {
+ let (stream, _) = listener.accept().await?;
+ let io = TokioIo::new(stream);
+
let root_node = root_node.clone();
- let ctx = db.clone();
+ let db = db.clone();
- async {
- Ok::<_, hyper::Error>(service_fn(move |req| {
- let root_node = root_node.clone();
- let ctx = ctx.clone();
- async {
- Ok::<_, Infallible>(match (req.method(), req.uri().path()) {
- (&Method::GET, "/") => juniper_hyper::graphiql("/graphql", None).await,
- (&Method::GET, "/graphql") | (&Method::POST, "/graphql") => {
- juniper_hyper::graphql(root_node, ctx, req).await
+ tokio::spawn(async move {
+ let root_node = root_node.clone();
+ let db = db.clone();
+
+ if let Err(e) = http1::Builder::new()
+ .serve_connection(
+ io,
+ service_fn(move |req| {
+ let root_node = root_node.clone();
+ let db = db.clone();
+ async {
+ Ok::<_, Infallible>(match (req.method(), req.uri().path()) {
+ (&Method::GET, "/graphql") | (&Method::POST, "/graphql") => {
+ graphql(root_node, db, req).await
+ }
+ (&Method::GET, "/graphiql") => graphiql("/graphql", None).await,
+ (&Method::GET, "/playground") => playground("/graphql", None).await,
+ _ => {
+ let mut resp = Response::new(String::new());
+ *resp.status_mut() = StatusCode::NOT_FOUND;
+ resp
+ }
+ })
}
- _ => {
- let mut response = Response::new(String::new());
- *response.status_mut() = StatusCode::NOT_FOUND;
- response
- }
- })
- }
- }))
- }
- });
-
- let server = Server::bind(&addr).serve(new_service);
- println!("Listening on http://{addr}");
-
- if let Err(e) = server.await {
- eprintln!("server error: {e}")
+ }),
+ )
+ .await
+ {
+ log::error!("Error serving connection: {e}");
+ }
+ });
}
}
diff --git a/juniper_hyper/src/lib.rs b/juniper_hyper/src/lib.rs
index 074cf850..d3969630 100644
--- a/juniper_hyper/src/lib.rs
+++ b/juniper_hyper/src/lib.rs
@@ -2,9 +2,11 @@
use std::{error::Error, fmt, string::FromUtf8Error, sync::Arc};
+use http_body_util::BodyExt as _;
use hyper::{
+ body,
header::{self, HeaderValue},
- Body, Method, Request, Response, StatusCode,
+ Method, Request, Response, StatusCode,
};
use juniper::{
http::{GraphQLBatchRequest, GraphQLRequest as JuniperGraphQLRequest, GraphQLRequest},
@@ -16,7 +18,7 @@ use url::form_urlencoded;
pub async fn graphql_sync(
root_node: Arc>,
context: Arc,
- req: Request,
+ req: Request,
) -> Response
where
QueryT: GraphQLType,
@@ -37,7 +39,7 @@ where
pub async fn graphql(
root_node: Arc>,
context: Arc,
- req: Request,
+ req: Request,
) -> Response
where
QueryT: GraphQLTypeAsync,
@@ -56,7 +58,7 @@ where
}
async fn parse_req(
- req: Request,
+ req: Request,
) -> Result, Response> {
match *req.method() {
Method::GET => parse_get_req(req),
@@ -77,7 +79,7 @@ async fn parse_req(
}
fn parse_get_req(
- req: Request,
+ req: Request,
) -> Result, GraphQLRequestError> {
req.uri()
.query()
@@ -90,13 +92,14 @@ fn parse_get_req(
}
async fn parse_post_json_req(
- body: Body,
+ body: body::Incoming,
) -> Result, GraphQLRequestError> {
- let chunk = hyper::body::to_bytes(body)
+ let chunk = body
+ .collect()
.await
.map_err(GraphQLRequestError::BodyHyper)?;
- let input = String::from_utf8(chunk.iter().cloned().collect())
+ let input = String::from_utf8(chunk.to_bytes().iter().cloned().collect())
.map_err(GraphQLRequestError::BodyUtf8)?;
serde_json::from_str::>(&input)
@@ -104,13 +107,14 @@ async fn parse_post_json_req(
}
async fn parse_post_graphql_req(
- body: Body,
+ body: body::Incoming,
) -> Result, GraphQLRequestError> {
- let chunk = hyper::body::to_bytes(body)
+ let chunk = body
+ .collect()
.await
.map_err(GraphQLRequestError::BodyHyper)?;
- let query = String::from_utf8(chunk.iter().cloned().collect())
+ let query = String::from_utf8(chunk.to_bytes().iter().cloned().collect())
.map_err(GraphQLRequestError::BodyUtf8)?;
Ok(GraphQLBatchRequest::Single(GraphQLRequest::new(
@@ -306,18 +310,19 @@ impl Error for GraphQLRequestError {
#[cfg(test)]
mod tests {
- use hyper::{
- server::Server,
- service::{make_service_fn, service_fn},
- Method, Response, StatusCode,
+ use std::{
+ convert::Infallible, error::Error, net::SocketAddr, panic, sync::Arc, time::Duration,
};
+
+ use hyper::{server::conn::http1, service::service_fn, Method, Response, StatusCode};
+ use hyper_util::rt::TokioIo;
use juniper::{
http::tests as http_tests,
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, EmptySubscription, RootNode,
};
- use reqwest::{self, blocking::Response as ReqwestResponse};
- use std::{convert::Infallible, net::SocketAddr, sync::Arc, thread, time::Duration};
+ use reqwest::blocking::Response as ReqwestResponse;
+ use tokio::{net::TcpListener, task, time::sleep};
struct TestHyperIntegration {
port: u16,
@@ -373,7 +378,7 @@ mod tests {
async fn run_hyper_integration(is_sync: bool) {
let port = if is_sync { 3002 } else { 3001 };
- let addr: SocketAddr = ([127, 0, 0, 1], port).into();
+ let addr = SocketAddr::from(([127, 0, 0, 1], port));
let db = Arc::new(Database::new());
let root_node = Arc::new(RootNode::new(
@@ -382,59 +387,74 @@ mod tests {
EmptySubscription::::new(),
));
- let new_service = make_service_fn(move |_| {
- let root_node = root_node.clone();
- let ctx = db.clone();
+ let server: task::JoinHandle>> =
+ task::spawn(async move {
+ let listener = TcpListener::bind(addr).await?;
+
+ loop {
+ let (stream, _) = listener.accept().await?;
+ let io = TokioIo::new(stream);
- async move {
- Ok::<_, hyper::Error>(service_fn(move |req| {
let root_node = root_node.clone();
- let ctx = ctx.clone();
- let matches = {
- let path = req.uri().path();
- match req.method() {
- &Method::POST | &Method::GET => {
- path == "/graphql" || path == "/graphql/"
- }
- _ => false,
+ let db = db.clone();
+
+ _ = task::spawn(async move {
+ let root_node = root_node.clone();
+ let db = db.clone();
+
+ if let Err(e) = http1::Builder::new()
+ .serve_connection(
+ io,
+ service_fn(move |req| {
+ let root_node = root_node.clone();
+ let db = db.clone();
+ let matches = {
+ let path = req.uri().path();
+ match req.method() {
+ &Method::POST | &Method::GET => {
+ path == "/graphql" || path == "/graphql/"
+ }
+ _ => false,
+ }
+ };
+ async move {
+ Ok::<_, Infallible>(if matches {
+ if is_sync {
+ super::graphql_sync(root_node, db, req).await
+ } else {
+ super::graphql(root_node, db, req).await
+ }
+ } else {
+ let mut resp = Response::new(String::new());
+ *resp.status_mut() = StatusCode::NOT_FOUND;
+ resp
+ })
+ }
+ }),
+ )
+ .await
+ {
+ eprintln!("server error: {e}");
}
- };
- async move {
- Ok::<_, Infallible>(if matches {
- if is_sync {
- super::graphql_sync(root_node, ctx, req).await
- } else {
- super::graphql(root_node, ctx, req).await
- }
- } else {
- let mut resp = Response::new(String::new());
- *resp.status_mut() = StatusCode::NOT_FOUND;
- resp
- })
- }
- }))
- }
- });
-
- let (shutdown_fut, shutdown) = futures::future::abortable(async {
- tokio::time::sleep(Duration::from_secs(60)).await;
- });
-
- let server = Server::bind(&addr)
- .serve(new_service)
- .with_graceful_shutdown(async {
- shutdown_fut.await.unwrap_err();
+ });
+ }
});
- tokio::task::spawn_blocking(move || {
- thread::sleep(Duration::from_millis(10)); // wait 10ms for server to bind
+ sleep(Duration::from_secs(10)).await; // wait 10ms for `server` to bind
+
+ match task::spawn_blocking(move || {
let integration = TestHyperIntegration { port };
http_tests::run_http_test_suite(&integration);
- shutdown.abort();
- });
+ })
+ .await
+ {
+ Err(f) if f.is_panic() => panic::resume_unwind(f.into_panic()),
+ Ok(()) | Err(_) => {}
+ }
- if let Err(e) = server.await {
- eprintln!("server error: {e}");
+ server.abort();
+ if let Ok(Err(e)) = server.await {
+ panic!("server failed: {e}");
}
}