Improve WebSocket integration (#1191, #1022)

- consider `juniper_graphql_transport_ws` crate on CI
- implement auto-selection of protocol in `juniper_warp` crate
- support `graphql-transport-ws` protocol in  `juniper_actix` crate
- implement auto-selection of protocol in `juniper_actix` crate

Additionally:
- move `examples/warp_subscriptions` into `juniper_warp/examples/subscription.rs`
- move `examples/actix_subscriptions` into `juniper_actix/examples/subscription.rs`
- move `examples/basic_subscriptions` into `juniper_subscriptions/examples/basic.rs`
- bump up MSRV of `juniper_actix` crate to 1.68
This commit is contained in:
Kai Ren 2023-09-21 23:24:41 +02:00 committed by GitHub
parent a74ea9ccb6
commit 9849736582
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 622 additions and 944 deletions

View file

@ -25,7 +25,6 @@ jobs:
needs:
- bench
- clippy
- example
- feature
- msrv
- release-check
@ -84,32 +83,6 @@ jobs:
- run: cargo clippy -p juniper_benchmarks --benches -- -D warnings
- run: cargo bench -p juniper_benchmarks
example:
strategy:
fail-fast: false
matrix:
example:
- actix_subscriptions
- basic_subscriptions
- warp_async
- warp_subscriptions
os:
- ubuntu
- macOS
- windows
toolchain:
- stable
- beta
- nightly
runs-on: ${{ matrix.os }}-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@v1
with:
toolchain: ${{ matrix.toolchain }}
- run: cargo check -p example_${{ matrix.example }}
feature:
strategy:
fail-fast: false
@ -161,8 +134,9 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
- juniper_actix
#- juniper_actix
- juniper_hyper
#- juniper_iron
- juniper_rocket
@ -171,6 +145,10 @@ jobs:
- ubuntu
- macOS
- windows
include:
- { msrv: "1.68.0", crate: "juniper_actix", os: "ubuntu" }
- { msrv: "1.68.0", crate: "juniper_actix", os: "macOS" }
- { msrv: "1.68.0", crate: "juniper_actix", os: "windows" }
runs-on: ${{ matrix.os }}-latest
steps:
- uses: actions/checkout@v4
@ -184,9 +162,6 @@ jobs:
- run: cargo +nightly update -Z minimal-versions
- run: make test.cargo crate=${{ matrix.crate }}
if: ${{ !contains(fromJSON('["juniper_actix"]'), matrix.crate) }}
- run: cargo check -p ${{ matrix.crate }} --all-features
if: ${{ contains(fromJSON('["juniper_actix"]'), matrix.crate) }}
package:
if: ${{ startsWith(github.ref, 'refs/tags/juniper') }}
@ -214,6 +189,7 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
- juniper_integration_tests
- juniper_codegen_tests
@ -342,6 +318,7 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
- juniper_actix
- juniper_hyper
@ -366,7 +343,6 @@ jobs:
needs:
- bench
- clippy
- example
- feature
- msrv
- package

View file

@ -2,18 +2,14 @@
resolver = "1" # unifying Cargo features asap for Book tests
members = [
"benches",
"examples/basic_subscriptions",
"examples/warp_async",
"examples/warp_subscriptions",
"examples/actix_subscriptions",
"juniper_codegen",
"juniper",
"juniper_hyper",
"juniper_iron",
"juniper_rocket",
"juniper_subscriptions",
"juniper_graphql_ws",
"juniper_graphql_transport_ws",
"juniper_graphql_ws",
"juniper_warp",
"juniper_actix",
"tests/codegen",

View file

@ -154,8 +154,8 @@ async fn run_subscription() {
Currently there is an example of subscriptions with [warp][warp], but it still in an alpha state.
GraphQL over [WS][WS] is not fully supported yet and is non-standard.
- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/examples/warp_subscriptions)
- [Small Example](https://github.com/graphql-rust/juniper/tree/master/examples/basic_subscriptions)
- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/juniper_warp/examples/subscription.rs)
- [Small Example](https://github.com/graphql-rust/juniper/tree/master/juniper_subscriptions/examples/basic.rs)

View file

@ -14,10 +14,11 @@ juniper = "0.16.0"
juniper_warp = "0.8.0"
```
Included in the source is a [small example][example] which sets up a basic GraphQL and [GraphiQL] handler.
Included in the source is a [small example][example] which sets up a basic GraphQL and [GraphiQL]/[GraphQL Playground] handlers with subscriptions support.
[graphiql]: https://github.com/graphql/graphiql
[GraphiQL]: https://github.com/graphql/graphiql
[GraphQL Playground]: https://github.com/prisma/graphql-playground
[hyper]: https://hyper.rs/
[warp]: https://crates.io/crates/warp
[juniper_warp]: https://github.com/graphql-rust/juniper/tree/master/juniper_warp
[example]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/warp_server.rs
[example]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs

View file

@ -1,56 +0,0 @@
# Juniper Examples
This directory contains examples of how to use Juniper.
## How to run
To run an example, you need to have a working Rust toolchain installed. You can
get it from [rustup](https://rustup.rs/).
Then, you can run the example using its workspace:
```bash
cargo run --example <example_name>
```
Where `<example_name>` is one of the following workspace members:
```
actix_server
hyper_server
iron_server
rocket_server
warp_server
```
e.g. to run the `actix_server` example:
```bash
cargo run --example actix_server
```
You can also run an example directly from an `examples` workspace directory. To
run the `actix_server` example:
```bash
cd examples/actix_subscriptions
cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.13s
Running `/path/to/repo/juniper/target/debug/example_actix_subscriptions`
[2022-11-20T07:46:08Z INFO actix_server::builder] Starting 10 workers
[2022-11-20T07:46:08Z INFO actix_server::server] Actix runtime found; starting in Actix runtime
```
Note if you want to run the code within your own project, you need to change
the relative paths in `Cargo.toml`, e.g:
```toml
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
```
to:
```toml
juniper_graphql_ws = "0.3.0"
```

View file

@ -1,21 +0,0 @@
[package]
name = "example_actix_subscriptions"
version = "0.0.0"
edition = "2021"
rust-version = "1.65"
authors = ["Mihai Dinculescu <mihai.dinculescu@outlook.com>"]
publish = false
[dependencies]
actix-cors = "0.6"
actix-web = "4.0"
async-stream = "0.3"
env_logger = "0.10"
futures = "0.3"
juniper = { path = "../../juniper", features = ["expose-test-schema"] }
juniper_actix = { path = "../../juniper_actix", features = ["subscriptions"] }
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
rand = "0.8"
serde = "1.0"
serde_json = "1.0"
tokio = "1.0"

View file

@ -1,15 +0,0 @@
[package]
name = "example_basic_subscriptions"
version = "0.0.0"
edition = "2021"
rust-version = "1.65"
authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
publish = false
[dependencies]
futures = "0.3"
juniper = { path = "../../juniper" }
juniper_subscriptions = { path = "../../juniper_subscriptions" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

View file

@ -1,17 +0,0 @@
[package]
name = "example_warp_async"
version = "0.0.0"
edition = "2021"
rust-version = "1.65"
authors = ["Christoph Herzog <chris@theduke.at>"]
publish = false
[dependencies]
env_logger = "0.10"
futures = "0.3"
juniper = { path = "../../juniper" }
juniper_warp = { path = "../../juniper_warp" }
log = "0.4"
reqwest = { version = "0.11", features = ["rustls-tls"], default-features = false }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
warp = "0.3"

View file

@ -1,104 +0,0 @@
//! This example demonstrates async/await usage with warp.
use juniper::{
graphql_object, EmptyMutation, EmptySubscription, FieldError, GraphQLEnum, RootNode,
};
use warp::{http::Response, Filter};
#[derive(Clone, Copy, Debug)]
struct Context;
impl juniper::Context for Context {}
#[derive(Clone, Copy, Debug, GraphQLEnum)]
enum UserKind {
Admin,
User,
Guest,
}
#[derive(Clone, Debug)]
struct User {
id: i32,
kind: UserKind,
name: String,
}
#[graphql_object(context = Context)]
impl User {
fn id(&self) -> i32 {
self.id
}
fn kind(&self) -> UserKind {
self.kind
}
fn name(&self) -> &str {
&self.name
}
async fn friends(&self) -> Vec<User> {
vec![]
}
}
#[derive(Clone, Copy, Debug)]
struct Query;
#[graphql_object(context = Context)]
impl Query {
async fn users() -> Vec<User> {
vec![User {
id: 1,
kind: UserKind::Admin,
name: "user1".into(),
}]
}
/// Fetch a URL and return the response body text.
async fn request(url: String) -> Result<String, FieldError> {
Ok(reqwest::get(&url).await?.text().await?)
}
}
type Schema = RootNode<'static, Query, EmptyMutation<Context>, EmptySubscription<Context>>;
fn schema() -> Schema {
Schema::new(
Query,
EmptyMutation::<Context>::new(),
EmptySubscription::<Context>::new(),
)
}
#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "warp_async");
env_logger::init();
let log = warp::log("warp_server");
let homepage = warp::path::end().map(|| {
Response::builder()
.header("content-type", "text/html")
.body(
"<html><h1>juniper_warp</h1><div>visit <a href=\"/graphiql\">/graphiql</a></html>",
)
});
log::info!("Listening on 127.0.0.1:8080");
let state = warp::any().map(|| Context);
let graphql_filter = juniper_warp::make_graphql_filter(schema(), state.boxed());
warp::serve(
warp::get()
.and(warp::path("graphiql"))
.and(juniper_warp::graphiql_filter("/graphql", None))
.or(homepage)
.or(warp::path("graphql").and(graphql_filter))
.with(log),
)
.run(([127, 0, 0, 1], 8080))
.await
}

View file

@ -1,20 +0,0 @@
[package]
name = "example_warp_subscriptions"
version = "0.0.0"
edition = "2021"
rust-version = "1.65"
publish = false
[dependencies]
async-stream = "0.3"
env_logger = "0.10"
futures = "0.3"
juniper = { path = "../../juniper" }
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
juniper_graphql_transport_ws = { path = "../../juniper_graphql_transport_ws" }
juniper_warp = { path = "../../juniper_warp", features = ["subscriptions"] }
log = "0.4.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
warp = "0.3"

View file

@ -51,7 +51,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
- Disabled `chrono` [Cargo feature] by default.
- Removed `scalar-naivetime` [Cargo feature].
- Removed lifetime parameter from `ParseError`, `GraphlQLError`, `GraphQLBatchRequest` and `GraphQLRequest`. ([#1081], [#528])
- Upgraded [GraphiQL] to 3.0.6 version (requires new [`graphql-ws` GraphQL over WebSocket Protocol] integration on server, see `examples/warp_subscriptions`). ([#1188], [#1193])
- Upgraded [GraphiQL] to 3.0.6 version (requires new [`graphql-transport-ws` GraphQL over WebSocket Protocol] integration on server, see `juniper_warp/examples/subscription.rs`). ([#1188], [#1193])
### Added
@ -145,8 +145,8 @@ See [old CHANGELOG](/../../blob/juniper-v0.15.9/juniper/CHANGELOG.md).
[`chrono-tz` crate]: https://docs.rs/chrono-tz
[`time` crate]: https://docs.rs/time
[Cargo feature]: https://doc.rust-lang.org/cargo/reference/features.html
[`graphql-ws` GraphQL over WebSocket Protocol]: https://github.com/graphql/graphiql
[GraphiQL]: https://github.com/enisdenjo/graphql-ws/master/PROTOCOL.md
[`graphql-transport-ws` GraphQL over WebSocket Protocol]: https://github.com/enisdenjo/graphql-ws/v5.14.0/PROTOCOL.md
[GraphiQL]: https://github.com/graphql/graphiql
[GraphQL Playground]: https://github.com/prisma/graphql-playground
[graphql-scalars.dev]: https://graphql-scalars.dev
[October 2021]: https://spec.graphql.org/October2021

View file

@ -13,6 +13,12 @@ All user visible changes to `juniper_actix` crate will be documented in this fil
- Switched to 4.0 version of [`actix-web` crate] and its ecosystem. ([#1034])
- Switched to 0.16 version of [`juniper` crate].
- Switched to 0.4 version of [`juniper_graphql_ws` crate].
- Renamed `subscriptions::subscriptions_handler()` as `subscriptions::graphql_ws_handler()` for processing the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws]. ([#1191])
### Added
- `subscriptions::graphql_transport_ws_handler()` allowing to process the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws]. ([#1191])
- `subscriptions::ws_handler()` with auto-selection between the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws] and the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws], based on the `Sec-Websocket-Protocol` HTTP header value. ([#1191])
### Fixed
@ -21,6 +27,7 @@ All user visible changes to `juniper_actix` crate will be documented in this fil
[#1034]: /../../pull/1034
[#1169]: /../../issues/1169
[#1187]: /../../pull/1187
[#1191]: /../../pull/1191
@ -36,3 +43,5 @@ See [old CHANGELOG](/../../blob/juniper_actix-v0.4.0/juniper_actix/CHANGELOG.md)
[`juniper` crate]: https://docs.rs/juniper
[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws
[Semantic Versioning 2.0.0]: https://semver.org
[graphql-transport-ws]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[graphql-ws]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md

View file

@ -2,7 +2,7 @@
name = "juniper_actix"
version = "0.5.0-dev"
edition = "2021"
rust-version = "1.65"
rust-version = "1.68"
description = "`juniper` GraphQL integration with `actix-web`."
license = "BSD-2-Clause"
authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
@ -22,6 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"]
subscriptions = [
"dep:actix",
"dep:actix-web-actors",
"dep:juniper_graphql_transport_ws",
"dep:juniper_graphql_ws",
"dep:tokio",
]
@ -29,11 +30,12 @@ subscriptions = [
[dependencies]
actix = { version = ">=0.12,<=0.13", optional = true }
actix-http = "3.2"
actix-web = "4.2.1"
actix-web = "4.4"
actix-web-actors = { version = "4.1", optional = true }
anyhow = "1.0.47"
futures = "0.3.22"
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true }
http = "0.2.4"
serde = { version = "1.0.122", features = ["derive"] }
@ -41,10 +43,6 @@ serde_json = "1.0.18"
thiserror = "1.0"
tokio = { version = "1.0", features = ["sync"], optional = true }
# Fixes for MSRV check.
# TODO: Try remove on upgrade to 4.3 version of `actix-web` crate.
derive_more = { version = "0.99.8", default-features = false }
[dev-dependencies]
actix-cors = "0.6"
actix-identity = "0.6"
@ -55,4 +53,9 @@ bytes = "1.0"
env_logger = "0.10"
juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] }
log = "0.4"
rand = "0.8"
tokio = "1.0"
[[example]]
name = "subscription"
required-features = ["subscriptions"]

View file

@ -4,7 +4,7 @@
[![Crates.io](https://img.shields.io/crates/v/juniper_actix.svg?maxAge=2592000)](https://crates.io/crates/juniper_actix)
[![Documentation](https://docs.rs/juniper_actix/badge.svg)](https://docs.rs/juniper_actix)
[![CI](https://github.com/graphql-rust/juniper/workflows/CI/badge.svg?branch=master "CI")](https://github.com/graphql-rust/juniper/actions?query=workflow%3ACI+branch%3Amaster)
[![Rust 1.65+](https://img.shields.io/badge/rustc-1.65+-lightgray.svg "Rust 1.65+")](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html)
[![Rust 1.68+](https://img.shields.io/badge/rustc-1.68+-lightgray.svg "Rust 1.68+")](https://blog.rust-lang.org/2023/03/09/Rust-1.68.0.html)
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_actix/CHANGELOG.md)

View file

@ -1,141 +0,0 @@
#![deny(warnings)]
use std::{collections::HashMap, env};
use actix_cors::Cors;
use actix_web::{
http::header,
middleware,
web::{self, Data},
App, Error, HttpResponse, HttpServer,
};
use juniper::{graphql_object, EmptyMutation, EmptySubscription, GraphQLObject, RootNode};
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler};
#[derive(Clone, GraphQLObject)]
///a user
pub struct User {
///the id
id: i32,
///the name
name: String,
}
#[derive(Clone, Default)]
pub struct Database {
///this could be a database connection
users: HashMap<i32, User>,
}
impl Database {
pub fn new() -> Database {
let mut users = HashMap::new();
users.insert(
1,
User {
id: 1,
name: "Aron".into(),
},
);
users.insert(
2,
User {
id: 2,
name: "Bea".into(),
},
);
users.insert(
3,
User {
id: 3,
name: "Carl".into(),
},
);
users.insert(
4,
User {
id: 4,
name: "Dora".into(),
},
);
Database { users }
}
pub fn get_user(&self, id: &i32) -> Option<&User> {
self.users.get(id)
}
}
// To make our Database usable by Juniper, we have to implement a marker trait.
impl juniper::Context for Database {}
// Queries represent the callable funcitons
struct Query;
#[graphql_object(context = Database)]
impl Query {
fn api_version() -> &'static str {
"1.0"
}
fn user(
context: &Database,
#[graphql(description = "id of the user")] id: i32,
) -> Option<&User> {
context.get_user(&id)
}
}
type Schema = RootNode<'static, Query, EmptyMutation<Database>, EmptySubscription<Database>>;
fn schema() -> Schema {
Schema::new(
Query,
EmptyMutation::<Database>::new(),
EmptySubscription::<Database>::new(),
)
}
async fn graphiql_route() -> Result<HttpResponse, Error> {
graphiql_handler("/graphql", None).await
}
async fn playground_route() -> Result<HttpResponse, Error> {
playground_handler("/graphql", None).await
}
async fn graphql_route(
req: actix_web::HttpRequest,
payload: actix_web::web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
graphql_handler(&schema, &context, req, payload).await
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
let server = HttpServer::new(move || {
App::new()
.app_data(Data::new(schema()))
.wrap(
Cors::default()
.allow_any_origin()
.allowed_methods(vec!["POST", "GET"])
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT])
.allowed_header(header::CONTENT_TYPE)
.supports_credentials()
.max_age(3600),
)
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::default())
.service(
web::resource("/graphql")
.route(web::post().to(graphql_route))
.route(web::get().to(graphql_route)),
)
.service(web::resource("/playground").route(web::get().to(playground_route)))
.service(web::resource("/graphiql").route(web::get().to(graphiql_route)))
});
server.bind("127.0.0.1:8080").unwrap().run().await
}
// now go to http://127.0.0.1:8080/playground or graphiql and execute
//{ apiVersion, user(id: 2){id, name}}

View file

@ -1,3 +1,5 @@
//! This example demonstrates asynchronous subscriptions with [`actix_web`].
use std::{env, pin::Pin, time::Duration};
use actix_cors::Cors;
@ -5,7 +7,7 @@ use actix_web::{
http::header,
middleware,
web::{self, Data},
App, Error, HttpRequest, HttpResponse, HttpServer,
App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use juniper::{
@ -13,7 +15,7 @@ use juniper::{
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, FieldError, GraphQLObject, RootNode,
};
use juniper_actix::{graphql_handler, playground_handler, subscriptions::subscriptions_handler};
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler, subscriptions};
use juniper_graphql_ws::ConnectionConfig;
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
@ -26,15 +28,45 @@ async fn playground() -> Result<HttpResponse, Error> {
playground_handler("/graphql", Some("/subscriptions")).await
}
async fn graphiql() -> Result<HttpResponse, Error> {
graphiql_handler("/graphql", Some("/subscriptions")).await
}
async fn graphql(
req: actix_web::HttpRequest,
payload: actix_web::web::Payload,
schema: web::Data<Schema>,
req: HttpRequest,
payload: web::Payload,
schema: Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
graphql_handler(&schema, &context, req, payload).await
}
async fn homepage() -> impl Responder {
HttpResponse::Ok()
.insert_header(("content-type", "text/html"))
.message_body(
"<html><h1>juniper_actix/subscription example</h1>\
<div>visit <a href=\"/graphiql\">GraphiQL</a></div>\
<div>visit <a href=\"/playground\">GraphQL Playground</a></div>\
</html>",
)
}
async fn subscriptions(
req: HttpRequest,
stream: web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
// set the keep alive interval to 15 secs so that it doesn't timeout in playground
// playground has a hard-coded timeout set to 20 secs
let config = config.with_keep_alive_interval(Duration::from_secs(15));
subscriptions::ws_handler(req, stream, schema, config).await
}
struct Subscription;
#[derive(GraphQLObject)]
@ -49,7 +81,8 @@ type RandomHumanStream =
#[graphql_subscription(context = Database)]
impl Subscription {
#[graphql(
description = "A random humanoid creature in the Star Wars universe every 3 seconds. Second result will be an error."
description = "A random humanoid creature in the Star Wars universe every 3 seconds. \
Second result will be an error."
)]
async fn random_human(context: &Database) -> RandomHumanStream {
let mut counter = 0;
@ -84,21 +117,6 @@ impl Subscription {
}
}
async fn subscriptions(
req: HttpRequest,
stream: web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
// set the keep alive interval to 15 secs so that it doesn't timeout in playground
// playground has a hard-coded timeout set to 20 secs
let config = config.with_keep_alive_interval(Duration::from_secs(15));
subscriptions_handler(req, stream, schema, config).await
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env::set_var("RUST_LOG", "info");
@ -125,11 +143,8 @@ async fn main() -> std::io::Result<()> {
.route(web::get().to(graphql)),
)
.service(web::resource("/playground").route(web::get().to(playground)))
.default_service(web::to(|| async {
HttpResponse::Found()
.append_header((header::LOCATION, "/playground"))
.finish()
}))
.service(web::resource("/graphiql").route(web::get().to(graphiql)))
.default_service(web::to(homepage))
})
.bind("127.0.0.1:8080")?
.run()

View file

@ -165,44 +165,47 @@ pub async fn playground_handler(
.body(html))
}
/// `juniper_actix` subscriptions handler implementation.
/// Cannot be merged to `juniper_actix` yet as GraphQL over WS[1]
/// is not fully supported in current implementation.
///
/// *Note: this implementation is in an alpha state.*
///
/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
#[cfg(feature = "subscriptions")]
/// `juniper_actix` subscriptions handler implementation.
pub mod subscriptions {
use std::{fmt, sync::Arc};
use actix::{prelude::*, Actor, StreamHandler};
use actix::{
AsyncContext as _, ContextFutureSpawner as _, Handler, StreamHandler, WrapFuture as _,
};
use actix_web::{
http::header::{HeaderName, HeaderValue},
web, HttpRequest, HttpResponse,
};
use actix_web_actors::ws;
use juniper::{
futures::{
stream::{SplitSink, SplitStream, StreamExt},
SinkExt,
},
GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue,
use futures::{
stream::{SplitSink, SplitStream},
SinkExt as _, Stream, StreamExt as _,
};
use juniper_graphql_ws::{ArcSchema, ClientMessage, Connection, Init, ServerMessage};
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use juniper_graphql_transport_ws::{ArcSchema, Init};
use tokio::sync::Mutex;
/// Serves the graphql-ws protocol over a WebSocket connection.
/// Serves by auto-selecting between the
/// [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] and the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], based on the
/// `Sec-Websocket-Protocol` HTTP header value.
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
pub async fn subscriptions_handler<Query, Mutation, Subscription, CtxT, S, I>(
/// The `schema` argument is your [`juniper`] schema.
///
/// The `init` argument is used to provide the custom [`juniper::Context`] and additional
/// configuration for connections. This can be a
/// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are
/// already known, or it can be a closure that gets executed asynchronously whenever a client
/// sends the subscription initialization message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub async fn ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
@ -216,12 +219,56 @@ pub mod subscriptions {
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) = Connection::new(ArcSchema(root_node), init).split::<Message>();
if req
.headers()
.get("sec-websocket-protocol")
.map(AsRef::as_ref)
== Some("graphql-ws".as_bytes())
{
graphql_ws_handler(req, stream, schema, init).await
} else {
graphql_transport_ws_handler(req, stream, schema, init).await
}
}
/// Serves the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the `GQL_CONNECTION_INIT` message. Using a closure allows to perform
/// an authentication based on the parameters provided by a client.
///
/// > __WARNING__: This protocol has been deprecated in favor of the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], which is
/// provided by the [`graphql_transport_ws_handler()`] function.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub async fn graphql_ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) =
juniper_graphql_ws::Connection::new(ArcSchema(schema), init).split::<Message>();
let mut resp = ws::start(
SubscriptionActor {
graphql_tx: Arc::new(Mutex::new(s_tx)),
graphql_rx: Arc::new(Mutex::new(s_rx)),
Actor {
tx: Arc::new(Mutex::new(s_tx)),
rx: Arc::new(Mutex::new(s_rx)),
},
&req,
stream,
@ -235,18 +282,21 @@ pub mod subscriptions {
Ok(resp)
}
type ConnectionSplitSink<Query, Mutation, Subscription, CtxT, S, I> = Arc<
Mutex<SplitSink<Connection<ArcSchema<Query, Mutation, Subscription, CtxT, S>, I>, Message>>,
>;
type ConnectionSplitStream<Query, Mutation, Subscription, CtxT, S, I> =
Arc<Mutex<SplitStream<Connection<ArcSchema<Query, Mutation, Subscription, CtxT, S>, I>>>>;
/// Subscription Actor
/// coordinates messages between actix_web and juniper_graphql_ws
/// ws message -> actor -> juniper
/// juniper -> actor -> ws response
struct SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
/// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context
/// and configuration are already known, or it can be a closure that gets executed
/// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to
/// perform an authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
pub async fn graphql_transport_ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
@ -258,37 +308,56 @@ pub mod subscriptions {
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
graphql_tx: ConnectionSplitSink<Query, Mutation, Subscription, CtxT, S, I>,
graphql_rx: ConnectionSplitStream<Query, Mutation, Subscription, CtxT, S, I>,
let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(ArcSchema(schema), init)
.split::<Message>();
let mut resp = ws::start(
Actor {
tx: Arc::new(Mutex::new(s_tx)),
rx: Arc::new(Mutex::new(s_rx)),
},
&req,
stream,
)?;
resp.headers_mut().insert(
HeaderName::from_static("sec-websocket-protocol"),
HeaderValue::from_static("graphql-transport-ws"),
);
Ok(resp)
}
/// ws message -> actor -> juniper
impl<Query, Mutation, Subscription, CtxT, S, I>
StreamHandler<Result<ws::Message, ws::ProtocolError>>
for SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
type ConnectionSplitSink<Conn> = Arc<Mutex<SplitSink<Conn, Message>>>;
type ConnectionSplitStream<Conn> = Arc<Mutex<SplitStream<Conn>>>;
/// [`actix::Actor`], coordinating messages between [`actix_web`] and [`juniper_graphql_ws`]:
/// - incoming [`ws::Message`] -> [`Actor`] -> [`juniper`]
/// - [`juniper`] -> [`Actor`] -> response [`ws::Message`]
struct Actor<Conn> {
tx: ConnectionSplitSink<Conn>,
rx: ConnectionSplitStream<Conn>,
}
impl<Conn> StreamHandler<Result<ws::Message, ws::ProtocolError>> for Actor<Conn>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
Self: actix::Actor<Context = ws::WebsocketContext<Self>>,
Conn: futures::Sink<Message>,
<Conn as futures::Sink<Message>>::Error: fmt::Debug,
{
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = msg.map(Message);
#[allow(clippy::single_match)]
match msg {
Ok(msg) => {
let tx = self.graphql_tx.clone();
let tx = self.tx.clone();
// TODO: Somehow this implementation always closes as `1006: Abnormal closure`
// due to excessive polling of `tx` part.
// Needs to be reworked.
async move {
tx.lock()
.await
.send(msg)
.send(Message(msg))
.await
.expect("Infallible: this should not happen");
}
@ -303,31 +372,23 @@ pub mod subscriptions {
}
}
/// juniper -> actor
impl<Query, Mutation, Subscription, CtxT, S, I> Actor
for SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
/// [`juniper`] -> [`Actor`].
impl<Conn> actix::Actor for Actor<Conn>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
Conn: Stream + 'static,
<Conn as Stream>::Item: IntoWsResponse + Send,
{
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let stream = self.graphql_rx.clone();
let stream = self.rx.clone();
let addr = ctx.address();
let fut = async move {
let mut stream = stream.lock().await;
while let Some(message) = stream.next().await {
// sending the message to self so that it can be forwarded back to the client
addr.do_send(ServerMessageWrapper { message });
while let Some(msg) = stream.next().await {
// Sending the `msg` to `self`, so that it can be forwarded back to the client.
addr.do_send(ServerMessage(msg));
}
}
.into_actor(self);
@ -341,55 +402,75 @@ pub mod subscriptions {
}
}
/// actor -> websocket response
impl<Query, Mutation, Subscription, CtxT, S, I> actix::prelude::Handler<ServerMessageWrapper<S>>
for SubscriptionActor<Query, Mutation, Subscription, CtxT, S, I>
/// [`Actor`] -> response [`ws::Message`].
impl<Conn, M> Handler<ServerMessage<M>> for Actor<Conn>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
Conn: Stream<Item = M> + 'static,
M: IntoWsResponse + Send,
{
type Result = ();
fn handle(
&mut self,
msg: ServerMessageWrapper<S>,
ctx: &mut Self::Context,
) -> Self::Result {
let msg = serde_json::to_string(&msg.message);
match msg {
fn handle(&mut self, msg: ServerMessage<M>, ctx: &mut Self::Context) -> Self::Result {
match msg.0.into_ws_response() {
Ok(msg) => ctx.text(msg),
Err(e) => {
let reason = ws::CloseReason {
code: ws::CloseCode::Error,
description: Some(format!("error serializing response: {e}")),
};
// TODO: trace
ctx.close(Some(reason))
}
};
// TODO: trace
Err(reason) => ctx.close(Some(reason)),
}
}
}
#[derive(Message)]
#[derive(actix::Message)]
#[rtype(result = "()")]
struct ServerMessageWrapper<S>
where
S: ScalarValue + Send + Sync + 'static,
{
message: ServerMessage<S>,
struct ServerMessage<T>(T);
/// Conversion of a [`ServerMessage`] into a response [`ws::Message`].
pub trait IntoWsResponse {
/// Converts this [`ServerMessage`] into response [`ws::Message`].
fn into_ws_response(self) -> Result<String, ws::CloseReason>;
}
impl<S: ScalarValue> IntoWsResponse for juniper_graphql_transport_ws::Output<S> {
fn into_ws_response(self) -> Result<String, ws::CloseReason> {
match self {
Self::Message(msg) => serde_json::to_string(&msg).map_err(|e| ws::CloseReason {
code: ws::CloseCode::Error,
description: Some(format!("error serializing response: {e}")),
}),
Self::Close { code, message } => Err(ws::CloseReason {
code: code.into(),
description: Some(message),
}),
}
}
}
impl<S: ScalarValue> IntoWsResponse for juniper_graphql_ws::ServerMessage<S> {
fn into_ws_response(self) -> Result<String, ws::CloseReason> {
serde_json::to_string(&self).map_err(|e| ws::CloseReason {
code: ws::CloseCode::Error,
description: Some(format!("error serializing response: {e}")),
})
}
}
#[derive(Debug)]
struct Message(ws::Message);
impl<S: ScalarValue> TryFrom<Message> for ClientMessage<S> {
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::Input<S> {
type Error = Error;
fn try_from(msg: Message) -> Result<Self, Self::Error> {
match msg.0 {
ws::Message::Text(text) => serde_json::from_slice(text.as_bytes())
.map(Self::Message)
.map_err(Error::Serde),
ws::Message::Close(_) => Ok(Self::Close),
_ => Err(Error::UnexpectedClientMessage),
}
}
}
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_ws::ClientMessage<S> {
type Error = Error;
fn try_from(msg: Message) -> Result<Self, Self::Error> {
@ -397,7 +478,7 @@ pub mod subscriptions {
ws::Message::Text(text) => {
serde_json::from_slice(text.as_bytes()).map_err(Error::Serde)
}
ws::Message::Close(_) => Ok(ClientMessage::ConnectionTerminate),
ws::Message::Close(_) => Ok(Self::ConnectionTerminate),
_ => Err(Error::UnexpectedClientMessage),
}
}
@ -770,7 +851,7 @@ mod subscription_tests {
use juniper_graphql_ws::ConnectionConfig;
use tokio::time::timeout;
use super::subscriptions::subscriptions_handler;
use super::subscriptions::graphql_ws_handler;
#[derive(Default)]
struct TestActixWsIntegration;
@ -852,7 +933,7 @@ mod subscription_tests {
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
subscriptions_handler(req, stream, schema, config).await
graphql_ws_handler(req, stream, schema, config).await
}
#[actix_web::rt::test]

View file

@ -1,5 +1,5 @@
`juniper_graphql_transport_ws` changelog
==============================
========================================
All user visible changes to `juniper_graphql_transport_ws` crate will be documented in this file. This project uses [Semantic Versioning 2.0.0].
@ -8,9 +8,20 @@ All user visible changes to `juniper_graphql_transport_ws` crate will be documen
## master
### Implemented
- [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package]. ([#1158], [#1191], [#1022])
- On top of 0.16 version of [`juniper` crate] and 0.17 version of [`juniper_subscriptions` crate].
[#1022]: /../../issues/1022
[#1158]: /../../pull/1158
[#1191]: /../../pull/1191
[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper` crate]: https://docs.rs/juniper
[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions
[Semantic Versioning 2.0.0]: https://semver.org

View file

@ -3,7 +3,7 @@ name = "juniper_graphql_transport_ws"
version = "0.4.0-dev"
edition = "2021"
rust-version = "1.65"
description = "GraphQL over WebSocket Protocol implementation for `juniper` crate."
description = "`graphql-transport-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate."
license = "BSD-2-Clause"
authors = ["Christopher Brown <ccbrown112@gmail.com>"]
documentation = "https://docs.rs/juniper_graphql_transport_ws"
@ -11,11 +11,12 @@ homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_
repository = "https://github.com/graphql-rust/juniper"
readme = "README.md"
categories = ["asynchronous", "web-programming", "web-programming::http-server"]
keywords = ["apollo", "graphql", "graphql-ws", "subscription", "websocket"]
keywords = ["apollo", "graphql", "graphql-transport-ws", "subscription", "websocket"]
exclude = ["/release.toml"]
[dependencies]
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws" }
juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" }
serde = { version = "1.0.122", features = ["derive"], default-features = false }
tokio = { version = "1.0", features = ["macros", "rt", "time"], default-features = false }

View file

@ -1,6 +1,6 @@
BSD 2-Clause License
Copyright (c) 2018-2022, Christopher Brown
Copyright (c) 2018-2023, Christopher Brown
All rights reserved.
Redistribution and use in source and binary forms, with or without

View file

@ -1,5 +1,5 @@
`juniper_graphql_transport_ws` crate
==========================
====================================
[![Crates.io](https://img.shields.io/crates/v/juniper_graphql_transport_ws.svg?maxAge=2592000)](https://crates.io/crates/juniper_graphql_transport_ws)
[![Documentation](https://docs.rs/juniper_graphql_transport_ws/badge.svg)](https://docs.rs/juniper_graphql_transport_ws)
@ -8,7 +8,9 @@
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/CHANGELOG.md)
This crate contains an implementation of the [graphql-transport-ws WebSocket subprotocol], as used by [Apollo].
This crate contains an implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package].
Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in the [`juniper_graphql_ws` crate].
@ -20,5 +22,8 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws
[Apollo]: https://www.apollographql.com
[graphql-transport-ws WebSocket subprotocol]: https://github.com/enisdenjo/graphql-ws/blob/fbb763a662802a6a2584b0cbeb9cf1bde38158e0/PROTOCOL.md
[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md

View file

@ -25,58 +25,31 @@ use juniper::{
task::{Context, Poll, Waker},
Sink, Stream,
},
GraphQLError, RuleError, ScalarValue, Variables,
GraphQLError, RuleError, ScalarValue,
};
#[doc(inline)]
pub use juniper_graphql_ws::{ConnectionConfig, Init};
struct ExecutionParams<S: Schema> {
subscribe_payload: SubscribePayload<S::ScalarValue>,
config: Arc<ConnectionConfig<S::Context>>,
schema: S,
}
/// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit
/// message.
pub struct ConnectionConfig<CtxT> {
context: CtxT,
max_in_flight_operations: usize,
keep_alive_interval: Duration,
/// Possible inputs received from a client.
#[derive(Debug)]
pub enum Input<S> {
/// Deserialized [`ClientMessage`].
Message(ClientMessage<S>),
/// Client initiated normal closing of a [`Connection`].
Close,
}
impl<CtxT> ConnectionConfig<CtxT> {
/// Constructs the configuration required for a connection to be accepted.
pub fn new(context: CtxT) -> Self {
Self {
context,
max_in_flight_operations: 0,
keep_alive_interval: Duration::from_secs(15),
}
}
/// Specifies the maximum number of in-flight operations that a connection can have. If this
/// number is exceeded, attempting to start more will result in an error. By default, there is
/// no limit to in-flight operations.
#[must_use]
pub fn with_max_in_flight_operations(mut self, max: usize) -> Self {
self.max_in_flight_operations = max;
self
}
/// Specifies the interval at which to send unsolicited pong messages as keep-alives.
/// Specifying a zero duration will disable keep-alives. By default, keep-alives are sent every
/// 15 seconds.
#[must_use]
pub fn with_keep_alive_interval(mut self, interval: Duration) -> Self {
self.keep_alive_interval = interval;
self
}
}
impl<S: ScalarValue, CtxT: Unpin + Send + 'static> Init<S, CtxT> for ConnectionConfig<CtxT> {
type Error = Infallible;
type Future = future::Ready<Result<Self, Self::Error>>;
fn init(self, _params: Variables<S>) -> Self::Future {
future::ready(Ok(self))
impl<S> From<ClientMessage<S>> for Input<S> {
fn from(val: ClientMessage<S>) -> Self {
Self::Message(val)
}
}
@ -103,36 +76,6 @@ impl<S: ScalarValue + Send> Output<S> {
}
}
/// Init defines the requirements for types that can provide connection configurations when
/// ConnectionInit messages are received. Implementations are provided for `ConnectionConfig` and
/// closures that meet the requirements.
pub trait Init<S: ScalarValue, CtxT>: Unpin + 'static {
/// The error that is returned on failure. The formatted error will be used as the contents of
/// the "message" field sent back to the client.
type Error: Error;
/// The future configuration type.
type Future: Future<Output = Result<ConnectionConfig<CtxT>, Self::Error>> + Send + 'static;
/// Returns a future for the configuration to use.
fn init(self, params: Variables<S>) -> Self::Future;
}
impl<F, S, CtxT, Fut, E> Init<S, CtxT> for F
where
S: ScalarValue,
F: FnOnce(Variables<S>) -> Fut + Unpin + 'static,
Fut: Future<Output = Result<ConnectionConfig<CtxT>, E>> + Send + 'static,
E: Error,
{
type Error = E;
type Future = Fut;
fn init(self, params: Variables<S>) -> Fut {
self(params)
}
}
enum ConnectionState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
/// PreInit is the state before a ConnectionInit message has been accepted.
PreInit { init: I, schema: S },
@ -496,8 +439,8 @@ enum ConnectionSinkState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
Closed,
}
/// Implements the graphql-ws protocol. This is a sink for `TryInto<ClientMessage>` and a stream of
/// `ServerMessage`.
/// Implements the `graphql-ws` protocol.
/// This is a sink for `TryInto<Input>` messages and a stream of `Output` messages.
pub struct Connection<S: Schema, I: Init<S::ScalarValue, S::Context>> {
reactions: SelectAll<BoxStream<'static, Output<S::ScalarValue>>>,
stream_waker: Option<Waker>,
@ -510,7 +453,8 @@ where
S: Schema,
I: Init<S::ScalarValue, S::Context>,
{
/// Creates a new connection, which is a sink for `TryInto<ClientMessage>` and a stream of `ServerMessage`.
/// Creates a new connection, which is a sink for `TryInto<Input>` messages and a stream of
/// `Output` messages.
///
/// The `schema` argument should typically be an `Arc<RootNode<...>>`.
///
@ -533,7 +477,7 @@ where
impl<S, I, T> Sink<T> for Connection<S, I>
where
T: TryInto<ClientMessage<S::ScalarValue>>,
T: TryInto<Input<S::ScalarValue>>,
T::Error: Error,
S: Schema,
I: Init<S::ScalarValue, S::Context> + Send,
@ -563,9 +507,19 @@ where
*state = match std::mem::replace(state, ConnectionSinkState::Closed) {
ConnectionSinkState::Ready { state } => {
match item.try_into() {
Ok(msg) => ConnectionSinkState::HandlingMessage {
Ok(Input::Message(msg)) => ConnectionSinkState::HandlingMessage {
result: state.handle_message(msg).boxed(),
},
Ok(Input::Close) => {
s.reactions.push(
Output::Close {
code: 1000,
message: "Normal Closure".into(),
}
.into_stream(),
);
ConnectionSinkState::Closed
}
Err(e) => {
// If we weren't able to parse the message, we must close the connection.
s.reactions.push(
@ -588,7 +542,7 @@ where
<Self as Sink<T>>::poll_ready(self, cx)
}
fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
self.sink_state = ConnectionSinkState::Closed;
if let Some(waker) = self.stream_waker.take() {
// Wake up the stream so it can close too.
@ -643,7 +597,7 @@ mod test {
futures::sink::SinkExt,
graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars,
parser::{ParseError, Spanning},
DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode,
DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, Variables,
};
use super::*;

View file

@ -1,131 +1,2 @@
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use std::sync::Arc;
/// Schema defines the requirements for schemas that can be used for operations. Typically this is
/// just an `Arc<RootNode<...>>` and you should not have to implement it yourself.
pub trait Schema: Unpin + Clone + Send + Sync + 'static {
/// The context type.
type Context: Unpin + Send + Sync;
/// The scalar value type.
type ScalarValue: ScalarValue + Send + Sync;
/// The query type info.
type QueryTypeInfo: Send + Sync;
/// The query type.
type Query: GraphQLTypeAsync<Self::ScalarValue, Context = Self::Context, TypeInfo = Self::QueryTypeInfo>
+ Send;
/// The mutation type info.
type MutationTypeInfo: Send + Sync;
/// The mutation type.
type Mutation: GraphQLTypeAsync<
Self::ScalarValue,
Context = Self::Context,
TypeInfo = Self::MutationTypeInfo,
> + Send;
/// The subscription type info.
type SubscriptionTypeInfo: Send + Sync;
/// The subscription type.
type Subscription: GraphQLSubscriptionType<
Self::ScalarValue,
Context = Self::Context,
TypeInfo = Self::SubscriptionTypeInfo,
> + Send;
/// Returns the root node for the schema.
fn root_node(
&self,
) -> &RootNode<'static, Self::Query, Self::Mutation, Self::Subscription, Self::ScalarValue>;
}
/// This exists as a work-around for this issue: https://github.com/rust-lang/rust/issues/64552
///
/// It can be used in generators where using Arc directly would result in an error.
// TODO: Remove this once that issue is resolved.
#[doc(hidden)]
pub struct ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>(
pub Arc<RootNode<'static, QueryT, MutationT, SubscriptionT, S>>,
)
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static;
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Clone
for ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Schema
for ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
{
type Context = CtxT;
type ScalarValue = S;
type QueryTypeInfo = QueryT::TypeInfo;
type Query = QueryT;
type MutationTypeInfo = MutationT::TypeInfo;
type Mutation = MutationT;
type SubscriptionTypeInfo = SubscriptionT::TypeInfo;
type Subscription = SubscriptionT;
fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> {
&self.0
}
}
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Schema
for Arc<RootNode<'static, QueryT, MutationT, SubscriptionT, S>>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static,
{
type Context = CtxT;
type ScalarValue = S;
type QueryTypeInfo = QueryT::TypeInfo;
type Query = QueryT;
type MutationTypeInfo = MutationT::TypeInfo;
type Mutation = MutationT;
type SubscriptionTypeInfo = SubscriptionT::TypeInfo;
type Subscription = SubscriptionT;
fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> {
self
}
}
#[doc(inline)]
pub use juniper_graphql_ws::{ArcSchema, Schema};

View file

@ -13,6 +13,12 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi
- Switched to 0.16 version of [`juniper` crate].
- Switched to 0.17 version of [`juniper_subscriptions` crate].
### Changed
- Made fields of `ConnectionConfig` public to reuse in [`juniper_graphql_transport_ws` crate]. ([#1191])
[#1191]: /../../pull/1191
@ -24,5 +30,6 @@ See [old CHANGELOG](/../../blob/juniper_graphql_ws-v0.3.0/juniper_graphql_ws/CHA
[`juniper` crate]: https://docs.rs/juniper
[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws
[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions
[Semantic Versioning 2.0.0]: https://semver.org

View file

@ -3,7 +3,7 @@ name = "juniper_graphql_ws"
version = "0.4.0-dev"
edition = "2021"
rust-version = "1.65"
description = "GraphQL over WebSocket Protocol implementation for `juniper` crate."
description = "Legacy `graphql-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate."
license = "BSD-2-Clause"
authors = ["Christopher Brown <ccbrown112@gmail.com>"]
documentation = "https://docs.rs/juniper_graphql_ws"

View file

@ -1,6 +1,6 @@
BSD 2-Clause License
Copyright (c) 2018-2022, Christopher Brown
Copyright (c) 2018-2023, Christopher Brown
All rights reserved.
Redistribution and use in source and binary forms, with or without

View file

@ -8,7 +8,7 @@
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_ws/CHANGELOG.md)
This crate contains an implementation of the [graphql-ws WebSocket subprotocol], as formerly used by [Apollo]. It has now been deprecated in favor of the protocol implemented by the [`juniper_graphql_transport_ws` crate].
This crate contains an implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package]. It has now been deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the new [`juniper_graphql_transport_ws` crate] and new [`graphql-ws` npm package].
@ -20,6 +20,9 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[Apollo]: https://www.apollographql.com
[graphql-ws WebSocket subprotocol]: https://github.com/apollographql/subscriptions-transport-ws/blob/0ce7a1e1eb687fe51214483e4735f50a2f2d5c79/PROTOCOL.md
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws
[`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws
[Apollo]: https://www.apollographql.com
[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md

View file

@ -36,10 +36,23 @@ struct ExecutionParams<S: Schema> {
/// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit
/// message.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionConfig<CtxT> {
context: CtxT,
max_in_flight_operations: usize,
keep_alive_interval: Duration,
/// Custom-provided [`juniper::Context`].
pub context: CtxT,
/// Maximum number of in-flight operations that a connection can have.
///
/// If this number is exceeded, attempting to start more will result in an error.
/// By default, there is no limit to in-flight operations.
pub max_in_flight_operations: usize,
/// Interval at which to send keep-alives.
///
/// Specifying a [`Duration::ZERO`] will disable keep-alives.
///
/// By default, keep-alives are sent every 15 seconds.
pub keep_alive_interval: Duration,
}
impl<CtxT> ConnectionConfig<CtxT> {
@ -52,17 +65,21 @@ impl<CtxT> ConnectionConfig<CtxT> {
}
}
/// Specifies the maximum number of in-flight operations that a connection can have. If this
/// number is exceeded, attempting to start more will result in an error. By default, there is
/// no limit to in-flight operations.
/// Specifies the maximum number of in-flight operations that a connection can have.
///
/// If this number is exceeded, attempting to start more will result in an error.
/// By default, there is no limit to in-flight operations.
#[must_use]
pub fn with_max_in_flight_operations(mut self, max: usize) -> Self {
self.max_in_flight_operations = max;
self
}
/// Specifies the interval at which to send keep-alives. Specifying a zero duration will
/// disable keep-alives. By default, keep-alives are sent every 15 seconds.
/// Specifies the interval at which to send keep-alives.
///
/// Specifying a [`Duration::ZERO`] will disable keep-alives.
///
/// By default, keep-alives are sent every 15 seconds.
#[must_use]
pub fn with_keep_alive_interval(mut self, interval: Duration) -> Self {
self.keep_alive_interval = interval;

View file

@ -20,4 +20,4 @@ juniper = { version = "0.16.0-dev", path = "../juniper", default-features = fals
[dev-dependencies]
serde_json = "1.0.18"
tokio = { version = "1.0", features = ["macros", "rt"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

View file

@ -27,7 +27,7 @@ For `SubscriptionCoordinator` and `SubscriptionConnection` documentation, check
## Examples
Check [`examples/warp_subscriptions/`][1] for example code of a working [`warp`] server with [GraphQL] subscription handlers.
Check [`juniper_warp/examples/subscription.rs`][1] for example code of a working [`warp`] server with [GraphQL] subscription handlers.
@ -43,4 +43,4 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[`warp`]: https://docs.rs/warp
[GraphQL]: http://graphql.org
[1]: https://github.com/graphql-rust/juniper/blob/master/examples/warp_subscriptions/src/main.rs
[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs

View file

@ -12,12 +12,19 @@ All user visible changes to `juniper_warp` crate will be documented in this file
- Switched to 0.16 version of [`juniper` crate].
### Added
- `subscriptions::serve_graphql_transport_ws()` function allowing to process the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws]. ([#1158])
- `subscriptions::make_ws_filter()` function providing endpoint with auto-selection between the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws] and the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws], based on the `Sec-Websocket-Protocol` HTTP header value. ([#1191])
### Changed
- Made `schema` argument of `make_graphql_filter()` and `make_graphql_filter_sync()` polymorphic, allowing to specify external `Arc`ed `schema`. ([#1136], [#1135])
[#1135]: /../../issues/1136
[#1136]: /../../pull/1136
[#1158]: /../../pull/1158
[#1191]: /../../pull/1191
@ -31,3 +38,5 @@ See [old CHANGELOG](/../../blob/juniper_warp-v0.7.0/juniper_warp/CHANGELOG.md).
[`juniper` crate]: https://docs.rs/juniper
[Semantic Versioning 2.0.0]: https://semver.org
[graphql-transport-ws]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[graphql-ws]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md

View file

@ -12,14 +12,19 @@ repository = "https://github.com/graphql-rust/juniper"
readme = "README.md"
categories = ["asynchronous", "web-programming", "web-programming::http-server"]
keywords = ["apollo", "graphql", "juniper", "warp", "websocket"]
exclude = ["/examples/", "/release.toml"]
include = ["/examples/", "/src/", "/CHANGELOG.md", "/LICENSE", "/README.md"]
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[features]
subscriptions = ["dep:juniper_graphql_ws", "dep:juniper_graphql_transport_ws", "warp/websocket"]
subscriptions = [
"dep:juniper_graphql_transport_ws",
"dep:juniper_graphql_ws",
"dep:log",
"warp/websocket",
]
[dependencies]
anyhow = "1.0.47"
@ -27,6 +32,7 @@ futures = "0.3.22"
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true }
juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true }
log = { version = "0.4", optional = true }
serde = { version = "1.0.122", features = ["derive"] }
serde_json = "1.0.18"
thiserror = "1.0"
@ -38,9 +44,14 @@ warp = { version = "0.3.2", default-features = false }
headers = "0.3.8"
[dev-dependencies]
async-stream = "0.3"
env_logger = "0.10"
juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] }
log = "0.4"
percent-encoding = "2.1"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
url = "2.0"
[[example]]
name = "subscription"
required-features = ["subscriptions"]

View file

@ -24,7 +24,7 @@ A basic usage example can also be found in the [API docs][`juniper_warp`].
## Examples
Check [`examples/warp_server.rs`][1] for example code of a working [`warp`] server with [GraphQL] handlers.
Check [`examples/subscription.rs`][1] for example code of a working [`warp`] server with [GraphQL] handlers and subscriptions.
@ -43,4 +43,4 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[Juniper Book]: https://graphql-rust.github.io
[Rust]: https://www.rust-lang.org
[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/warp_server.rs
[1]: https://github.com/graphql-rust/juniper/blob/master/juniper_warp/examples/subscription.rs

View file

@ -1,18 +1,13 @@
//! This example demonstrates asynchronous subscriptions with warp and tokio 0.2
//! This example demonstrates asynchronous subscriptions with [`warp`].
use std::{env, pin::Pin, sync::Arc, time::Duration};
use futures::{FutureExt as _, Stream};
use futures::Stream;
use juniper::{
graphql_object, graphql_subscription, graphql_value, EmptyMutation, FieldError, GraphQLEnum,
RootNode,
};
use juniper_graphql_transport_ws::ConnectionConfig;
use juniper_graphql_ws::ConnectionConfig as LegacyConnectionConfig;
use juniper_warp::{
graphiql_filter, playground_filter,
subscriptions::{serve_graphql_transport_ws, serve_graphql_ws},
};
use warp::{http::Response, Filter};
#[derive(Clone)]
@ -145,73 +140,49 @@ fn schema() -> Schema {
#[tokio::main]
async fn main() {
env::set_var("RUST_LOG", "warp_subscriptions");
env::set_var("RUST_LOG", "subscription");
env_logger::init();
let log = warp::log("warp_subscriptions");
let log = warp::log("subscription");
let homepage = warp::path::end().map(|| {
Response::builder()
.header("content-type", "text/html")
.body("<html><h1>juniper_subscriptions demo</h1><div>visit <a href=\"/playground\">graphql playground</a></html>")
.body(
"<html><h1>juniper_warp/subscription example</h1>\
<div>visit <a href=\"/graphiql\">GraphiQL</a></div>\
<div>visit <a href=\"/playground\">GraphQL Playground</a></div>\
</html>",
)
});
let schema = Arc::new(schema());
let qm_schema = schema();
let qm_state = warp::any().map(|| Context);
let qm_graphql_filter = juniper_warp::make_graphql_filter(qm_schema, qm_state.boxed());
let ws_schema = Arc::new(schema());
let transport_ws_schema = ws_schema.clone();
let routes = (warp::post()
.and(warp::path("graphql"))
.and(juniper_warp::make_graphql_filter(
schema.clone(),
warp::any().map(|| Context).boxed(),
)))
.or(
warp::path("subscriptions").and(juniper_warp::subscriptions::make_ws_filter(
schema,
ConnectionConfig::new(Context),
)),
)
.or(warp::get()
.and(warp::path("playground"))
.and(juniper_warp::playground_filter(
"/graphql",
Some("/subscriptions"),
)))
.or(warp::get()
.and(warp::path("graphiql"))
.and(juniper_warp::graphiql_filter(
"/graphql",
Some("/subscriptions"),
)))
.or(homepage)
.with(log);
log::info!("Listening on 127.0.0.1:8080");
let routes = warp::path("subscriptions")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let transport_ws_schema = transport_ws_schema.clone();
ws.on_upgrade(move |websocket| async move {
serve_graphql_transport_ws(
websocket,
transport_ws_schema,
ConnectionConfig::new(Context),
)
.map(|r| {
if let Err(e) = r {
println!("Websocket error: {e}");
}
})
.await
})
})
.or(warp::path("legacy-subscriptions")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let ws_schema = ws_schema.clone();
ws.on_upgrade(move |websocket| async move {
serve_graphql_ws(websocket, ws_schema, LegacyConnectionConfig::new(Context))
.map(|r| {
if let Err(e) = r {
println!("Websocket error: {e}");
}
})
.await
})
})
.map(|reply| {
// TODO#584: remove this workaround
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
}))
.or(warp::post()
.and(warp::path("graphql"))
.and(qm_graphql_filter))
.or(warp::get()
.and(warp::path("playground"))
.and(playground_filter("/graphql", Some("/legacy-subscriptions"))))
.or(warp::get()
.and(warp::path("graphiql"))
.and(graphiql_filter("/graphql", Some("/subscriptions"))))
.or(homepage)
.with(log);
warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}

View file

@ -1,51 +0,0 @@
#![deny(warnings)]
use std::env;
use juniper::{
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, EmptySubscription, RootNode,
};
use warp::{http::Response, Filter};
type Schema = RootNode<'static, Query, EmptyMutation<Database>, EmptySubscription<Database>>;
fn schema() -> Schema {
Schema::new(
Query,
EmptyMutation::<Database>::new(),
EmptySubscription::<Database>::new(),
)
}
#[tokio::main]
async fn main() {
env::set_var("RUST_LOG", "warp_server");
env_logger::init();
let log = warp::log("warp_server");
let homepage = warp::path::end().map(|| {
Response::builder()
.header("content-type", "text/html")
.body(format!(
"<html><h1>juniper_warp</h1><div>visit <a href=\"/graphiql\">/graphiql</a></html>"
))
});
log::info!("Listening on 127.0.0.1:8080");
let state = warp::any().map(Database::new);
let graphql_filter = juniper_warp::make_graphql_filter(schema(), state.boxed());
warp::serve(
warp::get()
.and(warp::path("graphiql"))
.and(juniper_warp::graphiql_filter("/graphql", None))
.or(homepage)
.or(warp::path("graphql").and(graphql_filter))
.with(log),
)
.run(([127, 0, 0, 1], 8080))
.await
}

View file

@ -14,17 +14,17 @@ use juniper::{
use tokio::task;
use warp::{body, filters::BoxedFilter, http, hyper::body::Bytes, query, Filter};
/// Make a filter for graphql queries/mutations.
/// Makes a filter for GraphQL queries/mutations.
///
/// The `schema` argument is your juniper schema.
/// The `schema` argument is your [`juniper`] schema.
///
/// The `context_extractor` argument should be a filter that provides the GraphQL context required by the schema.
///
/// In order to avoid blocking, this helper will use the `tokio_threadpool` threadpool created by hyper to resolve GraphQL requests.
///
/// Example:
/// # Example
///
/// ```
/// ```rust
/// # use std::sync::Arc;
/// # use warp::Filter;
/// # use juniper::{graphql_object, EmptyMutation, EmptySubscription, RootNode};
@ -336,14 +336,8 @@ fn playground_response(
.expect("response is valid")
}
/// `juniper_warp` subscriptions handler implementation.
/// Cannot be merged to `juniper_warp` yet as GraphQL over WS[1]
/// is not fully supported in current implementation.
///
/// *Note: this implementation is in an alpha state.*
///
/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
#[cfg(feature = "subscriptions")]
/// `juniper_warp` subscriptions handler implementation.
pub mod subscriptions {
use std::{convert::Infallible, fmt, sync::Arc};
@ -357,6 +351,7 @@ pub mod subscriptions {
};
use juniper_graphql_transport_ws;
use juniper_graphql_ws;
use warp::{filters::BoxedFilter, reply::Reply, Filter as _};
struct Message(warp::ws::Message);
@ -364,15 +359,23 @@ pub mod subscriptions {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
serde_json::from_slice(msg.0.as_bytes())
if msg.0.is_close() {
Ok(Self::ConnectionTerminate)
} else {
serde_json::from_slice(msg.0.as_bytes())
}
}
}
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::ClientMessage<S> {
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::Input<S> {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
serde_json::from_slice(msg.0.as_bytes())
if msg.0.is_close() {
Ok(Self::Close)
} else {
serde_json::from_slice(msg.0.as_bytes()).map(Self::Message)
}
}
}
@ -410,16 +413,173 @@ pub mod subscriptions {
}
}
/// Serves the graphql-ws protocol over a WebSocket connection.
/// Makes a filter for GraphQL subscriptions.
///
/// This filter auto-selects between the
/// [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] and the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], based on the
/// `Sec-Websocket-Protocol` HTTP header value.
///
/// The `schema` argument is your [`juniper`] schema.
///
/// The `init` argument is used to provide the custom [`juniper::Context`] and additional
/// configuration for connections. This can be a
/// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are
/// already known, or it can be a closure that gets executed asynchronously whenever a client
/// sends the subscription initialization message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
///
/// # Example
///
/// ```rust
/// # use std::{convert::Infallible, pin::Pin, sync::Arc, time::Duration};
/// #
/// # use futures::Stream;
/// # use juniper::{graphql_object, graphql_subscription, EmptyMutation, RootNode};
/// # use juniper_graphql_transport_ws::ConnectionConfig;
/// # use juniper_warp::make_graphql_filter;
/// # use warp::Filter as _;
/// #
/// type UserId = String;
/// # #[derive(Debug)]
/// struct AppState(Vec<i64>);
/// #[derive(Clone)]
/// struct ExampleContext(Arc<AppState>, UserId);
/// # impl juniper::Context for ExampleContext {}
///
/// struct QueryRoot;
///
/// #[graphql_object(context = ExampleContext)]
/// impl QueryRoot {
/// fn say_hello(context: &ExampleContext) -> String {
/// format!(
/// "good morning {}, the app state is {:?}",
/// context.1,
/// context.0,
/// )
/// }
/// }
///
/// type StringsStream = Pin<Box<dyn Stream<Item = String> + Send>>;
///
/// struct SubscriptionRoot;
///
/// #[graphql_subscription(context = ExampleContext)]
/// impl SubscriptionRoot {
/// async fn say_hellos(context: &ExampleContext) -> StringsStream {
/// let mut interval = tokio::time::interval(Duration::from_secs(1));
/// let context = context.clone();
/// Box::pin(async_stream::stream! {
/// let mut counter = 0;
/// while counter < 5 {
/// counter += 1;
/// interval.tick().await;
/// yield format!(
/// "{counter}: good morning {}, the app state is {:?}",
/// context.1,
/// context.0,
/// )
/// }
/// })
/// }
/// }
///
/// let schema = Arc::new(RootNode::new(QueryRoot, EmptyMutation::new(), SubscriptionRoot));
/// let app_state = Arc::new(AppState(vec![3, 4, 5]));
/// let app_state_for_ws = app_state.clone();
///
/// let context_extractor = warp::any()
/// .and(warp::header::<String>("authorization"))
/// .and(warp::any().map(move || app_state.clone()))
/// .map(|auth_header: String, app_state: Arc<AppState>| {
/// let user_id = auth_header; // we believe them
/// ExampleContext(app_state, user_id)
/// })
/// .boxed();
///
/// let graphql_endpoint = (warp::path("graphql")
/// .and(warp::post())
/// .and(make_graphql_filter(schema.clone(), context_extractor)))
/// .or(warp::path("subscriptions")
/// .and(juniper_warp::subscriptions::make_ws_filter(
/// schema,
/// move |variables: juniper::Variables| {
/// let user_id = variables
/// .get("authorization")
/// .map(ToString::to_string)
/// .unwrap_or_default(); // we believe them
/// async move {
/// Ok::<_, Infallible>(ConnectionConfig::new(
/// ExampleContext(app_state_for_ws.clone(), user_id),
/// ))
/// }
/// },
/// )));
/// ```
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub fn make_ws_filter<Query, Mutation, Subscription, CtxT, S, I>(
schema: impl Into<Arc<juniper::RootNode<'static, Query, Mutation, Subscription, S>>>,
init: I,
) -> BoxedFilter<(impl Reply,)>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: juniper_graphql_transport_ws::Init<S, CtxT> + Clone + Send + Sync,
{
let schema = schema.into();
warp::ws()
.and(warp::filters::header::value("sec-websocket-protocol"))
.map(move |ws: warp::ws::Ws, subproto| {
let schema = schema.clone();
let init = init.clone();
let is_legacy = subproto == "graphql-ws";
warp::reply::with_header(
ws.on_upgrade(move |ws| async move {
if is_legacy {
serve_graphql_ws(ws, schema, init).await
} else {
serve_graphql_transport_ws(ws, schema, init).await
}
.unwrap_or_else(|e| {
log::error!("GraphQL over WebSocket Protocol error: {e}");
})
}),
"sec-websocket-protocol",
if is_legacy {
"graphql-ws"
} else {
"graphql-transport-ws"
},
)
})
.boxed()
}
/// Serves the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and
/// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
/// when the client sends the `GQL_CONNECTION_INIT` message. Using a closure allows to perform
/// an authentication based on the parameters provided by a client.
///
/// This protocol has been deprecated in favor of the `graphql-transport-ws` protocol, which is
/// provided by the `serve_graphql_transport_ws` function.
/// > __WARNING__: This protocol has been deprecated in favor of the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], which is
/// provided by the [`serve_graphql_transport_ws()`] function.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub async fn serve_graphql_ws<Query, Mutation, Subscription, CtxT, S, I>(
websocket: warp::ws::WebSocket,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
@ -459,13 +619,15 @@ pub mod subscriptions {
}
}
/// Serves the graphql-transport-ws protocol over a WebSocket connection.
/// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_transport_ws::ConnectionConfig` if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
/// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context
/// and configuration are already known, or it can be a closure that gets executed
/// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to
/// perform an authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
pub async fn serve_graphql_transport_ws<Query, Mutation, Subscription, CtxT, S, I>(
websocket: warp::ws::WebSocket,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,