From 984973658217f73d63608283d18723e11ce65161 Mon Sep 17 00:00:00 2001 From: Kai Ren Date: Thu, 21 Sep 2023 23:24:41 +0200 Subject: [PATCH] Improve WebSocket integration (#1191, #1022) - consider `juniper_graphql_transport_ws` crate on CI - implement auto-selection of protocol in `juniper_warp` crate - support `graphql-transport-ws` protocol in `juniper_actix` crate - implement auto-selection of protocol in `juniper_actix` crate Additionally: - move `examples/warp_subscriptions` into `juniper_warp/examples/subscription.rs` - move `examples/actix_subscriptions` into `juniper_actix/examples/subscription.rs` - move `examples/basic_subscriptions` into `juniper_subscriptions/examples/basic.rs` - bump up MSRV of `juniper_actix` crate to 1.68 --- .github/workflows/ci.yml | 40 +-- Cargo.toml | 6 +- book/src/advanced/subscriptions.md | 4 +- book/src/servers/warp.md | 7 +- examples/README.md | 56 ---- examples/actix_subscriptions/Cargo.toml | 21 -- examples/basic_subscriptions/Cargo.toml | 15 - examples/warp_async/Cargo.toml | 17 - examples/warp_async/src/main.rs | 104 ------ examples/warp_subscriptions/Cargo.toml | 20 -- juniper/CHANGELOG.md | 6 +- juniper_actix/CHANGELOG.md | 9 + juniper_actix/Cargo.toml | 15 +- juniper_actix/README.md | 2 +- juniper_actix/examples/actix_server.rs | 141 -------- .../examples/subscription.rs | 67 ++-- juniper_actix/src/lib.rs | 307 +++++++++++------- juniper_graphql_transport_ws/CHANGELOG.md | 13 +- juniper_graphql_transport_ws/Cargo.toml | 5 +- juniper_graphql_transport_ws/LICENSE | 2 +- juniper_graphql_transport_ws/README.md | 11 +- juniper_graphql_transport_ws/src/lib.rs | 112 ++----- juniper_graphql_transport_ws/src/schema.rs | 133 +------- juniper_graphql_ws/CHANGELOG.md | 7 + juniper_graphql_ws/Cargo.toml | 2 +- juniper_graphql_ws/LICENSE | 2 +- juniper_graphql_ws/README.md | 9 +- juniper_graphql_ws/src/lib.rs | 33 +- juniper_subscriptions/Cargo.toml | 2 +- juniper_subscriptions/README.md | 4 +- .../examples/basic.rs | 0 juniper_warp/CHANGELOG.md | 9 + juniper_warp/Cargo.toml | 15 +- juniper_warp/README.md | 4 +- .../examples/subscription.rs | 103 +++--- juniper_warp/examples/warp_server.rs | 51 --- juniper_warp/src/lib.rs | 212 ++++++++++-- 37 files changed, 622 insertions(+), 944 deletions(-) delete mode 100644 examples/README.md delete mode 100644 examples/actix_subscriptions/Cargo.toml delete mode 100644 examples/basic_subscriptions/Cargo.toml delete mode 100644 examples/warp_async/Cargo.toml delete mode 100644 examples/warp_async/src/main.rs delete mode 100644 examples/warp_subscriptions/Cargo.toml delete mode 100644 juniper_actix/examples/actix_server.rs rename examples/actix_subscriptions/src/main.rs => juniper_actix/examples/subscription.rs (77%) rename examples/basic_subscriptions/src/main.rs => juniper_subscriptions/examples/basic.rs (100%) rename examples/warp_subscriptions/src/main.rs => juniper_warp/examples/subscription.rs (57%) delete mode 100644 juniper_warp/examples/warp_server.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2fc6dc26..f1102498 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,6 @@ jobs: needs: - bench - clippy - - example - feature - msrv - release-check @@ -84,32 +83,6 @@ jobs: - run: cargo clippy -p juniper_benchmarks --benches -- -D warnings - run: cargo bench -p juniper_benchmarks - example: - strategy: - fail-fast: false - matrix: - example: - - actix_subscriptions - - basic_subscriptions - - warp_async - - warp_subscriptions - os: - - ubuntu - - macOS - - windows - toolchain: - - stable - - beta - - nightly - runs-on: ${{ matrix.os }}-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@v1 - with: - toolchain: ${{ matrix.toolchain }} - - - run: cargo check -p example_${{ matrix.example }} - feature: strategy: fail-fast: false @@ -161,8 +134,9 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions + - juniper_graphql_transport_ws - juniper_graphql_ws - - juniper_actix + #- juniper_actix - juniper_hyper #- juniper_iron - juniper_rocket @@ -171,6 +145,10 @@ jobs: - ubuntu - macOS - windows + include: + - { msrv: "1.68.0", crate: "juniper_actix", os: "ubuntu" } + - { msrv: "1.68.0", crate: "juniper_actix", os: "macOS" } + - { msrv: "1.68.0", crate: "juniper_actix", os: "windows" } runs-on: ${{ matrix.os }}-latest steps: - uses: actions/checkout@v4 @@ -184,9 +162,6 @@ jobs: - run: cargo +nightly update -Z minimal-versions - run: make test.cargo crate=${{ matrix.crate }} - if: ${{ !contains(fromJSON('["juniper_actix"]'), matrix.crate) }} - - run: cargo check -p ${{ matrix.crate }} --all-features - if: ${{ contains(fromJSON('["juniper_actix"]'), matrix.crate) }} package: if: ${{ startsWith(github.ref, 'refs/tags/juniper') }} @@ -214,6 +189,7 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions + - juniper_graphql_transport_ws - juniper_graphql_ws - juniper_integration_tests - juniper_codegen_tests @@ -342,6 +318,7 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions + - juniper_graphql_transport_ws - juniper_graphql_ws - juniper_actix - juniper_hyper @@ -366,7 +343,6 @@ jobs: needs: - bench - clippy - - example - feature - msrv - package diff --git a/Cargo.toml b/Cargo.toml index bd65c114..d505b21a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,18 +2,14 @@ resolver = "1" # unifying Cargo features asap for Book tests members = [ "benches", - "examples/basic_subscriptions", - "examples/warp_async", - "examples/warp_subscriptions", - "examples/actix_subscriptions", "juniper_codegen", "juniper", "juniper_hyper", "juniper_iron", "juniper_rocket", "juniper_subscriptions", - "juniper_graphql_ws", "juniper_graphql_transport_ws", + "juniper_graphql_ws", "juniper_warp", "juniper_actix", "tests/codegen", diff --git a/book/src/advanced/subscriptions.md b/book/src/advanced/subscriptions.md index 51200b10..9aaa9363 100644 --- a/book/src/advanced/subscriptions.md +++ b/book/src/advanced/subscriptions.md @@ -154,8 +154,8 @@ async fn run_subscription() { Currently there is an example of subscriptions with [warp][warp], but it still in an alpha state. GraphQL over [WS][WS] is not fully supported yet and is non-standard. -- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/examples/warp_subscriptions) -- [Small Example](https://github.com/graphql-rust/juniper/tree/master/examples/basic_subscriptions) +- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/juniper_warp/examples/subscription.rs) +- [Small Example](https://github.com/graphql-rust/juniper/tree/master/juniper_subscriptions/examples/basic.rs) diff --git a/book/src/servers/warp.md b/book/src/servers/warp.md index 1f7338eb..70ea945a 100644 --- a/book/src/servers/warp.md +++ b/book/src/servers/warp.md @@ -14,10 +14,11 @@ juniper = "0.16.0" juniper_warp = "0.8.0" ``` -Included in the source is a [small example][example] which sets up a basic GraphQL and [GraphiQL] handler. +Included in the source is a [small example][example] which sets up a basic GraphQL and [GraphiQL]/[GraphQL Playground] handlers with subscriptions support. -[graphiql]: https://github.com/graphql/graphiql +[GraphiQL]: https://github.com/graphql/graphiql +[GraphQL Playground]: https://github.com/prisma/graphql-playground [hyper]: https://hyper.rs/ [warp]: https://crates.io/crates/warp [juniper_warp]: https://github.com/graphql-rust/juniper/tree/master/juniper_warp -[example]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/warp_server.rs +[example]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index bd217ced..00000000 --- a/examples/README.md +++ /dev/null @@ -1,56 +0,0 @@ -# Juniper Examples - -This directory contains examples of how to use Juniper. - -## How to run - -To run an example, you need to have a working Rust toolchain installed. You can -get it from [rustup](https://rustup.rs/). - -Then, you can run the example using its workspace: - -```bash -cargo run --example -``` - -Where `` is one of the following workspace members: - -``` -actix_server -hyper_server -iron_server -rocket_server -warp_server -``` - -e.g. to run the `actix_server` example: - -```bash -cargo run --example actix_server -``` - -You can also run an example directly from an `examples` workspace directory. To -run the `actix_server` example: - -```bash -cd examples/actix_subscriptions -cargo run - Finished dev [unoptimized + debuginfo] target(s) in 0.13s - Running `/path/to/repo/juniper/target/debug/example_actix_subscriptions` -[2022-11-20T07:46:08Z INFO actix_server::builder] Starting 10 workers -[2022-11-20T07:46:08Z INFO actix_server::server] Actix runtime found; starting in Actix runtime -``` - -Note if you want to run the code within your own project, you need to change -the relative paths in `Cargo.toml`, e.g: - -```toml -juniper_graphql_ws = { path = "../../juniper_graphql_ws" } -``` - -to: - -```toml -juniper_graphql_ws = "0.3.0" -``` - diff --git a/examples/actix_subscriptions/Cargo.toml b/examples/actix_subscriptions/Cargo.toml deleted file mode 100644 index 923af69a..00000000 --- a/examples/actix_subscriptions/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "example_actix_subscriptions" -version = "0.0.0" -edition = "2021" -rust-version = "1.65" -authors = ["Mihai Dinculescu "] -publish = false - -[dependencies] -actix-cors = "0.6" -actix-web = "4.0" -async-stream = "0.3" -env_logger = "0.10" -futures = "0.3" -juniper = { path = "../../juniper", features = ["expose-test-schema"] } -juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] } -juniper_graphql_ws = { path = "../../juniper_graphql_ws" } -rand = "0.8" -serde = "1.0" -serde_json = "1.0" -tokio = "1.0" diff --git a/examples/basic_subscriptions/Cargo.toml b/examples/basic_subscriptions/Cargo.toml deleted file mode 100644 index ee26a299..00000000 --- a/examples/basic_subscriptions/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "example_basic_subscriptions" -version = "0.0.0" -edition = "2021" -rust-version = "1.65" -authors = ["Jordao Rosario "] -publish = false - -[dependencies] -futures = "0.3" -juniper = { path = "../../juniper" } -juniper_subscriptions = { path = "../../juniper_subscriptions" } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/examples/warp_async/Cargo.toml b/examples/warp_async/Cargo.toml deleted file mode 100644 index a09d6615..00000000 --- a/examples/warp_async/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "example_warp_async" -version = "0.0.0" -edition = "2021" -rust-version = "1.65" -authors = ["Christoph Herzog "] -publish = false - -[dependencies] -env_logger = "0.10" -futures = "0.3" -juniper = { path = "../../juniper" } -juniper_warp = { path = "../../juniper_warp" } -log = "0.4" -reqwest = { version = "0.11", features = ["rustls-tls"], default-features = false } -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } -warp = "0.3" diff --git a/examples/warp_async/src/main.rs b/examples/warp_async/src/main.rs deleted file mode 100644 index 7e1babbf..00000000 --- a/examples/warp_async/src/main.rs +++ /dev/null @@ -1,104 +0,0 @@ -//! This example demonstrates async/await usage with warp. - -use juniper::{ - graphql_object, EmptyMutation, EmptySubscription, FieldError, GraphQLEnum, RootNode, -}; -use warp::{http::Response, Filter}; - -#[derive(Clone, Copy, Debug)] -struct Context; -impl juniper::Context for Context {} - -#[derive(Clone, Copy, Debug, GraphQLEnum)] -enum UserKind { - Admin, - User, - Guest, -} - -#[derive(Clone, Debug)] -struct User { - id: i32, - kind: UserKind, - name: String, -} - -#[graphql_object(context = Context)] -impl User { - fn id(&self) -> i32 { - self.id - } - - fn kind(&self) -> UserKind { - self.kind - } - - fn name(&self) -> &str { - &self.name - } - - async fn friends(&self) -> Vec { - vec![] - } -} - -#[derive(Clone, Copy, Debug)] -struct Query; - -#[graphql_object(context = Context)] -impl Query { - async fn users() -> Vec { - vec![User { - id: 1, - kind: UserKind::Admin, - name: "user1".into(), - }] - } - - /// Fetch a URL and return the response body text. - async fn request(url: String) -> Result { - Ok(reqwest::get(&url).await?.text().await?) - } -} - -type Schema = RootNode<'static, Query, EmptyMutation, EmptySubscription>; - -fn schema() -> Schema { - Schema::new( - Query, - EmptyMutation::::new(), - EmptySubscription::::new(), - ) -} - -#[tokio::main] -async fn main() { - std::env::set_var("RUST_LOG", "warp_async"); - env_logger::init(); - - let log = warp::log("warp_server"); - - let homepage = warp::path::end().map(|| { - Response::builder() - .header("content-type", "text/html") - .body( - "

juniper_warp

visit /graphiql", - ) - }); - - log::info!("Listening on 127.0.0.1:8080"); - - let state = warp::any().map(|| Context); - let graphql_filter = juniper_warp::make_graphql_filter(schema(), state.boxed()); - - warp::serve( - warp::get() - .and(warp::path("graphiql")) - .and(juniper_warp::graphiql_filter("/graphql", None)) - .or(homepage) - .or(warp::path("graphql").and(graphql_filter)) - .with(log), - ) - .run(([127, 0, 0, 1], 8080)) - .await -} diff --git a/examples/warp_subscriptions/Cargo.toml b/examples/warp_subscriptions/Cargo.toml deleted file mode 100644 index f708c99e..00000000 --- a/examples/warp_subscriptions/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "example_warp_subscriptions" -version = "0.0.0" -edition = "2021" -rust-version = "1.65" -publish = false - -[dependencies] -async-stream = "0.3" -env_logger = "0.10" -futures = "0.3" -juniper = { path = "../../juniper" } -juniper_graphql_ws = { path = "../../juniper_graphql_ws" } -juniper_graphql_transport_ws = { path = "../../juniper_graphql_transport_ws" } -juniper_warp = { path = "../../juniper_warp", features = ["subscriptions"] } -log = "0.4.8" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } -warp = "0.3" diff --git a/juniper/CHANGELOG.md b/juniper/CHANGELOG.md index 7bcaef99..1693e131 100644 --- a/juniper/CHANGELOG.md +++ b/juniper/CHANGELOG.md @@ -51,7 +51,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi - Disabled `chrono` [Cargo feature] by default. - Removed `scalar-naivetime` [Cargo feature]. - Removed lifetime parameter from `ParseError`, `GraphlQLError`, `GraphQLBatchRequest` and `GraphQLRequest`. ([#1081], [#528]) -- Upgraded [GraphiQL] to 3.0.6 version (requires new [`graphql-ws` GraphQL over WebSocket Protocol] integration on server, see `examples/warp_subscriptions`). ([#1188], [#1193]) +- Upgraded [GraphiQL] to 3.0.6 version (requires new [`graphql-transport-ws` GraphQL over WebSocket Protocol] integration on server, see `juniper_warp/examples/subscription.rs`). ([#1188], [#1193]) ### Added @@ -145,8 +145,8 @@ See [old CHANGELOG](/../../blob/juniper-v0.15.9/juniper/CHANGELOG.md). [`chrono-tz` crate]: https://docs.rs/chrono-tz [`time` crate]: https://docs.rs/time [Cargo feature]: https://doc.rust-lang.org/cargo/reference/features.html -[`graphql-ws` GraphQL over WebSocket Protocol]: https://github.com/graphql/graphiql -[GraphiQL]: https://github.com/enisdenjo/graphql-ws/master/PROTOCOL.md +[`graphql-transport-ws` GraphQL over WebSocket Protocol]: https://github.com/enisdenjo/graphql-ws/v5.14.0/PROTOCOL.md +[GraphiQL]: https://github.com/graphql/graphiql [GraphQL Playground]: https://github.com/prisma/graphql-playground [graphql-scalars.dev]: https://graphql-scalars.dev [October 2021]: https://spec.graphql.org/October2021 diff --git a/juniper_actix/CHANGELOG.md b/juniper_actix/CHANGELOG.md index 84b533b8..28450776 100644 --- a/juniper_actix/CHANGELOG.md +++ b/juniper_actix/CHANGELOG.md @@ -13,6 +13,12 @@ All user visible changes to `juniper_actix` crate will be documented in this fil - Switched to 4.0 version of [`actix-web` crate] and its ecosystem. ([#1034]) - Switched to 0.16 version of [`juniper` crate]. - Switched to 0.4 version of [`juniper_graphql_ws` crate]. +- Renamed `subscriptions::subscriptions_handler()` as `subscriptions::graphql_ws_handler()` for processing the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws]. ([#1191]) + +### Added + +- `subscriptions::graphql_transport_ws_handler()` allowing to process the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws]. ([#1191]) +- `subscriptions::ws_handler()` with auto-selection between the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws] and the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws], based on the `Sec-Websocket-Protocol` HTTP header value. ([#1191]) ### Fixed @@ -21,6 +27,7 @@ All user visible changes to `juniper_actix` crate will be documented in this fil [#1034]: /../../pull/1034 [#1169]: /../../issues/1169 [#1187]: /../../pull/1187 +[#1191]: /../../pull/1191 @@ -36,3 +43,5 @@ See [old CHANGELOG](/../../blob/juniper_actix-v0.4.0/juniper_actix/CHANGELOG.md) [`juniper` crate]: https://docs.rs/juniper [`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws [Semantic Versioning 2.0.0]: https://semver.org +[graphql-transport-ws]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +[graphql-ws]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md \ No newline at end of file diff --git a/juniper_actix/Cargo.toml b/juniper_actix/Cargo.toml index 30bda9ee..c38b0bb3 100644 --- a/juniper_actix/Cargo.toml +++ b/juniper_actix/Cargo.toml @@ -2,7 +2,7 @@ name = "juniper_actix" version = "0.5.0-dev" edition = "2021" -rust-version = "1.65" +rust-version = "1.68" description = "`juniper` GraphQL integration with `actix-web`." license = "BSD-2-Clause" authors = ["Jordao Rosario "] @@ -22,6 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"] subscriptions = [ "dep:actix", "dep:actix-web-actors", + "dep:juniper_graphql_transport_ws", "dep:juniper_graphql_ws", "dep:tokio", ] @@ -29,11 +30,12 @@ subscriptions = [ [dependencies] actix = { version = ">=0.12,<=0.13", optional = true } actix-http = "3.2" -actix-web = "4.2.1" +actix-web = "4.4" actix-web-actors = { version = "4.1", optional = true } anyhow = "1.0.47" futures = "0.3.22" juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } +juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true } juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true } http = "0.2.4" serde = { version = "1.0.122", features = ["derive"] } @@ -41,10 +43,6 @@ serde_json = "1.0.18" thiserror = "1.0" tokio = { version = "1.0", features = ["sync"], optional = true } -# Fixes for MSRV check. -# TODO: Try remove on upgrade to 4.3 version of `actix-web` crate. -derive_more = { version = "0.99.8", default-features = false } - [dev-dependencies] actix-cors = "0.6" actix-identity = "0.6" @@ -55,4 +53,9 @@ bytes = "1.0" env_logger = "0.10" juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] } log = "0.4" +rand = "0.8" tokio = "1.0" + +[[example]] +name = "subscription" +required-features = ["subscriptions"] diff --git a/juniper_actix/README.md b/juniper_actix/README.md index 45192b68..b94de8af 100644 --- a/juniper_actix/README.md +++ b/juniper_actix/README.md @@ -4,7 +4,7 @@ [![Crates.io](https://img.shields.io/crates/v/juniper_actix.svg?maxAge=2592000)](https://crates.io/crates/juniper_actix) [![Documentation](https://docs.rs/juniper_actix/badge.svg)](https://docs.rs/juniper_actix) [![CI](https://github.com/graphql-rust/juniper/workflows/CI/badge.svg?branch=master "CI")](https://github.com/graphql-rust/juniper/actions?query=workflow%3ACI+branch%3Amaster) -[![Rust 1.65+](https://img.shields.io/badge/rustc-1.65+-lightgray.svg "Rust 1.65+")](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html) +[![Rust 1.68+](https://img.shields.io/badge/rustc-1.68+-lightgray.svg "Rust 1.68+")](https://blog.rust-lang.org/2023/03/09/Rust-1.68.0.html) - [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_actix/CHANGELOG.md) diff --git a/juniper_actix/examples/actix_server.rs b/juniper_actix/examples/actix_server.rs deleted file mode 100644 index ab27373d..00000000 --- a/juniper_actix/examples/actix_server.rs +++ /dev/null @@ -1,141 +0,0 @@ -#![deny(warnings)] - -use std::{collections::HashMap, env}; - -use actix_cors::Cors; -use actix_web::{ - http::header, - middleware, - web::{self, Data}, - App, Error, HttpResponse, HttpServer, -}; -use juniper::{graphql_object, EmptyMutation, EmptySubscription, GraphQLObject, RootNode}; -use juniper_actix::{graphiql_handler, graphql_handler, playground_handler}; - -#[derive(Clone, GraphQLObject)] -///a user -pub struct User { - ///the id - id: i32, - ///the name - name: String, -} - -#[derive(Clone, Default)] -pub struct Database { - ///this could be a database connection - users: HashMap, -} -impl Database { - pub fn new() -> Database { - let mut users = HashMap::new(); - users.insert( - 1, - User { - id: 1, - name: "Aron".into(), - }, - ); - users.insert( - 2, - User { - id: 2, - name: "Bea".into(), - }, - ); - users.insert( - 3, - User { - id: 3, - name: "Carl".into(), - }, - ); - users.insert( - 4, - User { - id: 4, - name: "Dora".into(), - }, - ); - Database { users } - } - pub fn get_user(&self, id: &i32) -> Option<&User> { - self.users.get(id) - } -} - -// To make our Database usable by Juniper, we have to implement a marker trait. -impl juniper::Context for Database {} - -// Queries represent the callable funcitons -struct Query; -#[graphql_object(context = Database)] -impl Query { - fn api_version() -> &'static str { - "1.0" - } - - fn user( - context: &Database, - #[graphql(description = "id of the user")] id: i32, - ) -> Option<&User> { - context.get_user(&id) - } -} - -type Schema = RootNode<'static, Query, EmptyMutation, EmptySubscription>; - -fn schema() -> Schema { - Schema::new( - Query, - EmptyMutation::::new(), - EmptySubscription::::new(), - ) -} - -async fn graphiql_route() -> Result { - graphiql_handler("/graphql", None).await -} -async fn playground_route() -> Result { - playground_handler("/graphql", None).await -} -async fn graphql_route( - req: actix_web::HttpRequest, - payload: actix_web::web::Payload, - schema: web::Data, -) -> Result { - let context = Database::new(); - graphql_handler(&schema, &context, req, payload).await -} - -#[actix_web::main] -async fn main() -> std::io::Result<()> { - env::set_var("RUST_LOG", "info"); - env_logger::init(); - - let server = HttpServer::new(move || { - App::new() - .app_data(Data::new(schema())) - .wrap( - Cors::default() - .allow_any_origin() - .allowed_methods(vec!["POST", "GET"]) - .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT]) - .allowed_header(header::CONTENT_TYPE) - .supports_credentials() - .max_age(3600), - ) - .wrap(middleware::Compress::default()) - .wrap(middleware::Logger::default()) - .service( - web::resource("/graphql") - .route(web::post().to(graphql_route)) - .route(web::get().to(graphql_route)), - ) - .service(web::resource("/playground").route(web::get().to(playground_route))) - .service(web::resource("/graphiql").route(web::get().to(graphiql_route))) - }); - server.bind("127.0.0.1:8080").unwrap().run().await -} -// now go to http://127.0.0.1:8080/playground or graphiql and execute -//{ apiVersion, user(id: 2){id, name}} diff --git a/examples/actix_subscriptions/src/main.rs b/juniper_actix/examples/subscription.rs similarity index 77% rename from examples/actix_subscriptions/src/main.rs rename to juniper_actix/examples/subscription.rs index 6747105f..17f72d40 100644 --- a/examples/actix_subscriptions/src/main.rs +++ b/juniper_actix/examples/subscription.rs @@ -1,3 +1,5 @@ +//! This example demonstrates asynchronous subscriptions with [`actix_web`]. + use std::{env, pin::Pin, time::Duration}; use actix_cors::Cors; @@ -5,7 +7,7 @@ use actix_web::{ http::header, middleware, web::{self, Data}, - App, Error, HttpRequest, HttpResponse, HttpServer, + App, Error, HttpRequest, HttpResponse, HttpServer, Responder, }; use juniper::{ @@ -13,7 +15,7 @@ use juniper::{ tests::fixtures::starwars::schema::{Database, Query}, EmptyMutation, FieldError, GraphQLObject, RootNode, }; -use juniper_actix::{graphql_handler, playground_handler, subscriptions::subscriptions_handler}; +use juniper_actix::{graphiql_handler, graphql_handler, playground_handler, subscriptions}; use juniper_graphql_ws::ConnectionConfig; type Schema = RootNode<'static, Query, EmptyMutation, Subscription>; @@ -26,15 +28,45 @@ async fn playground() -> Result { playground_handler("/graphql", Some("/subscriptions")).await } +async fn graphiql() -> Result { + graphiql_handler("/graphql", Some("/subscriptions")).await +} + async fn graphql( - req: actix_web::HttpRequest, - payload: actix_web::web::Payload, - schema: web::Data, + req: HttpRequest, + payload: web::Payload, + schema: Data, ) -> Result { let context = Database::new(); graphql_handler(&schema, &context, req, payload).await } +async fn homepage() -> impl Responder { + HttpResponse::Ok() + .insert_header(("content-type", "text/html")) + .message_body( + "

juniper_actix/subscription example

\ +
visit GraphiQL
\ + \ + ", + ) +} + +async fn subscriptions( + req: HttpRequest, + stream: web::Payload, + schema: web::Data, +) -> Result { + let context = Database::new(); + let schema = schema.into_inner(); + let config = ConnectionConfig::new(context); + // set the keep alive interval to 15 secs so that it doesn't timeout in playground + // playground has a hard-coded timeout set to 20 secs + let config = config.with_keep_alive_interval(Duration::from_secs(15)); + + subscriptions::ws_handler(req, stream, schema, config).await +} + struct Subscription; #[derive(GraphQLObject)] @@ -49,7 +81,8 @@ type RandomHumanStream = #[graphql_subscription(context = Database)] impl Subscription { #[graphql( - description = "A random humanoid creature in the Star Wars universe every 3 seconds. Second result will be an error." + description = "A random humanoid creature in the Star Wars universe every 3 seconds. \ + Second result will be an error." )] async fn random_human(context: &Database) -> RandomHumanStream { let mut counter = 0; @@ -84,21 +117,6 @@ impl Subscription { } } -async fn subscriptions( - req: HttpRequest, - stream: web::Payload, - schema: web::Data, -) -> Result { - let context = Database::new(); - let schema = schema.into_inner(); - let config = ConnectionConfig::new(context); - // set the keep alive interval to 15 secs so that it doesn't timeout in playground - // playground has a hard-coded timeout set to 20 secs - let config = config.with_keep_alive_interval(Duration::from_secs(15)); - - subscriptions_handler(req, stream, schema, config).await -} - #[actix_web::main] async fn main() -> std::io::Result<()> { env::set_var("RUST_LOG", "info"); @@ -125,11 +143,8 @@ async fn main() -> std::io::Result<()> { .route(web::get().to(graphql)), ) .service(web::resource("/playground").route(web::get().to(playground))) - .default_service(web::to(|| async { - HttpResponse::Found() - .append_header((header::LOCATION, "/playground")) - .finish() - })) + .service(web::resource("/graphiql").route(web::get().to(graphiql))) + .default_service(web::to(homepage)) }) .bind("127.0.0.1:8080")? .run() diff --git a/juniper_actix/src/lib.rs b/juniper_actix/src/lib.rs index baf0476f..6d69951b 100644 --- a/juniper_actix/src/lib.rs +++ b/juniper_actix/src/lib.rs @@ -165,44 +165,47 @@ pub async fn playground_handler( .body(html)) } -/// `juniper_actix` subscriptions handler implementation. -/// Cannot be merged to `juniper_actix` yet as GraphQL over WS[1] -/// is not fully supported in current implementation. -/// -/// *Note: this implementation is in an alpha state.* -/// -/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md #[cfg(feature = "subscriptions")] +/// `juniper_actix` subscriptions handler implementation. pub mod subscriptions { use std::{fmt, sync::Arc}; - use actix::{prelude::*, Actor, StreamHandler}; + use actix::{ + AsyncContext as _, ContextFutureSpawner as _, Handler, StreamHandler, WrapFuture as _, + }; use actix_web::{ http::header::{HeaderName, HeaderValue}, web, HttpRequest, HttpResponse, }; use actix_web_actors::ws; - use juniper::{ - futures::{ - stream::{SplitSink, SplitStream, StreamExt}, - SinkExt, - }, - GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue, + use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt as _, Stream, StreamExt as _, }; - use juniper_graphql_ws::{ArcSchema, ClientMessage, Connection, Init, ServerMessage}; + use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; + use juniper_graphql_transport_ws::{ArcSchema, Init}; use tokio::sync::Mutex; - /// Serves the graphql-ws protocol over a WebSocket connection. + /// Serves by auto-selecting between the + /// [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] and the + /// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], based on the + /// `Sec-Websocket-Protocol` HTTP header value. /// - /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and - /// configuration are already known, or it can be a closure that gets executed asynchronously - /// when the client sends the ConnectionInit message. Using a closure allows you to perform - /// authentication based on the parameters provided by the client. - pub async fn subscriptions_handler( + /// The `schema` argument is your [`juniper`] schema. + /// + /// The `init` argument is used to provide the custom [`juniper::Context`] and additional + /// configuration for connections. This can be a + /// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are + /// already known, or it can be a closure that gets executed asynchronously whenever a client + /// sends the subscription initialization message. Using a closure allows to perform an + /// authentication based on the parameters provided by a client. + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + /// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md + pub async fn ws_handler( req: HttpRequest, stream: web::Payload, - root_node: Arc>, + schema: Arc>, init: I, ) -> Result where @@ -216,12 +219,56 @@ pub mod subscriptions { S: ScalarValue + Send + Sync + 'static, I: Init + Send, { - let (s_tx, s_rx) = Connection::new(ArcSchema(root_node), init).split::(); + if req + .headers() + .get("sec-websocket-protocol") + .map(AsRef::as_ref) + == Some("graphql-ws".as_bytes()) + { + graphql_ws_handler(req, stream, schema, init).await + } else { + graphql_transport_ws_handler(req, stream, schema, init).await + } + } + + /// Serves the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old]. + /// + /// The `init` argument is used to provide the context and additional configuration for + /// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and + /// configuration are already known, or it can be a closure that gets executed asynchronously + /// when the client sends the `GQL_CONNECTION_INIT` message. Using a closure allows to perform + /// an authentication based on the parameters provided by a client. + /// + /// > __WARNING__: This protocol has been deprecated in favor of the + /// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], which is + /// provided by the [`graphql_transport_ws_handler()`] function. + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + /// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md + pub async fn graphql_ws_handler( + req: HttpRequest, + stream: web::Payload, + schema: Arc>, + init: I, + ) -> Result + where + Query: GraphQLTypeAsync + Send + 'static, + Query::TypeInfo: Send + Sync, + Mutation: GraphQLTypeAsync + Send + 'static, + Mutation::TypeInfo: Send + Sync, + Subscription: GraphQLSubscriptionType + Send + 'static, + Subscription::TypeInfo: Send + Sync, + CtxT: Unpin + Send + Sync + 'static, + S: ScalarValue + Send + Sync + 'static, + I: Init + Send, + { + let (s_tx, s_rx) = + juniper_graphql_ws::Connection::new(ArcSchema(schema), init).split::(); let mut resp = ws::start( - SubscriptionActor { - graphql_tx: Arc::new(Mutex::new(s_tx)), - graphql_rx: Arc::new(Mutex::new(s_rx)), + Actor { + tx: Arc::new(Mutex::new(s_tx)), + rx: Arc::new(Mutex::new(s_rx)), }, &req, stream, @@ -235,18 +282,21 @@ pub mod subscriptions { Ok(resp) } - type ConnectionSplitSink = Arc< - Mutex, I>, Message>>, - >; - - type ConnectionSplitStream = - Arc, I>>>>; - - /// Subscription Actor - /// coordinates messages between actix_web and juniper_graphql_ws - /// ws message -> actor -> juniper - /// juniper -> actor -> ws response - struct SubscriptionActor + /// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new]. + /// + /// The `init` argument is used to provide the context and additional configuration for + /// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context + /// and configuration are already known, or it can be a closure that gets executed + /// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to + /// perform an authentication based on the parameters provided by a client. + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + pub async fn graphql_transport_ws_handler( + req: HttpRequest, + stream: web::Payload, + schema: Arc>, + init: I, + ) -> Result where Query: GraphQLTypeAsync + Send + 'static, Query::TypeInfo: Send + Sync, @@ -258,37 +308,56 @@ pub mod subscriptions { S: ScalarValue + Send + Sync + 'static, I: Init + Send, { - graphql_tx: ConnectionSplitSink, - graphql_rx: ConnectionSplitStream, + let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(ArcSchema(schema), init) + .split::(); + + let mut resp = ws::start( + Actor { + tx: Arc::new(Mutex::new(s_tx)), + rx: Arc::new(Mutex::new(s_rx)), + }, + &req, + stream, + )?; + + resp.headers_mut().insert( + HeaderName::from_static("sec-websocket-protocol"), + HeaderValue::from_static("graphql-transport-ws"), + ); + + Ok(resp) } - /// ws message -> actor -> juniper - impl - StreamHandler> - for SubscriptionActor + type ConnectionSplitSink = Arc>>; + type ConnectionSplitStream = Arc>>; + + /// [`actix::Actor`], coordinating messages between [`actix_web`] and [`juniper_graphql_ws`]: + /// - incoming [`ws::Message`] -> [`Actor`] -> [`juniper`] + /// - [`juniper`] -> [`Actor`] -> response [`ws::Message`] + struct Actor { + tx: ConnectionSplitSink, + rx: ConnectionSplitStream, + } + + impl StreamHandler> for Actor where - Query: GraphQLTypeAsync + Send + 'static, - Query::TypeInfo: Send + Sync, - Mutation: GraphQLTypeAsync + Send + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: GraphQLSubscriptionType + Send + 'static, - Subscription::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync + 'static, - S: ScalarValue + Send + Sync + 'static, - I: Init + Send, + Self: actix::Actor>, + Conn: futures::Sink, + >::Error: fmt::Debug, { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - let msg = msg.map(Message); - #[allow(clippy::single_match)] match msg { Ok(msg) => { - let tx = self.graphql_tx.clone(); + let tx = self.tx.clone(); + // TODO: Somehow this implementation always closes as `1006: Abnormal closure` + // due to excessive polling of `tx` part. + // Needs to be reworked. async move { tx.lock() .await - .send(msg) + .send(Message(msg)) .await .expect("Infallible: this should not happen"); } @@ -303,31 +372,23 @@ pub mod subscriptions { } } - /// juniper -> actor - impl Actor - for SubscriptionActor + /// [`juniper`] -> [`Actor`]. + impl actix::Actor for Actor where - Query: GraphQLTypeAsync + Send + 'static, - Query::TypeInfo: Send + Sync, - Mutation: GraphQLTypeAsync + Send + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: GraphQLSubscriptionType + Send + 'static, - Subscription::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync + 'static, - S: ScalarValue + Send + Sync + 'static, - I: Init + Send, + Conn: Stream + 'static, + ::Item: IntoWsResponse + Send, { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { - let stream = self.graphql_rx.clone(); + let stream = self.rx.clone(); let addr = ctx.address(); let fut = async move { let mut stream = stream.lock().await; - while let Some(message) = stream.next().await { - // sending the message to self so that it can be forwarded back to the client - addr.do_send(ServerMessageWrapper { message }); + while let Some(msg) = stream.next().await { + // Sending the `msg` to `self`, so that it can be forwarded back to the client. + addr.do_send(ServerMessage(msg)); } } .into_actor(self); @@ -341,55 +402,75 @@ pub mod subscriptions { } } - /// actor -> websocket response - impl actix::prelude::Handler> - for SubscriptionActor + /// [`Actor`] -> response [`ws::Message`]. + impl Handler> for Actor where - Query: GraphQLTypeAsync + Send + 'static, - Query::TypeInfo: Send + Sync, - Mutation: GraphQLTypeAsync + Send + 'static, - Mutation::TypeInfo: Send + Sync, - Subscription: GraphQLSubscriptionType + Send + 'static, - Subscription::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync + 'static, - S: ScalarValue + Send + Sync + 'static, - I: Init + Send, + Conn: Stream + 'static, + M: IntoWsResponse + Send, { type Result = (); - fn handle( - &mut self, - msg: ServerMessageWrapper, - ctx: &mut Self::Context, - ) -> Self::Result { - let msg = serde_json::to_string(&msg.message); - match msg { + fn handle(&mut self, msg: ServerMessage, ctx: &mut Self::Context) -> Self::Result { + match msg.0.into_ws_response() { Ok(msg) => ctx.text(msg), - Err(e) => { - let reason = ws::CloseReason { - code: ws::CloseCode::Error, - description: Some(format!("error serializing response: {e}")), - }; - - // TODO: trace - ctx.close(Some(reason)) - } - }; + // TODO: trace + Err(reason) => ctx.close(Some(reason)), + } } } - #[derive(Message)] + + #[derive(actix::Message)] #[rtype(result = "()")] - struct ServerMessageWrapper - where - S: ScalarValue + Send + Sync + 'static, - { - message: ServerMessage, + struct ServerMessage(T); + + /// Conversion of a [`ServerMessage`] into a response [`ws::Message`]. + pub trait IntoWsResponse { + /// Converts this [`ServerMessage`] into response [`ws::Message`]. + fn into_ws_response(self) -> Result; + } + + impl IntoWsResponse for juniper_graphql_transport_ws::Output { + fn into_ws_response(self) -> Result { + match self { + Self::Message(msg) => serde_json::to_string(&msg).map_err(|e| ws::CloseReason { + code: ws::CloseCode::Error, + description: Some(format!("error serializing response: {e}")), + }), + Self::Close { code, message } => Err(ws::CloseReason { + code: code.into(), + description: Some(message), + }), + } + } + } + + impl IntoWsResponse for juniper_graphql_ws::ServerMessage { + fn into_ws_response(self) -> Result { + serde_json::to_string(&self).map_err(|e| ws::CloseReason { + code: ws::CloseCode::Error, + description: Some(format!("error serializing response: {e}")), + }) + } } #[derive(Debug)] struct Message(ws::Message); - impl TryFrom for ClientMessage { + impl TryFrom for juniper_graphql_transport_ws::Input { + type Error = Error; + + fn try_from(msg: Message) -> Result { + match msg.0 { + ws::Message::Text(text) => serde_json::from_slice(text.as_bytes()) + .map(Self::Message) + .map_err(Error::Serde), + ws::Message::Close(_) => Ok(Self::Close), + _ => Err(Error::UnexpectedClientMessage), + } + } + } + + impl TryFrom for juniper_graphql_ws::ClientMessage { type Error = Error; fn try_from(msg: Message) -> Result { @@ -397,7 +478,7 @@ pub mod subscriptions { ws::Message::Text(text) => { serde_json::from_slice(text.as_bytes()).map_err(Error::Serde) } - ws::Message::Close(_) => Ok(ClientMessage::ConnectionTerminate), + ws::Message::Close(_) => Ok(Self::ConnectionTerminate), _ => Err(Error::UnexpectedClientMessage), } } @@ -770,7 +851,7 @@ mod subscription_tests { use juniper_graphql_ws::ConnectionConfig; use tokio::time::timeout; - use super::subscriptions::subscriptions_handler; + use super::subscriptions::graphql_ws_handler; #[derive(Default)] struct TestActixWsIntegration; @@ -852,7 +933,7 @@ mod subscription_tests { let schema = schema.into_inner(); let config = ConnectionConfig::new(context); - subscriptions_handler(req, stream, schema, config).await + graphql_ws_handler(req, stream, schema, config).await } #[actix_web::rt::test] diff --git a/juniper_graphql_transport_ws/CHANGELOG.md b/juniper_graphql_transport_ws/CHANGELOG.md index 79402114..f0cc1799 100644 --- a/juniper_graphql_transport_ws/CHANGELOG.md +++ b/juniper_graphql_transport_ws/CHANGELOG.md @@ -1,5 +1,5 @@ `juniper_graphql_transport_ws` changelog -============================== +======================================== All user visible changes to `juniper_graphql_transport_ws` crate will be documented in this file. This project uses [Semantic Versioning 2.0.0]. @@ -8,9 +8,20 @@ All user visible changes to `juniper_graphql_transport_ws` crate will be documen ## master +### Implemented + +- [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package]. ([#1158], [#1191], [#1022]) +- On top of 0.16 version of [`juniper` crate] and 0.17 version of [`juniper_subscriptions` crate]. + +[#1022]: /../../issues/1022 +[#1158]: /../../pull/1158 +[#1191]: /../../pull/1191 +[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + +[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws [`juniper` crate]: https://docs.rs/juniper [`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions [Semantic Versioning 2.0.0]: https://semver.org diff --git a/juniper_graphql_transport_ws/Cargo.toml b/juniper_graphql_transport_ws/Cargo.toml index 11c6fae6..8bd0db34 100644 --- a/juniper_graphql_transport_ws/Cargo.toml +++ b/juniper_graphql_transport_ws/Cargo.toml @@ -3,7 +3,7 @@ name = "juniper_graphql_transport_ws" version = "0.4.0-dev" edition = "2021" rust-version = "1.65" -description = "GraphQL over WebSocket Protocol implementation for `juniper` crate." +description = "`graphql-transport-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate." license = "BSD-2-Clause" authors = ["Christopher Brown "] documentation = "https://docs.rs/juniper_graphql_transport_ws" @@ -11,11 +11,12 @@ homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_ repository = "https://github.com/graphql-rust/juniper" readme = "README.md" categories = ["asynchronous", "web-programming", "web-programming::http-server"] -keywords = ["apollo", "graphql", "graphql-ws", "subscription", "websocket"] +keywords = ["apollo", "graphql", "graphql-transport-ws", "subscription", "websocket"] exclude = ["/release.toml"] [dependencies] juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } +juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws" } juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" } serde = { version = "1.0.122", features = ["derive"], default-features = false } tokio = { version = "1.0", features = ["macros", "rt", "time"], default-features = false } diff --git a/juniper_graphql_transport_ws/LICENSE b/juniper_graphql_transport_ws/LICENSE index 652dd95d..c64e2be7 100644 --- a/juniper_graphql_transport_ws/LICENSE +++ b/juniper_graphql_transport_ws/LICENSE @@ -1,6 +1,6 @@ BSD 2-Clause License -Copyright (c) 2018-2022, Christopher Brown +Copyright (c) 2018-2023, Christopher Brown All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/juniper_graphql_transport_ws/README.md b/juniper_graphql_transport_ws/README.md index 972b7f26..11946a51 100644 --- a/juniper_graphql_transport_ws/README.md +++ b/juniper_graphql_transport_ws/README.md @@ -1,5 +1,5 @@ `juniper_graphql_transport_ws` crate -========================== +==================================== [![Crates.io](https://img.shields.io/crates/v/juniper_graphql_transport_ws.svg?maxAge=2592000)](https://crates.io/crates/juniper_graphql_transport_ws) [![Documentation](https://docs.rs/juniper_graphql_transport_ws/badge.svg)](https://docs.rs/juniper_graphql_transport_ws) @@ -8,7 +8,9 @@ - [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/CHANGELOG.md) -This crate contains an implementation of the [graphql-transport-ws WebSocket subprotocol], as used by [Apollo]. +This crate contains an implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package]. + +Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in the [`juniper_graphql_ws` crate]. @@ -20,5 +22,8 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql +[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws +[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws [Apollo]: https://www.apollographql.com -[graphql-transport-ws WebSocket subprotocol]: https://github.com/enisdenjo/graphql-ws/blob/fbb763a662802a6a2584b0cbeb9cf1bde38158e0/PROTOCOL.md +[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md diff --git a/juniper_graphql_transport_ws/src/lib.rs b/juniper_graphql_transport_ws/src/lib.rs index 6f98100f..4dafe795 100644 --- a/juniper_graphql_transport_ws/src/lib.rs +++ b/juniper_graphql_transport_ws/src/lib.rs @@ -25,58 +25,31 @@ use juniper::{ task::{Context, Poll, Waker}, Sink, Stream, }, - GraphQLError, RuleError, ScalarValue, Variables, + GraphQLError, RuleError, ScalarValue, }; +#[doc(inline)] +pub use juniper_graphql_ws::{ConnectionConfig, Init}; + struct ExecutionParams { subscribe_payload: SubscribePayload, config: Arc>, schema: S, } -/// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit -/// message. -pub struct ConnectionConfig { - context: CtxT, - max_in_flight_operations: usize, - keep_alive_interval: Duration, +/// Possible inputs received from a client. +#[derive(Debug)] +pub enum Input { + /// Deserialized [`ClientMessage`]. + Message(ClientMessage), + + /// Client initiated normal closing of a [`Connection`]. + Close, } -impl ConnectionConfig { - /// Constructs the configuration required for a connection to be accepted. - pub fn new(context: CtxT) -> Self { - Self { - context, - max_in_flight_operations: 0, - keep_alive_interval: Duration::from_secs(15), - } - } - - /// Specifies the maximum number of in-flight operations that a connection can have. If this - /// number is exceeded, attempting to start more will result in an error. By default, there is - /// no limit to in-flight operations. - #[must_use] - pub fn with_max_in_flight_operations(mut self, max: usize) -> Self { - self.max_in_flight_operations = max; - self - } - - /// Specifies the interval at which to send unsolicited pong messages as keep-alives. - /// Specifying a zero duration will disable keep-alives. By default, keep-alives are sent every - /// 15 seconds. - #[must_use] - pub fn with_keep_alive_interval(mut self, interval: Duration) -> Self { - self.keep_alive_interval = interval; - self - } -} - -impl Init for ConnectionConfig { - type Error = Infallible; - type Future = future::Ready>; - - fn init(self, _params: Variables) -> Self::Future { - future::ready(Ok(self)) +impl From> for Input { + fn from(val: ClientMessage) -> Self { + Self::Message(val) } } @@ -103,36 +76,6 @@ impl Output { } } -/// Init defines the requirements for types that can provide connection configurations when -/// ConnectionInit messages are received. Implementations are provided for `ConnectionConfig` and -/// closures that meet the requirements. -pub trait Init: Unpin + 'static { - /// The error that is returned on failure. The formatted error will be used as the contents of - /// the "message" field sent back to the client. - type Error: Error; - - /// The future configuration type. - type Future: Future, Self::Error>> + Send + 'static; - - /// Returns a future for the configuration to use. - fn init(self, params: Variables) -> Self::Future; -} - -impl Init for F -where - S: ScalarValue, - F: FnOnce(Variables) -> Fut + Unpin + 'static, - Fut: Future, E>> + Send + 'static, - E: Error, -{ - type Error = E; - type Future = Fut; - - fn init(self, params: Variables) -> Fut { - self(params) - } -} - enum ConnectionState> { /// PreInit is the state before a ConnectionInit message has been accepted. PreInit { init: I, schema: S }, @@ -496,8 +439,8 @@ enum ConnectionSinkState> { Closed, } -/// Implements the graphql-ws protocol. This is a sink for `TryInto` and a stream of -/// `ServerMessage`. +/// Implements the `graphql-ws` protocol. +/// This is a sink for `TryInto` messages and a stream of `Output` messages. pub struct Connection> { reactions: SelectAll>>, stream_waker: Option, @@ -510,7 +453,8 @@ where S: Schema, I: Init, { - /// Creates a new connection, which is a sink for `TryInto` and a stream of `ServerMessage`. + /// Creates a new connection, which is a sink for `TryInto` messages and a stream of + /// `Output` messages. /// /// The `schema` argument should typically be an `Arc>`. /// @@ -533,7 +477,7 @@ where impl Sink for Connection where - T: TryInto>, + T: TryInto>, T::Error: Error, S: Schema, I: Init + Send, @@ -563,9 +507,19 @@ where *state = match std::mem::replace(state, ConnectionSinkState::Closed) { ConnectionSinkState::Ready { state } => { match item.try_into() { - Ok(msg) => ConnectionSinkState::HandlingMessage { + Ok(Input::Message(msg)) => ConnectionSinkState::HandlingMessage { result: state.handle_message(msg).boxed(), }, + Ok(Input::Close) => { + s.reactions.push( + Output::Close { + code: 1000, + message: "Normal Closure".into(), + } + .into_stream(), + ); + ConnectionSinkState::Closed + } Err(e) => { // If we weren't able to parse the message, we must close the connection. s.reactions.push( @@ -588,7 +542,7 @@ where >::poll_ready(self, cx) } - fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { self.sink_state = ConnectionSinkState::Closed; if let Some(waker) = self.stream_waker.take() { // Wake up the stream so it can close too. @@ -643,7 +597,7 @@ mod test { futures::sink::SinkExt, graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars, parser::{ParseError, Spanning}, - DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, + DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, Variables, }; use super::*; diff --git a/juniper_graphql_transport_ws/src/schema.rs b/juniper_graphql_transport_ws/src/schema.rs index 68d282f0..e0eb4796 100644 --- a/juniper_graphql_transport_ws/src/schema.rs +++ b/juniper_graphql_transport_ws/src/schema.rs @@ -1,131 +1,2 @@ -use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; -use std::sync::Arc; - -/// Schema defines the requirements for schemas that can be used for operations. Typically this is -/// just an `Arc>` and you should not have to implement it yourself. -pub trait Schema: Unpin + Clone + Send + Sync + 'static { - /// The context type. - type Context: Unpin + Send + Sync; - - /// The scalar value type. - type ScalarValue: ScalarValue + Send + Sync; - - /// The query type info. - type QueryTypeInfo: Send + Sync; - - /// The query type. - type Query: GraphQLTypeAsync - + Send; - - /// The mutation type info. - type MutationTypeInfo: Send + Sync; - - /// The mutation type. - type Mutation: GraphQLTypeAsync< - Self::ScalarValue, - Context = Self::Context, - TypeInfo = Self::MutationTypeInfo, - > + Send; - - /// The subscription type info. - type SubscriptionTypeInfo: Send + Sync; - - /// The subscription type. - type Subscription: GraphQLSubscriptionType< - Self::ScalarValue, - Context = Self::Context, - TypeInfo = Self::SubscriptionTypeInfo, - > + Send; - - /// Returns the root node for the schema. - fn root_node( - &self, - ) -> &RootNode<'static, Self::Query, Self::Mutation, Self::Subscription, Self::ScalarValue>; -} - -/// This exists as a work-around for this issue: https://github.com/rust-lang/rust/issues/64552 -/// -/// It can be used in generators where using Arc directly would result in an error. -// TODO: Remove this once that issue is resolved. -#[doc(hidden)] -pub struct ArcSchema( - pub Arc>, -) -where - QueryT: GraphQLTypeAsync + Send + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + 'static, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + 'static, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync, - S: ScalarValue + Send + Sync + 'static; - -impl Clone - for ArcSchema -where - QueryT: GraphQLTypeAsync + Send + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + 'static, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + 'static, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync, - S: ScalarValue + Send + Sync + 'static, -{ - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Schema - for ArcSchema -where - QueryT: GraphQLTypeAsync + Send + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + 'static, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + 'static, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync + 'static, - S: ScalarValue + Send + Sync + 'static, -{ - type Context = CtxT; - type ScalarValue = S; - type QueryTypeInfo = QueryT::TypeInfo; - type Query = QueryT; - type MutationTypeInfo = MutationT::TypeInfo; - type Mutation = MutationT; - type SubscriptionTypeInfo = SubscriptionT::TypeInfo; - type Subscription = SubscriptionT; - - fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> { - &self.0 - } -} - -impl Schema - for Arc> -where - QueryT: GraphQLTypeAsync + Send + 'static, - QueryT::TypeInfo: Send + Sync, - MutationT: GraphQLTypeAsync + Send + 'static, - MutationT::TypeInfo: Send + Sync, - SubscriptionT: GraphQLSubscriptionType + Send + 'static, - SubscriptionT::TypeInfo: Send + Sync, - CtxT: Unpin + Send + Sync, - S: ScalarValue + Send + Sync + 'static, -{ - type Context = CtxT; - type ScalarValue = S; - type QueryTypeInfo = QueryT::TypeInfo; - type Query = QueryT; - type MutationTypeInfo = MutationT::TypeInfo; - type Mutation = MutationT; - type SubscriptionTypeInfo = SubscriptionT::TypeInfo; - type Subscription = SubscriptionT; - - fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> { - self - } -} +#[doc(inline)] +pub use juniper_graphql_ws::{ArcSchema, Schema}; diff --git a/juniper_graphql_ws/CHANGELOG.md b/juniper_graphql_ws/CHANGELOG.md index a375a5bf..e809bdb8 100644 --- a/juniper_graphql_ws/CHANGELOG.md +++ b/juniper_graphql_ws/CHANGELOG.md @@ -13,6 +13,12 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi - Switched to 0.16 version of [`juniper` crate]. - Switched to 0.17 version of [`juniper_subscriptions` crate]. +### Changed + +- Made fields of `ConnectionConfig` public to reuse in [`juniper_graphql_transport_ws` crate]. ([#1191]) + +[#1191]: /../../pull/1191 + @@ -24,5 +30,6 @@ See [old CHANGELOG](/../../blob/juniper_graphql_ws-v0.3.0/juniper_graphql_ws/CHA [`juniper` crate]: https://docs.rs/juniper +[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws [`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions [Semantic Versioning 2.0.0]: https://semver.org diff --git a/juniper_graphql_ws/Cargo.toml b/juniper_graphql_ws/Cargo.toml index 44d3873f..51e39e5c 100644 --- a/juniper_graphql_ws/Cargo.toml +++ b/juniper_graphql_ws/Cargo.toml @@ -3,7 +3,7 @@ name = "juniper_graphql_ws" version = "0.4.0-dev" edition = "2021" rust-version = "1.65" -description = "GraphQL over WebSocket Protocol implementation for `juniper` crate." +description = "Legacy `graphql-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate." license = "BSD-2-Clause" authors = ["Christopher Brown "] documentation = "https://docs.rs/juniper_graphql_ws" diff --git a/juniper_graphql_ws/LICENSE b/juniper_graphql_ws/LICENSE index 652dd95d..c64e2be7 100644 --- a/juniper_graphql_ws/LICENSE +++ b/juniper_graphql_ws/LICENSE @@ -1,6 +1,6 @@ BSD 2-Clause License -Copyright (c) 2018-2022, Christopher Brown +Copyright (c) 2018-2023, Christopher Brown All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/juniper_graphql_ws/README.md b/juniper_graphql_ws/README.md index ad39b0e1..b3a30411 100644 --- a/juniper_graphql_ws/README.md +++ b/juniper_graphql_ws/README.md @@ -8,7 +8,7 @@ - [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_ws/CHANGELOG.md) -This crate contains an implementation of the [graphql-ws WebSocket subprotocol], as formerly used by [Apollo]. It has now been deprecated in favor of the protocol implemented by the [`juniper_graphql_transport_ws` crate]. +This crate contains an implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package]. It has now been deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the new [`juniper_graphql_transport_ws` crate] and new [`graphql-ws` npm package]. @@ -20,6 +20,9 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql -[Apollo]: https://www.apollographql.com -[graphql-ws WebSocket subprotocol]: https://github.com/apollographql/subscriptions-transport-ws/blob/0ce7a1e1eb687fe51214483e4735f50a2f2d5c79/PROTOCOL.md +[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws [`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws +[`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws +[Apollo]: https://www.apollographql.com +[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md diff --git a/juniper_graphql_ws/src/lib.rs b/juniper_graphql_ws/src/lib.rs index 957d1a04..97741311 100644 --- a/juniper_graphql_ws/src/lib.rs +++ b/juniper_graphql_ws/src/lib.rs @@ -36,10 +36,23 @@ struct ExecutionParams { /// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit /// message. +#[derive(Clone, Copy, Debug)] pub struct ConnectionConfig { - context: CtxT, - max_in_flight_operations: usize, - keep_alive_interval: Duration, + /// Custom-provided [`juniper::Context`]. + pub context: CtxT, + + /// Maximum number of in-flight operations that a connection can have. + /// + /// If this number is exceeded, attempting to start more will result in an error. + /// By default, there is no limit to in-flight operations. + pub max_in_flight_operations: usize, + + /// Interval at which to send keep-alives. + /// + /// Specifying a [`Duration::ZERO`] will disable keep-alives. + /// + /// By default, keep-alives are sent every 15 seconds. + pub keep_alive_interval: Duration, } impl ConnectionConfig { @@ -52,17 +65,21 @@ impl ConnectionConfig { } } - /// Specifies the maximum number of in-flight operations that a connection can have. If this - /// number is exceeded, attempting to start more will result in an error. By default, there is - /// no limit to in-flight operations. + /// Specifies the maximum number of in-flight operations that a connection can have. + /// + /// If this number is exceeded, attempting to start more will result in an error. + /// By default, there is no limit to in-flight operations. #[must_use] pub fn with_max_in_flight_operations(mut self, max: usize) -> Self { self.max_in_flight_operations = max; self } - /// Specifies the interval at which to send keep-alives. Specifying a zero duration will - /// disable keep-alives. By default, keep-alives are sent every 15 seconds. + /// Specifies the interval at which to send keep-alives. + /// + /// Specifying a [`Duration::ZERO`] will disable keep-alives. + /// + /// By default, keep-alives are sent every 15 seconds. #[must_use] pub fn with_keep_alive_interval(mut self, interval: Duration) -> Self { self.keep_alive_interval = interval; diff --git a/juniper_subscriptions/Cargo.toml b/juniper_subscriptions/Cargo.toml index 5f02c2d6..02239979 100644 --- a/juniper_subscriptions/Cargo.toml +++ b/juniper_subscriptions/Cargo.toml @@ -20,4 +20,4 @@ juniper = { version = "0.16.0-dev", path = "../juniper", default-features = fals [dev-dependencies] serde_json = "1.0.18" -tokio = { version = "1.0", features = ["macros", "rt"] } +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/juniper_subscriptions/README.md b/juniper_subscriptions/README.md index f77245c6..097ffc44 100644 --- a/juniper_subscriptions/README.md +++ b/juniper_subscriptions/README.md @@ -27,7 +27,7 @@ For `SubscriptionCoordinator` and `SubscriptionConnection` documentation, check ## Examples -Check [`examples/warp_subscriptions/`][1] for example code of a working [`warp`] server with [GraphQL] subscription handlers. +Check [`juniper_warp/examples/subscription.rs`][1] for example code of a working [`warp`] server with [GraphQL] subscription handlers. @@ -43,4 +43,4 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql [`warp`]: https://docs.rs/warp [GraphQL]: http://graphql.org -[1]: https://github.com/graphql-rust/juniper/blob/master/examples/warp_subscriptions/src/main.rs +[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs diff --git a/examples/basic_subscriptions/src/main.rs b/juniper_subscriptions/examples/basic.rs similarity index 100% rename from examples/basic_subscriptions/src/main.rs rename to juniper_subscriptions/examples/basic.rs diff --git a/juniper_warp/CHANGELOG.md b/juniper_warp/CHANGELOG.md index 8ee5d26d..c328dfc6 100644 --- a/juniper_warp/CHANGELOG.md +++ b/juniper_warp/CHANGELOG.md @@ -12,12 +12,19 @@ All user visible changes to `juniper_warp` crate will be documented in this file - Switched to 0.16 version of [`juniper` crate]. +### Added + +- `subscriptions::serve_graphql_transport_ws()` function allowing to process the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws]. ([#1158]) +- `subscriptions::make_ws_filter()` function providing endpoint with auto-selection between the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws] and the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws], based on the `Sec-Websocket-Protocol` HTTP header value. ([#1191]) + ### Changed - Made `schema` argument of `make_graphql_filter()` and `make_graphql_filter_sync()` polymorphic, allowing to specify external `Arc`ed `schema`. ([#1136], [#1135]) [#1135]: /../../issues/1136 [#1136]: /../../pull/1136 +[#1158]: /../../pull/1158 +[#1191]: /../../pull/1191 @@ -31,3 +38,5 @@ See [old CHANGELOG](/../../blob/juniper_warp-v0.7.0/juniper_warp/CHANGELOG.md). [`juniper` crate]: https://docs.rs/juniper [Semantic Versioning 2.0.0]: https://semver.org +[graphql-transport-ws]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +[graphql-ws]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md diff --git a/juniper_warp/Cargo.toml b/juniper_warp/Cargo.toml index a4620dbe..95e9c0b3 100644 --- a/juniper_warp/Cargo.toml +++ b/juniper_warp/Cargo.toml @@ -12,14 +12,19 @@ repository = "https://github.com/graphql-rust/juniper" readme = "README.md" categories = ["asynchronous", "web-programming", "web-programming::http-server"] keywords = ["apollo", "graphql", "juniper", "warp", "websocket"] -exclude = ["/examples/", "/release.toml"] +include = ["/examples/", "/src/", "/CHANGELOG.md", "/LICENSE", "/README.md"] [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [features] -subscriptions = ["dep:juniper_graphql_ws", "dep:juniper_graphql_transport_ws", "warp/websocket"] +subscriptions = [ + "dep:juniper_graphql_transport_ws", + "dep:juniper_graphql_ws", + "dep:log", + "warp/websocket", +] [dependencies] anyhow = "1.0.47" @@ -27,6 +32,7 @@ futures = "0.3.22" juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true } juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true } +log = { version = "0.4", optional = true } serde = { version = "1.0.122", features = ["derive"] } serde_json = "1.0.18" thiserror = "1.0" @@ -38,9 +44,14 @@ warp = { version = "0.3.2", default-features = false } headers = "0.3.8" [dev-dependencies] +async-stream = "0.3" env_logger = "0.10" juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] } log = "0.4" percent-encoding = "2.1" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } url = "2.0" + +[[example]] +name = "subscription" +required-features = ["subscriptions"] diff --git a/juniper_warp/README.md b/juniper_warp/README.md index 5679e19a..f98d4121 100644 --- a/juniper_warp/README.md +++ b/juniper_warp/README.md @@ -24,7 +24,7 @@ A basic usage example can also be found in the [API docs][`juniper_warp`]. ## Examples -Check [`examples/warp_server.rs`][1] for example code of a working [`warp`] server with [GraphQL] handlers. +Check [`examples/subscription.rs`][1] for example code of a working [`warp`] server with [GraphQL] handlers and subscriptions. @@ -43,4 +43,4 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql [Juniper Book]: https://graphql-rust.github.io [Rust]: https://www.rust-lang.org -[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/warp_server.rs +[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs diff --git a/examples/warp_subscriptions/src/main.rs b/juniper_warp/examples/subscription.rs similarity index 57% rename from examples/warp_subscriptions/src/main.rs rename to juniper_warp/examples/subscription.rs index 1f6800a6..f01cdde2 100644 --- a/examples/warp_subscriptions/src/main.rs +++ b/juniper_warp/examples/subscription.rs @@ -1,18 +1,13 @@ -//! This example demonstrates asynchronous subscriptions with warp and tokio 0.2 +//! This example demonstrates asynchronous subscriptions with [`warp`]. use std::{env, pin::Pin, sync::Arc, time::Duration}; -use futures::{FutureExt as _, Stream}; +use futures::Stream; use juniper::{ graphql_object, graphql_subscription, graphql_value, EmptyMutation, FieldError, GraphQLEnum, RootNode, }; use juniper_graphql_transport_ws::ConnectionConfig; -use juniper_graphql_ws::ConnectionConfig as LegacyConnectionConfig; -use juniper_warp::{ - graphiql_filter, playground_filter, - subscriptions::{serve_graphql_transport_ws, serve_graphql_ws}, -}; use warp::{http::Response, Filter}; #[derive(Clone)] @@ -145,73 +140,49 @@ fn schema() -> Schema { #[tokio::main] async fn main() { - env::set_var("RUST_LOG", "warp_subscriptions"); + env::set_var("RUST_LOG", "subscription"); env_logger::init(); - let log = warp::log("warp_subscriptions"); - + let log = warp::log("subscription"); let homepage = warp::path::end().map(|| { Response::builder() .header("content-type", "text/html") - .body("

juniper_subscriptions demo

visit graphql playground") + .body( + "

juniper_warp/subscription example

\ +
visit GraphiQL
\ + \ + ", + ) }); + let schema = Arc::new(schema()); - let qm_schema = schema(); - let qm_state = warp::any().map(|| Context); - let qm_graphql_filter = juniper_warp::make_graphql_filter(qm_schema, qm_state.boxed()); - - let ws_schema = Arc::new(schema()); - let transport_ws_schema = ws_schema.clone(); + let routes = (warp::post() + .and(warp::path("graphql")) + .and(juniper_warp::make_graphql_filter( + schema.clone(), + warp::any().map(|| Context).boxed(), + ))) + .or( + warp::path("subscriptions").and(juniper_warp::subscriptions::make_ws_filter( + schema, + ConnectionConfig::new(Context), + )), + ) + .or(warp::get() + .and(warp::path("playground")) + .and(juniper_warp::playground_filter( + "/graphql", + Some("/subscriptions"), + ))) + .or(warp::get() + .and(warp::path("graphiql")) + .and(juniper_warp::graphiql_filter( + "/graphql", + Some("/subscriptions"), + ))) + .or(homepage) + .with(log); log::info!("Listening on 127.0.0.1:8080"); - - let routes = warp::path("subscriptions") - .and(warp::ws()) - .map(move |ws: warp::ws::Ws| { - let transport_ws_schema = transport_ws_schema.clone(); - ws.on_upgrade(move |websocket| async move { - serve_graphql_transport_ws( - websocket, - transport_ws_schema, - ConnectionConfig::new(Context), - ) - .map(|r| { - if let Err(e) = r { - println!("Websocket error: {e}"); - } - }) - .await - }) - }) - .or(warp::path("legacy-subscriptions") - .and(warp::ws()) - .map(move |ws: warp::ws::Ws| { - let ws_schema = ws_schema.clone(); - ws.on_upgrade(move |websocket| async move { - serve_graphql_ws(websocket, ws_schema, LegacyConnectionConfig::new(Context)) - .map(|r| { - if let Err(e) = r { - println!("Websocket error: {e}"); - } - }) - .await - }) - }) - .map(|reply| { - // TODO#584: remove this workaround - warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws") - })) - .or(warp::post() - .and(warp::path("graphql")) - .and(qm_graphql_filter)) - .or(warp::get() - .and(warp::path("playground")) - .and(playground_filter("/graphql", Some("/legacy-subscriptions")))) - .or(warp::get() - .and(warp::path("graphiql")) - .and(graphiql_filter("/graphql", Some("/subscriptions")))) - .or(homepage) - .with(log); - warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; } diff --git a/juniper_warp/examples/warp_server.rs b/juniper_warp/examples/warp_server.rs deleted file mode 100644 index 97d3d1dd..00000000 --- a/juniper_warp/examples/warp_server.rs +++ /dev/null @@ -1,51 +0,0 @@ -#![deny(warnings)] - -use std::env; - -use juniper::{ - tests::fixtures::starwars::schema::{Database, Query}, - EmptyMutation, EmptySubscription, RootNode, -}; -use warp::{http::Response, Filter}; - -type Schema = RootNode<'static, Query, EmptyMutation, EmptySubscription>; - -fn schema() -> Schema { - Schema::new( - Query, - EmptyMutation::::new(), - EmptySubscription::::new(), - ) -} - -#[tokio::main] -async fn main() { - env::set_var("RUST_LOG", "warp_server"); - env_logger::init(); - - let log = warp::log("warp_server"); - - let homepage = warp::path::end().map(|| { - Response::builder() - .header("content-type", "text/html") - .body(format!( - "

juniper_warp

visit /graphiql" - )) - }); - - log::info!("Listening on 127.0.0.1:8080"); - - let state = warp::any().map(Database::new); - let graphql_filter = juniper_warp::make_graphql_filter(schema(), state.boxed()); - - warp::serve( - warp::get() - .and(warp::path("graphiql")) - .and(juniper_warp::graphiql_filter("/graphql", None)) - .or(homepage) - .or(warp::path("graphql").and(graphql_filter)) - .with(log), - ) - .run(([127, 0, 0, 1], 8080)) - .await -} diff --git a/juniper_warp/src/lib.rs b/juniper_warp/src/lib.rs index 48adecc1..c95805a4 100644 --- a/juniper_warp/src/lib.rs +++ b/juniper_warp/src/lib.rs @@ -14,17 +14,17 @@ use juniper::{ use tokio::task; use warp::{body, filters::BoxedFilter, http, hyper::body::Bytes, query, Filter}; -/// Make a filter for graphql queries/mutations. +/// Makes a filter for GraphQL queries/mutations. /// -/// The `schema` argument is your juniper schema. +/// The `schema` argument is your [`juniper`] schema. /// /// The `context_extractor` argument should be a filter that provides the GraphQL context required by the schema. /// /// In order to avoid blocking, this helper will use the `tokio_threadpool` threadpool created by hyper to resolve GraphQL requests. /// -/// Example: +/// # Example /// -/// ``` +/// ```rust /// # use std::sync::Arc; /// # use warp::Filter; /// # use juniper::{graphql_object, EmptyMutation, EmptySubscription, RootNode}; @@ -336,14 +336,8 @@ fn playground_response( .expect("response is valid") } -/// `juniper_warp` subscriptions handler implementation. -/// Cannot be merged to `juniper_warp` yet as GraphQL over WS[1] -/// is not fully supported in current implementation. -/// -/// *Note: this implementation is in an alpha state.* -/// -/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md #[cfg(feature = "subscriptions")] +/// `juniper_warp` subscriptions handler implementation. pub mod subscriptions { use std::{convert::Infallible, fmt, sync::Arc}; @@ -357,6 +351,7 @@ pub mod subscriptions { }; use juniper_graphql_transport_ws; use juniper_graphql_ws; + use warp::{filters::BoxedFilter, reply::Reply, Filter as _}; struct Message(warp::ws::Message); @@ -364,15 +359,23 @@ pub mod subscriptions { type Error = serde_json::Error; fn try_from(msg: Message) -> serde_json::Result { - serde_json::from_slice(msg.0.as_bytes()) + if msg.0.is_close() { + Ok(Self::ConnectionTerminate) + } else { + serde_json::from_slice(msg.0.as_bytes()) + } } } - impl TryFrom for juniper_graphql_transport_ws::ClientMessage { + impl TryFrom for juniper_graphql_transport_ws::Input { type Error = serde_json::Error; fn try_from(msg: Message) -> serde_json::Result { - serde_json::from_slice(msg.0.as_bytes()) + if msg.0.is_close() { + Ok(Self::Close) + } else { + serde_json::from_slice(msg.0.as_bytes()).map(Self::Message) + } } } @@ -410,16 +413,173 @@ pub mod subscriptions { } } - /// Serves the graphql-ws protocol over a WebSocket connection. + /// Makes a filter for GraphQL subscriptions. + /// + /// This filter auto-selects between the + /// [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] and the + /// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], based on the + /// `Sec-Websocket-Protocol` HTTP header value. + /// + /// The `schema` argument is your [`juniper`] schema. + /// + /// The `init` argument is used to provide the custom [`juniper::Context`] and additional + /// configuration for connections. This can be a + /// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are + /// already known, or it can be a closure that gets executed asynchronously whenever a client + /// sends the subscription initialization message. Using a closure allows to perform an + /// authentication based on the parameters provided by a client. + /// + /// # Example + /// + /// ```rust + /// # use std::{convert::Infallible, pin::Pin, sync::Arc, time::Duration}; + /// # + /// # use futures::Stream; + /// # use juniper::{graphql_object, graphql_subscription, EmptyMutation, RootNode}; + /// # use juniper_graphql_transport_ws::ConnectionConfig; + /// # use juniper_warp::make_graphql_filter; + /// # use warp::Filter as _; + /// # + /// type UserId = String; + /// # #[derive(Debug)] + /// struct AppState(Vec); + /// #[derive(Clone)] + /// struct ExampleContext(Arc, UserId); + /// # impl juniper::Context for ExampleContext {} + /// + /// struct QueryRoot; + /// + /// #[graphql_object(context = ExampleContext)] + /// impl QueryRoot { + /// fn say_hello(context: &ExampleContext) -> String { + /// format!( + /// "good morning {}, the app state is {:?}", + /// context.1, + /// context.0, + /// ) + /// } + /// } + /// + /// type StringsStream = Pin + Send>>; + /// + /// struct SubscriptionRoot; + /// + /// #[graphql_subscription(context = ExampleContext)] + /// impl SubscriptionRoot { + /// async fn say_hellos(context: &ExampleContext) -> StringsStream { + /// let mut interval = tokio::time::interval(Duration::from_secs(1)); + /// let context = context.clone(); + /// Box::pin(async_stream::stream! { + /// let mut counter = 0; + /// while counter < 5 { + /// counter += 1; + /// interval.tick().await; + /// yield format!( + /// "{counter}: good morning {}, the app state is {:?}", + /// context.1, + /// context.0, + /// ) + /// } + /// }) + /// } + /// } + /// + /// let schema = Arc::new(RootNode::new(QueryRoot, EmptyMutation::new(), SubscriptionRoot)); + /// let app_state = Arc::new(AppState(vec![3, 4, 5])); + /// let app_state_for_ws = app_state.clone(); + /// + /// let context_extractor = warp::any() + /// .and(warp::header::("authorization")) + /// .and(warp::any().map(move || app_state.clone())) + /// .map(|auth_header: String, app_state: Arc| { + /// let user_id = auth_header; // we believe them + /// ExampleContext(app_state, user_id) + /// }) + /// .boxed(); + /// + /// let graphql_endpoint = (warp::path("graphql") + /// .and(warp::post()) + /// .and(make_graphql_filter(schema.clone(), context_extractor))) + /// .or(warp::path("subscriptions") + /// .and(juniper_warp::subscriptions::make_ws_filter( + /// schema, + /// move |variables: juniper::Variables| { + /// let user_id = variables + /// .get("authorization") + /// .map(ToString::to_string) + /// .unwrap_or_default(); // we believe them + /// async move { + /// Ok::<_, Infallible>(ConnectionConfig::new( + /// ExampleContext(app_state_for_ws.clone(), user_id), + /// )) + /// } + /// }, + /// ))); + /// ``` + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + /// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md + pub fn make_ws_filter( + schema: impl Into>>, + init: I, + ) -> BoxedFilter<(impl Reply,)> + where + Query: GraphQLTypeAsync + Send + 'static, + Query::TypeInfo: Send + Sync, + Mutation: GraphQLTypeAsync + Send + 'static, + Mutation::TypeInfo: Send + Sync, + Subscription: GraphQLSubscriptionType + Send + 'static, + Subscription::TypeInfo: Send + Sync, + CtxT: Unpin + Send + Sync + 'static, + S: ScalarValue + Send + Sync + 'static, + I: juniper_graphql_transport_ws::Init + Clone + Send + Sync, + { + let schema = schema.into(); + + warp::ws() + .and(warp::filters::header::value("sec-websocket-protocol")) + .map(move |ws: warp::ws::Ws, subproto| { + let schema = schema.clone(); + let init = init.clone(); + + let is_legacy = subproto == "graphql-ws"; + + warp::reply::with_header( + ws.on_upgrade(move |ws| async move { + if is_legacy { + serve_graphql_ws(ws, schema, init).await + } else { + serve_graphql_transport_ws(ws, schema, init).await + } + .unwrap_or_else(|e| { + log::error!("GraphQL over WebSocket Protocol error: {e}"); + }) + }), + "sec-websocket-protocol", + if is_legacy { + "graphql-ws" + } else { + "graphql-transport-ws" + }, + ) + }) + .boxed() + } + + /// Serves the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old]. /// /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and + /// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and /// configuration are already known, or it can be a closure that gets executed asynchronously - /// when the client sends the ConnectionInit message. Using a closure allows you to perform - /// authentication based on the parameters provided by the client. + /// when the client sends the `GQL_CONNECTION_INIT` message. Using a closure allows to perform + /// an authentication based on the parameters provided by a client. /// - /// This protocol has been deprecated in favor of the `graphql-transport-ws` protocol, which is - /// provided by the `serve_graphql_transport_ws` function. + /// > __WARNING__: This protocol has been deprecated in favor of the + /// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], which is + /// provided by the [`serve_graphql_transport_ws()`] function. + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md + /// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md pub async fn serve_graphql_ws( websocket: warp::ws::WebSocket, root_node: Arc>, @@ -459,13 +619,15 @@ pub mod subscriptions { } } - /// Serves the graphql-transport-ws protocol over a WebSocket connection. + /// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new]. /// /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a `juniper_graphql_transport_ws::ConnectionConfig` if the context and - /// configuration are already known, or it can be a closure that gets executed asynchronously - /// when the client sends the ConnectionInit message. Using a closure allows you to perform - /// authentication based on the parameters provided by the client. + /// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context + /// and configuration are already known, or it can be a closure that gets executed + /// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to + /// perform an authentication based on the parameters provided by a client. + /// + /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md pub async fn serve_graphql_transport_ws( websocket: warp::ws::WebSocket, root_node: Arc>,