Merge juniper_graphql_transport_ws and juniper_graphql_ws crates (#1196, #1022)

This commit is contained in:
Kai Ren 2023-10-17 16:12:20 +02:00 committed by GitHub
parent f3a1a0c65d
commit aaf28e962d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1220 additions and 1361 deletions

View file

@ -101,6 +101,8 @@ jobs:
- { feature: time, crate: juniper }
- { feature: url, crate: juniper }
- { feature: uuid, crate: juniper }
- { feature: graphql-transport-ws, crate: juniper_graphql_ws }
- { feature: graphql-ws, crate: juniper_graphql_ws }
- { feature: <none>, crate: juniper_actix }
- { feature: subscriptions, crate: juniper_actix }
- { feature: <none>, crate: juniper_warp }
@ -134,7 +136,6 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
#- juniper_actix
- juniper_hyper
@ -189,7 +190,6 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
- juniper_integration_tests
- juniper_codegen_tests
@ -318,7 +318,6 @@ jobs:
- juniper_codegen
- juniper
- juniper_subscriptions
- juniper_graphql_transport_ws
- juniper_graphql_ws
- juniper_actix
- juniper_hyper

View file

@ -8,7 +8,6 @@ members = [
"juniper_iron",
"juniper_rocket",
"juniper_subscriptions",
"juniper_graphql_transport_ws",
"juniper_graphql_ws",
"juniper_warp",
"juniper_actix",

View file

@ -94,7 +94,7 @@ ifeq ($(clean),yes)
cargo clean
endif
$(eval target := $(strip $(shell cargo -vV | sed -n 's/host: //p')))
cargo build
cargo build --all-features
mdbook test book -L target/debug/deps $(strip \
$(if $(call eq,$(findstring windows,$(target)),),,\
$(shell cargo metadata -q \

View file

@ -22,7 +22,6 @@ rustdoc-args = ["--cfg", "docsrs"]
subscriptions = [
"dep:actix",
"dep:actix-web-actors",
"dep:juniper_graphql_transport_ws",
"dep:juniper_graphql_ws",
"dep:tokio",
]
@ -35,8 +34,7 @@ actix-web-actors = { version = "4.1", optional = true }
anyhow = "1.0.47"
futures = "0.3.22"
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", features = ["graphql-transport-ws", "graphql-ws"], optional = true }
http = "0.2.4"
serde = { version = "1.0.122", features = ["derive"] }
serde_json = "1.0.18"

View file

@ -183,7 +183,7 @@ pub mod subscriptions {
SinkExt as _, Stream, StreamExt as _,
};
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use juniper_graphql_transport_ws::{ArcSchema, Init};
use juniper_graphql_ws::{graphql_transport_ws, graphql_ws, ArcSchema, Init};
use tokio::sync::Mutex;
/// Serves by auto-selecting between the
@ -194,11 +194,10 @@ pub mod subscriptions {
/// The `schema` argument is your [`juniper`] schema.
///
/// The `init` argument is used to provide the custom [`juniper::Context`] and additional
/// configuration for connections. This can be a
/// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are
/// already known, or it can be a closure that gets executed asynchronously whenever a client
/// sends the subscription initialization message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
/// configuration for connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the
/// context and configuration are already known, or it can be a closure that gets executed
/// asynchronously whenever a client sends the subscription initialization message. Using a
/// closure allows to perform an authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
@ -262,8 +261,7 @@ pub mod subscriptions {
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) =
juniper_graphql_ws::Connection::new(ArcSchema(schema), init).split::<Message>();
let (s_tx, s_rx) = graphql_ws::Connection::new(ArcSchema(schema), init).split::<Message>();
let mut resp = ws::start(
Actor {
@ -285,10 +283,10 @@ pub mod subscriptions {
/// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context
/// and configuration are already known, or it can be a closure that gets executed
/// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to
/// perform an authentication based on the parameters provided by a client.
/// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the `ConnectionInit` message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
pub async fn graphql_transport_ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
@ -308,8 +306,8 @@ pub mod subscriptions {
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(ArcSchema(schema), init)
.split::<Message>();
let (s_tx, s_rx) =
graphql_transport_ws::Connection::new(ArcSchema(schema), init).split::<Message>();
let mut resp = ws::start(
Actor {
@ -429,7 +427,7 @@ pub mod subscriptions {
fn into_ws_response(self) -> Result<String, ws::CloseReason>;
}
impl<S: ScalarValue> IntoWsResponse for juniper_graphql_transport_ws::Output<S> {
impl<S: ScalarValue> IntoWsResponse for graphql_transport_ws::Output<S> {
fn into_ws_response(self) -> Result<String, ws::CloseReason> {
match self {
Self::Message(msg) => serde_json::to_string(&msg).map_err(|e| ws::CloseReason {
@ -444,7 +442,7 @@ pub mod subscriptions {
}
}
impl<S: ScalarValue> IntoWsResponse for juniper_graphql_ws::ServerMessage<S> {
impl<S: ScalarValue> IntoWsResponse for graphql_ws::ServerMessage<S> {
fn into_ws_response(self) -> Result<String, ws::CloseReason> {
serde_json::to_string(&self).map_err(|e| ws::CloseReason {
code: ws::CloseCode::Error,
@ -456,7 +454,7 @@ pub mod subscriptions {
#[derive(Debug)]
struct Message(ws::Message);
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::Input<S> {
impl<S: ScalarValue> TryFrom<Message> for graphql_transport_ws::Input<S> {
type Error = Error;
fn try_from(msg: Message) -> Result<Self, Self::Error> {
@ -470,7 +468,7 @@ pub mod subscriptions {
}
}
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_ws::ClientMessage<S> {
impl<S: ScalarValue> TryFrom<Message> for graphql_ws::ClientMessage<S> {
type Error = Error;
fn try_from(msg: Message) -> Result<Self, Self::Error> {

View file

@ -1,27 +0,0 @@
`juniper_graphql_transport_ws` changelog
========================================
All user visible changes to `juniper_graphql_transport_ws` crate will be documented in this file. This project uses [Semantic Versioning 2.0.0].
## master
### Implemented
- [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package]. ([#1158], [#1191], [#1022])
- On top of 0.16 version of [`juniper` crate] and 0.17 version of [`juniper_subscriptions` crate].
[#1022]: /../../issues/1022
[#1158]: /../../pull/1158
[#1191]: /../../pull/1191
[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper` crate]: https://docs.rs/juniper
[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions
[Semantic Versioning 2.0.0]: https://semver.org

View file

@ -1,25 +0,0 @@
[package]
name = "juniper_graphql_transport_ws"
version = "0.4.0-dev"
edition = "2021"
rust-version = "1.65"
description = "`graphql-transport-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate."
license = "BSD-2-Clause"
authors = ["Christopher Brown <ccbrown112@gmail.com>"]
documentation = "https://docs.rs/juniper_graphql_transport_ws"
homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_transport_ws"
repository = "https://github.com/graphql-rust/juniper"
readme = "README.md"
categories = ["asynchronous", "web-programming", "web-programming::http-server"]
keywords = ["apollo", "graphql", "graphql-transport-ws", "subscription", "websocket"]
exclude = ["/release.toml"]
[dependencies]
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws" }
juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" }
serde = { version = "1.0.122", features = ["derive"], default-features = false }
tokio = { version = "1.0", features = ["macros", "rt", "time"], default-features = false }
[dev-dependencies]
serde_json = "1.0.18"

View file

@ -1,25 +0,0 @@
BSD 2-Clause License
Copyright (c) 2018-2023, Christopher Brown
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -1,29 +0,0 @@
`juniper_graphql_transport_ws` crate
====================================
[![Crates.io](https://img.shields.io/crates/v/juniper_graphql_transport_ws.svg?maxAge=2592000)](https://crates.io/crates/juniper_graphql_transport_ws)
[![Documentation](https://docs.rs/juniper_graphql_transport_ws/badge.svg)](https://docs.rs/juniper_graphql_transport_ws)
[![CI](https://github.com/graphql-rust/juniper/workflows/CI/badge.svg?branch=master "CI")](https://github.com/graphql-rust/juniper/actions?query=workflow%3ACI+branch%3Amaster)
[![Rust 1.65+](https://img.shields.io/badge/rustc-1.65+-lightgray.svg "Rust 1.65+")](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html)
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/CHANGELOG.md)
This crate contains an implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package].
Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in the [`juniper_graphql_ws` crate].
## License
This project is licensed under [BSD 2-Clause License](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/LICENSE).
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws
[Apollo]: https://www.apollographql.com
[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md

View file

@ -1,24 +0,0 @@
[[pre-release-replacements]]
file = "../juniper_actix/Cargo.toml"
exactly = 1
search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\""
replace = "juniper_graphql_transport_ws = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "../juniper_warp/Cargo.toml"
exactly = 1
search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\""
replace = "juniper_graphql_transport_ws = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "CHANGELOG.md"
max = 1
min = 0
search = "## master"
replace = "## [{{version}}] · {{date}}\n[{{version}}]: /../../tree/{{crate_name}}-v{{version}}/{{crate_name}}"
[[pre-release-replacements]]
file = "README.md"
exactly = 2
search = "graphql-rust/juniper/blob/[^/]+/"
replace = "graphql-rust/juniper/blob/{{crate_name}}-v{{version}}/"

View file

@ -1,2 +0,0 @@
#[doc(inline)]
pub use juniper_graphql_ws::{ArcSchema, Schema};

View file

@ -10,14 +10,24 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi
### BC Breaks
- Moved existing implementation to `graphql_ws` module implementing [legacy `graphql-ws` GraphQL over WebSocket Protocol][proto-legacy] behind `graphql-ws` Cargo feature. ([#1196])
- Switched to 0.16 version of [`juniper` crate].
- Switched to 0.17 version of [`juniper_subscriptions` crate].
### Added
- `graphql_transport_ws` module implementing [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package] behind `graphql-transport-ws` Cargo feature. ([#1158], [#1191], [#1196], [#1022])
### Changed
- Made fields of `ConnectionConfig` public to reuse in [`juniper_graphql_transport_ws` crate]. ([#1191])
- Made fields of `ConnectionConfig` public. ([#1191])
[#1022]: /../../issues/1022
[#1158]: /../../pull/1158
[#1191]: /../../pull/1191
[#1196]: /../../pull/1196
[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[proto-legacy]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
@ -29,7 +39,7 @@ See [old CHANGELOG](/../../blob/juniper_graphql_ws-v0.3.0/juniper_graphql_ws/CHA
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper` crate]: https://docs.rs/juniper
[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws
[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions
[Semantic Versioning 2.0.0]: https://semver.org

View file

@ -3,17 +3,28 @@ name = "juniper_graphql_ws"
version = "0.4.0-dev"
edition = "2021"
rust-version = "1.65"
description = "Legacy `graphql-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate."
description = "GraphQL over WebSocket Protocol implementations for `juniper` crate."
license = "BSD-2-Clause"
authors = ["Christopher Brown <ccbrown112@gmail.com>"]
authors = [
"Christopher Brown <ccbrown112@gmail.com>",
"Kai Ren <tyranron@gmail.com>",
]
documentation = "https://docs.rs/juniper_graphql_ws"
homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_ws"
repository = "https://github.com/graphql-rust/juniper"
readme = "README.md"
categories = ["asynchronous", "web-programming", "web-programming::http-server"]
keywords = ["apollo", "graphql", "graphql-ws", "subscription", "websocket"]
keywords = ["apollo", "graphql-transport-ws", "graphql-ws", "subscription", "websocket"]
exclude = ["/release.toml"]
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[features]
graphql-transport-ws = []
graphql-ws = []
[dependencies]
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" }

View file

@ -8,7 +8,11 @@
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_ws/CHANGELOG.md)
This crate contains an implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package]. It has now been deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the new [`juniper_graphql_transport_ws` crate] and new [`graphql-ws` npm package].
This crate contains implementations of 2 protocols:
1. (`graphql-transport-ws` feature) The [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package].
2. (`graphql-ws` feature) The [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package] (deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new] mentioned above).
@ -21,7 +25,6 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws
[`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws
[Apollo]: https://www.apollographql.com
[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md

View file

@ -1,7 +1,7 @@
use juniper::Variables;
use serde::Deserialize;
use crate::utils::default_for_null;
use crate::util::default_for_null;
/// The payload for a client's "start" message. This triggers execution of a query, mutation, or
/// subscription.

View file

@ -1,16 +1,17 @@
#![doc = include_str!("../README.md")]
#![deny(missing_docs, warnings)]
//! Implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now
//! used by [Apollo] and [`graphql-ws` npm package].
//!
//! Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in
//! the [`graphql_ws` module].
//!
//! [`graphql_ws` module]: crate::graphql_ws
//! [`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
//! [Apollo]: https://www.apollographql.com
//! [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
//! [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
mod client_message;
pub use client_message::*;
mod server_message;
pub use server_message::*;
mod schema;
pub use schema::*;
mod utils;
use std::{
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin,
@ -28,8 +29,12 @@ use juniper::{
GraphQLError, RuleError, ScalarValue,
};
#[doc(inline)]
pub use juniper_graphql_ws::{ConnectionConfig, Init};
use super::{ConnectionConfig, Init, Schema};
pub use self::{
client_message::{ClientMessage, SubscribePayload},
server_message::{ErrorPayload, NextPayload, ServerMessage},
};
struct ExecutionParams<S: Schema> {
subscribe_payload: SubscribePayload<S::ScalarValue>,

View file

@ -1,11 +1,11 @@
use std::{any::Any, fmt, marker::PhantomPinned};
use juniper::{ExecutionError, Value};
use serde::Serialize;
use juniper::{ExecutionError, GraphQLError, Value};
use serde::{Serialize, Serializer};
pub use crate::server_message::ErrorPayload;
/// Sent after execution of an operation. For queries and mutations, this is sent to the client
/// once. For subscriptions, this is sent for every event in the event stream.
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NextPayload<S> {
/// The result data.
@ -17,67 +17,8 @@ pub struct NextPayload<S> {
pub errors: Vec<ExecutionError<S>>,
}
/// A payload for errors that can happen before execution. Errors that happen during execution are
/// instead sent to the client via `NextPayload`. `ErrorPayload` is a wrapper for an owned
/// `GraphQLError`.
// XXX: Think carefully before deriving traits. This is self-referential (error references
// _execution_params).
pub struct ErrorPayload {
_execution_params: Option<Box<dyn Any + Send>>,
error: GraphQLError,
_marker: PhantomPinned,
}
impl ErrorPayload {
/// Creates a new [`ErrorPayload`] out of the provide `execution_params` and
/// [`GraphQLError`].
pub(crate) fn new(execution_params: Box<dyn Any + Send>, error: GraphQLError) -> Self {
Self {
_execution_params: Some(execution_params),
error,
_marker: PhantomPinned,
}
}
/// Returns the contained GraphQLError.
pub fn graphql_error(&self) -> &GraphQLError {
&self.error
}
}
impl fmt::Debug for ErrorPayload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.error.fmt(f)
}
}
impl PartialEq for ErrorPayload {
fn eq(&self, other: &Self) -> bool {
self.error.eq(&other.error)
}
}
impl Serialize for ErrorPayload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.error.serialize(serializer)
}
}
impl From<GraphQLError> for ErrorPayload {
fn from(error: GraphQLError) -> Self {
Self {
_execution_params: None,
error,
_marker: PhantomPinned,
}
}
}
/// ServerMessage defines the message types that servers can send.
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum ServerMessage<S> {
@ -111,7 +52,7 @@ pub enum ServerMessage<S> {
#[cfg(test)]
mod test {
use juniper::{graphql_value, DefaultScalarValue};
use juniper::{graphql_value, DefaultScalarValue, GraphQLError};
use super::*;

View file

@ -1,7 +1,7 @@
use juniper::Variables;
use serde::Deserialize;
use crate::utils::default_for_null;
use crate::util::default_for_null;
/// The payload for a client's "start" message. This triggers execution of a query, mutation, or
/// subscription.

View file

@ -0,0 +1,959 @@
//! Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly
//! used by [Apollo] and [`subscriptions-transport-ws` npm package].
//!
//! It has now been deprecated in favor of the
//! [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the
//! [`graphql_transport_ws` module] and new [`graphql-ws` npm package].
//!
//! [`graphql_transport_ws` module]: crate::graphql_transport_ws
//! [`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws
//! [`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws
//! [Apollo]: https://www.apollographql.com
//! [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
//! [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
mod client_message;
mod server_message;
use std::{
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin,
sync::Arc, time::Duration,
};
use juniper::{
futures::{
channel::oneshot,
future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt},
stream::{self, BoxStream, SelectAll, StreamExt},
task::{Context, Poll, Waker},
Sink, Stream,
},
GraphQLError, RuleError,
};
use super::{ConnectionConfig, Init, Schema};
pub use self::{
client_message::{ClientMessage, StartPayload},
server_message::{ConnectionErrorPayload, DataPayload, ErrorPayload, ServerMessage},
};
struct ExecutionParams<S: Schema> {
start_payload: StartPayload<S::ScalarValue>,
config: Arc<ConnectionConfig<S::Context>>,
schema: S,
}
enum Reaction<S: Schema> {
ServerMessage(ServerMessage<S::ScalarValue>),
EndStream,
}
impl<S: Schema> Reaction<S> {
/// Converts the reaction into a one-item stream.
fn into_stream(self) -> BoxStream<'static, Self> {
stream::once(future::ready(self)).boxed()
}
}
enum ConnectionState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
/// PreInit is the state before a ConnectionInit message has been accepted.
PreInit { init: I, schema: S },
/// Active is the state after a ConnectionInit message has been accepted.
Active {
config: Arc<ConnectionConfig<S::Context>>,
stoppers: HashMap<String, oneshot::Sender<()>>,
schema: S,
},
/// Terminated is the state after a ConnectionInit message has been rejected.
Terminated,
}
impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
// Each message we receive results in a stream of zero or more reactions. For example, a
// ConnectionTerminate message results in a one-item stream with the EndStream reaction.
async fn handle_message(
self,
msg: ClientMessage<S::ScalarValue>,
) -> (Self, BoxStream<'static, Reaction<S>>) {
if let ClientMessage::ConnectionTerminate = msg {
return (self, Reaction::EndStream.into_stream());
}
match self {
Self::PreInit { init, schema } => match msg {
ClientMessage::ConnectionInit { payload } => match init.init(payload).await {
Ok(config) => {
let keep_alive_interval = config.keep_alive_interval;
let mut s = stream::iter(vec![Reaction::ServerMessage(
ServerMessage::ConnectionAck,
)])
.boxed();
if keep_alive_interval > Duration::from_secs(0) {
s = s
.chain(
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive)
.into_stream(),
)
.boxed();
s = s
.chain(stream::unfold((), move |_| async move {
tokio::time::sleep(keep_alive_interval).await;
Some((
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive),
(),
))
}))
.boxed();
}
(
Self::Active {
config: Arc::new(config),
stoppers: HashMap::new(),
schema,
},
s,
)
}
Err(e) => (
Self::Terminated,
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: e.to_string(),
},
}),
Reaction::EndStream,
])
.boxed(),
),
},
_ => (Self::PreInit { init, schema }, stream::empty().boxed()),
},
Self::Active {
config,
mut stoppers,
schema,
} => {
let reactions = match msg {
ClientMessage::Start { id, payload } => {
if stoppers.contains_key(&id) {
// We already have an operation with this id, so we can't start a new
// one.
stream::empty().boxed()
} else {
// Go ahead and prune canceled stoppers before adding a new one.
stoppers.retain(|_, tx| !tx.is_canceled());
if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::ValidationError(vec![
RuleError::new("Too many in-flight operations.", &[]),
])
.into(),
}),
Reaction::ServerMessage(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);
// Create the operation stream. This stream will emit Data and Error
// messages, but will not emit Complete that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
start_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();
// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), |(rx, mut s)| async move {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});
// Once the stream ends, send the Complete message.
let s = s.chain(
Reaction::ServerMessage(ServerMessage::Complete { id })
.into_stream(),
);
s.boxed()
}
}
}
ClientMessage::Stop { id } => {
stoppers.remove(&id);
stream::empty().boxed()
}
_ => stream::empty().boxed(),
};
(
Self::Active {
config,
stoppers,
schema,
},
reactions,
)
}
Self::Terminated => (self, stream::empty().boxed()),
}
}
async fn start(id: String, params: ExecutionParams<S>) -> BoxStream<'static, Reaction<S>> {
// TODO: This could be made more efficient if `juniper` exposed
// functionality to allow us to parse and validate the query,
// determine whether it's a subscription, and then execute it.
// For now, the query gets parsed and validated twice.
let params = Arc::new(params);
// Try to execute this as a query or mutation.
match juniper::execute(
&params.start_payload.query,
params.start_payload.operation_name.as_deref(),
params.schema.root_node(),
&params.start_payload.variables,
&params.config.context,
)
.await
{
Ok((data, errors)) => {
return Reaction::ServerMessage(ServerMessage::Data {
id: id.clone(),
payload: DataPayload { data, errors },
})
.into_stream();
}
Err(GraphQLError::IsSubscription) => {}
Err(e) => {
return Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: ErrorPayload::new(Box::new(params.clone()), e),
})
.into_stream();
}
}
// Try to execute as a subscription.
SubscriptionStart::new(id, params.clone()).boxed()
}
}
struct InterruptableStream<S> {
stream: S,
rx: oneshot::Receiver<()>,
}
impl<S: Stream + Unpin> Stream for InterruptableStream<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Ready(_) => return Poll::Ready(None),
Poll::Pending => {}
}
Pin::new(&mut self.stream).poll_next(cx)
}
}
/// SubscriptionStartState is the state for a subscription operation.
enum SubscriptionStartState<S: Schema> {
/// Init is the start before being polled for the first time.
Init { id: String },
/// ResolvingIntoStream is the state after being polled for the first time. In this state,
/// we're parsing, validating, and getting the actual event stream.
ResolvingIntoStream {
id: String,
future: BoxFuture<
'static,
Result<juniper_subscriptions::Connection<'static, S::ScalarValue>, GraphQLError>,
>,
},
/// Streaming is the state after we've successfully obtained the event stream for the
/// subscription. In this state, we're just forwarding events back to the client.
Streaming {
id: String,
stream: juniper_subscriptions::Connection<'static, S::ScalarValue>,
},
/// Terminated is the state once we're all done.
Terminated,
}
/// SubscriptionStart is the stream for a subscription operation.
struct SubscriptionStart<S: Schema> {
params: Arc<ExecutionParams<S>>,
state: SubscriptionStartState<S>,
_marker: PhantomPinned,
}
impl<S: Schema> SubscriptionStart<S> {
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Pin<Box<Self>> {
Box::pin(Self {
params,
state: SubscriptionStartState::Init { id },
_marker: PhantomPinned,
})
}
}
impl<S: Schema> Stream for SubscriptionStart<S> {
type Item = Reaction<S>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let (params, state) = unsafe {
// XXX: The execution parameters are referenced by state and must not be modified.
// Modifying state is fine though.
let inner = self.get_unchecked_mut();
(&inner.params, &mut inner.state)
};
loop {
match state {
SubscriptionStartState::Init { id } => {
// XXX: resolve_into_stream returns a Future that references the execution
// parameters, and the returned stream also references them. We can guarantee
// that everything has the same lifetime in this self-referential struct.
let params = Arc::as_ptr(params);
*state = SubscriptionStartState::ResolvingIntoStream {
id: id.clone(),
future: unsafe {
juniper::resolve_into_stream(
&(*params).start_payload.query,
(*params).start_payload.operation_name.as_deref(),
(*params).schema.root_node(),
&(*params).start_payload.variables,
&(*params).config.context,
)
}
.map_ok(|(stream, errors)| {
juniper_subscriptions::Connection::from_stream(stream, errors)
})
.boxed(),
};
}
SubscriptionStartState::ResolvingIntoStream {
ref id,
ref mut future,
} => match future.as_mut().poll(cx) {
Poll::Ready(r) => match r {
Ok(stream) => {
*state = SubscriptionStartState::Streaming {
id: id.clone(),
stream,
}
}
Err(e) => {
return Poll::Ready(Some(Reaction::ServerMessage(
ServerMessage::Error {
id: id.clone(),
payload: ErrorPayload::new(Box::new(params.clone()), e),
},
)));
}
},
Poll::Pending => return Poll::Pending,
},
SubscriptionStartState::Streaming {
ref id,
ref mut stream,
} => match Pin::new(stream).poll_next(cx) {
Poll::Ready(Some(output)) => {
return Poll::Ready(Some(Reaction::ServerMessage(ServerMessage::Data {
id: id.clone(),
payload: DataPayload {
data: output.data,
errors: output.errors,
},
})));
}
Poll::Ready(None) => {
*state = SubscriptionStartState::Terminated;
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
},
SubscriptionStartState::Terminated => return Poll::Ready(None),
}
}
}
}
enum ConnectionSinkState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
Ready {
state: ConnectionState<S, I>,
},
HandlingMessage {
#[allow(clippy::type_complexity)]
result: BoxFuture<'static, (ConnectionState<S, I>, BoxStream<'static, Reaction<S>>)>,
},
Closed,
}
/// Implements the graphql-ws protocol. This is a sink for `TryInto<ClientMessage>` and a stream of
/// `ServerMessage`.
pub struct Connection<S: Schema, I: Init<S::ScalarValue, S::Context>> {
reactions: SelectAll<BoxStream<'static, Reaction<S>>>,
stream_waker: Option<Waker>,
sink_state: ConnectionSinkState<S, I>,
}
impl<S, I> Connection<S, I>
where
S: Schema,
I: Init<S::ScalarValue, S::Context>,
{
/// Creates a new connection, which is a sink for `TryInto<ClientMessage>` and a stream of `ServerMessage`.
///
/// The `schema` argument should typically be an `Arc<RootNode<...>>`.
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `ConnectionConfig` if the context and configuration are already
/// known, or it can be a closure that gets executed asynchronously when the client sends the
/// ConnectionInit message. Using a closure allows you to perform authentication based on the
/// parameters provided by the client.
pub fn new(schema: S, init: I) -> Self {
Self {
reactions: SelectAll::new(),
stream_waker: None,
sink_state: ConnectionSinkState::Ready {
state: ConnectionState::PreInit { init, schema },
},
}
}
}
impl<S, I, T> Sink<T> for Connection<S, I>
where
T: TryInto<ClientMessage<S::ScalarValue>>,
T::Error: Error,
S: Schema,
I: Init<S::ScalarValue, S::Context> + Send,
{
type Error = Infallible;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match &mut self.sink_state {
ConnectionSinkState::Ready { .. } => Poll::Ready(Ok(())),
ConnectionSinkState::HandlingMessage { ref mut result } => {
match Pin::new(result).poll(cx) {
Poll::Ready((state, reactions)) => {
self.reactions.push(reactions);
self.sink_state = ConnectionSinkState::Ready { state };
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
ConnectionSinkState::Closed => panic!("poll_ready called after close"),
}
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let s = self.get_mut();
let state = &mut s.sink_state;
*state = match std::mem::replace(state, ConnectionSinkState::Closed) {
ConnectionSinkState::Ready { state } => {
match item.try_into() {
Ok(msg) => ConnectionSinkState::HandlingMessage {
result: state.handle_message(msg).boxed(),
},
Err(e) => {
// If we weren't able to parse the message, send back an error.
s.reactions.push(
Reaction::ServerMessage(ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: e.to_string(),
},
})
.into_stream(),
);
ConnectionSinkState::Ready { state }
}
}
}
_ => panic!("start_send called when not ready"),
};
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
<Self as Sink<T>>::poll_ready(self, cx)
}
fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.sink_state = ConnectionSinkState::Closed;
if let Some(waker) = self.stream_waker.take() {
// Wake up the stream so it can close too.
waker.wake();
}
Poll::Ready(Ok(()))
}
}
impl<S, I> Stream for Connection<S, I>
where
S: Schema,
I: Init<S::ScalarValue, S::Context>,
{
type Item = ServerMessage<S::ScalarValue>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.stream_waker = Some(cx.waker().clone());
if let ConnectionSinkState::Closed = self.sink_state {
return Poll::Ready(None);
}
// Poll the reactions for new outgoing messages.
if !self.reactions.is_empty() {
match Pin::new(&mut self.reactions).poll_next(cx) {
Poll::Ready(Some(reaction)) => match reaction {
Reaction::ServerMessage(msg) => return Poll::Ready(Some(msg)),
Reaction::EndStream => return Poll::Ready(None),
},
Poll::Ready(None) => {
// In rare cases, the reaction stream may terminate. For example, this will
// happen if the first message we receive does not require any reaction. Just
// recreate it in that case.
self.reactions = SelectAll::new();
}
_ => (),
}
}
Poll::Pending
}
}
#[cfg(test)]
mod test {
use std::{convert::Infallible, io};
use juniper::{
futures::sink::SinkExt,
graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars,
parser::{ParseError, Spanning},
DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, Variables,
};
use super::*;
struct Context(i32);
impl juniper::Context for Context {}
struct Query;
#[graphql_object(context = Context)]
impl Query {
/// context just resolves to the current context.
async fn context(context: &Context) -> i32 {
context.0
}
}
struct Subscription;
#[graphql_subscription(context = Context)]
impl Subscription {
/// never never emits anything.
async fn never(_context: &Context) -> BoxStream<'static, FieldResult<i32>> {
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream()
.boxed()
}
/// context emits the current context once, then never emits anything else.
async fn context(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
stream::once(future::ready(Ok(context.0)))
.chain(
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
.boxed()
}
/// error emits an error once, then never emits anything else.
async fn error(_context: &Context) -> BoxStream<'static, FieldResult<i32>> {
stream::once(future::ready(Err(FieldError::new(
"field error",
graphql_value!(null),
))))
.chain(
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
.boxed()
}
}
type ClientMessage = super::ClientMessage<DefaultScalarValue>;
type ServerMessage = super::ServerMessage<DefaultScalarValue>;
fn new_test_schema() -> Arc<RootNode<'static, Query, EmptyMutation<Context>, Subscription>> {
Arc::new(RootNode::new(Query, EmptyMutation::new(), Subscription))
}
#[tokio::test]
async fn test_query() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "{context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
assert_eq!(
ServerMessage::Complete { id: "foo".into() },
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_subscriptions() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
conn.send(ClientMessage::Start {
id: "bar".into(),
payload: StartPayload {
query: "subscription Bar {context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "bar".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
conn.send(ClientMessage::Stop { id: "foo".into() })
.await
.unwrap();
assert_eq!(
ServerMessage::Complete { id: "foo".into() },
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_init_params_ok() {
let mut conn = Connection::new(new_test_schema(), |params: Variables| async move {
assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar")));
Ok(ConnectionConfig::new(Context(1))) as Result<_, Infallible>
});
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {"foo": "bar"},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
}
#[tokio::test]
async fn test_init_params_error() {
let mut conn = Connection::new(new_test_schema(), |params: Variables| async move {
assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar")));
Err(io::Error::new(io::ErrorKind::Other, "init error"))
});
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {"foo": "bar"},
})
.await
.unwrap();
assert_eq!(
ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: "init error".into(),
},
},
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_max_in_flight_operations() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1))
.with_keep_alive_interval(Duration::from_secs(0))
.with_max_in_flight_operations(1),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {never}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
conn.send(ClientMessage::Start {
id: "bar".into(),
payload: StartPayload {
query: "subscription Bar {never}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Error { id, .. } => {
assert_eq!(id, "bar");
}
msg => panic!("expected error, got: {msg:?}"),
}
}
#[tokio::test]
async fn test_parse_error() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "asd".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Error { id, payload } => {
assert_eq!(id, "foo");
match payload.graphql_error() {
GraphQLError::ParseError(Spanning {
item: ParseError::UnexpectedToken(token),
..
}) => assert_eq!(token, "asd"),
p => panic!("expected graphql parse error, got: {p:?}"),
}
}
msg => panic!("expected error, got: {msg:?}"),
}
}
#[tokio::test]
async fn test_keep_alives() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_millis(20)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
for _ in 0..10 {
assert_eq!(
ServerMessage::ConnectionKeepAlive,
conn.next().await.unwrap()
);
}
}
#[tokio::test]
async fn test_slow_init() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
// If we send the start message before the init is handled, we should still get results.
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "{context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_subscription_field_error() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {error}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Data {
id,
payload: DataPayload { data, errors },
} => {
assert_eq!(id, "foo");
assert_eq!(data, graphql_value!({ "error": null }));
assert_eq!(errors.len(), 1);
}
msg => panic!("expected data, got: {msg:?}"),
}
}
}

View file

@ -0,0 +1,127 @@
use juniper::{ExecutionError, Value};
use serde::Serialize;
pub use crate::server_message::ErrorPayload;
/// The payload for errors that are not associated with a GraphQL operation.
#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionErrorPayload {
/// The error message.
pub message: String,
}
/// Sent after execution of an operation. For queries and mutations, this is sent to the client
/// once. For subscriptions, this is sent for every event in the event stream.
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DataPayload<S> {
/// The result data.
pub data: Value<S>,
/// The errors that have occurred during execution. Note that parse and validation errors are
/// not included here. They are sent via Error messages.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub errors: Vec<ExecutionError<S>>,
}
/// ServerMessage defines the message types that servers can send.
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum ServerMessage<S> {
/// ConnectionError is used for errors that are not associated with a GraphQL operation. For
/// example, this will be used when:
///
/// * The server is unable to parse a client's message.
/// * The client's initialization parameters are rejected.
ConnectionError {
/// The error that occurred.
payload: ConnectionErrorPayload,
},
/// ConnectionAck is sent in response to a client's ConnectionInit message if the server accepted a
/// connection.
ConnectionAck,
/// Data contains the result of a query, mutation, or subscription event.
Data {
/// The id of the operation that the data is for.
id: String,
/// The data and errors that occurred during execution.
payload: DataPayload<S>,
},
/// Error contains an error that occurs before execution, such as validation errors.
Error {
/// The id of the operation that triggered this error.
id: String,
/// The error(s).
payload: ErrorPayload,
},
/// Complete indicates that no more data will be sent for the given operation.
Complete {
/// The id of the operation that has completed.
id: String,
},
/// ConnectionKeepAlive is sent periodically after accepting a connection.
#[serde(rename = "ka")]
ConnectionKeepAlive,
}
#[cfg(test)]
mod test {
use juniper::{graphql_value, DefaultScalarValue, GraphQLError};
use super::*;
#[test]
fn test_serialization() {
type ServerMessage = super::ServerMessage<DefaultScalarValue>;
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: "foo".into(),
},
})
.unwrap(),
r#"{"type":"connection_error","payload":{"message":"foo"}}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(),
r#"{"type":"connection_ack"}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!(null),
errors: vec![],
},
})
.unwrap(),
r#"{"type":"data","id":"foo","payload":{"data":null}}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Error {
id: "foo".into(),
payload: GraphQLError::UnknownOperationName.into(),
})
.unwrap(),
r#"{"type":"error","id":"foo","payload":[{"message":"Unknown operation"}]}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Complete { id: "foo".into() }).unwrap(),
r#"{"type":"complete","id":"foo"}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionKeepAlive).unwrap(),
r#"{"type":"ka"}"#,
);
}
}

View file

@ -1,38 +1,30 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![deny(missing_docs, warnings)]
mod client_message;
pub use client_message::*;
mod server_message;
pub use server_message::*;
#[cfg(not(any(feature = "graphql-transport-ws", feature = "graphql-ws")))]
compile_error!(
r#"at least one feature must be enabled (either "graphql-transport-ws" or "graphql-ws")"#
);
#[cfg(feature = "graphql-transport-ws")]
pub mod graphql_transport_ws;
#[cfg(feature = "graphql-ws")]
pub mod graphql_ws;
mod schema;
pub use schema::*;
mod utils;
mod server_message;
mod util;
use std::{
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin,
sync::Arc, time::Duration,
convert::Infallible,
error::Error,
future::{self, Future},
time::Duration,
};
use juniper::{
futures::{
channel::oneshot,
future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt},
stream::{self, BoxStream, SelectAll, StreamExt},
task::{Context, Poll, Waker},
Sink, Stream,
},
GraphQLError, RuleError, ScalarValue, Variables,
};
use juniper::{ScalarValue, Variables};
struct ExecutionParams<S: Schema> {
start_payload: StartPayload<S::ScalarValue>,
config: Arc<ConnectionConfig<S::Context>>,
schema: S,
}
pub use self::schema::{ArcSchema, Schema};
/// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit
/// message.
@ -96,18 +88,6 @@ impl<S: ScalarValue, CtxT: Unpin + Send + 'static> Init<S, CtxT> for ConnectionC
}
}
enum Reaction<S: Schema> {
ServerMessage(ServerMessage<S::ScalarValue>),
EndStream,
}
impl<S: Schema> Reaction<S> {
/// Converts the reaction into a one-item stream.
fn into_stream(self) -> BoxStream<'static, Self> {
stream::once(future::ready(self)).boxed()
}
}
/// Init defines the requirements for types that can provide connection configurations when
/// ConnectionInit messages are received. Implementations are provided for `ConnectionConfig` and
/// closures that meet the requirements.
@ -137,905 +117,3 @@ where
self(params)
}
}
enum ConnectionState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
/// PreInit is the state before a ConnectionInit message has been accepted.
PreInit { init: I, schema: S },
/// Active is the state after a ConnectionInit message has been accepted.
Active {
config: Arc<ConnectionConfig<S::Context>>,
stoppers: HashMap<String, oneshot::Sender<()>>,
schema: S,
},
/// Terminated is the state after a ConnectionInit message has been rejected.
Terminated,
}
impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
// Each message we receive results in a stream of zero or more reactions. For example, a
// ConnectionTerminate message results in a one-item stream with the EndStream reaction.
async fn handle_message(
self,
msg: ClientMessage<S::ScalarValue>,
) -> (Self, BoxStream<'static, Reaction<S>>) {
if let ClientMessage::ConnectionTerminate = msg {
return (self, Reaction::EndStream.into_stream());
}
match self {
Self::PreInit { init, schema } => match msg {
ClientMessage::ConnectionInit { payload } => match init.init(payload).await {
Ok(config) => {
let keep_alive_interval = config.keep_alive_interval;
let mut s = stream::iter(vec![Reaction::ServerMessage(
ServerMessage::ConnectionAck,
)])
.boxed();
if keep_alive_interval > Duration::from_secs(0) {
s = s
.chain(
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive)
.into_stream(),
)
.boxed();
s = s
.chain(stream::unfold((), move |_| async move {
tokio::time::sleep(keep_alive_interval).await;
Some((
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive),
(),
))
}))
.boxed();
}
(
Self::Active {
config: Arc::new(config),
stoppers: HashMap::new(),
schema,
},
s,
)
}
Err(e) => (
Self::Terminated,
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: e.to_string(),
},
}),
Reaction::EndStream,
])
.boxed(),
),
},
_ => (Self::PreInit { init, schema }, stream::empty().boxed()),
},
Self::Active {
config,
mut stoppers,
schema,
} => {
let reactions = match msg {
ClientMessage::Start { id, payload } => {
if stoppers.contains_key(&id) {
// We already have an operation with this id, so we can't start a new
// one.
stream::empty().boxed()
} else {
// Go ahead and prune canceled stoppers before adding a new one.
stoppers.retain(|_, tx| !tx.is_canceled());
if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::ValidationError(vec![
RuleError::new("Too many in-flight operations.", &[]),
])
.into(),
}),
Reaction::ServerMessage(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);
// Create the operation stream. This stream will emit Data and Error
// messages, but will not emit Complete that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
start_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();
// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), |(rx, mut s)| async move {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});
// Once the stream ends, send the Complete message.
let s = s.chain(
Reaction::ServerMessage(ServerMessage::Complete { id })
.into_stream(),
);
s.boxed()
}
}
}
ClientMessage::Stop { id } => {
stoppers.remove(&id);
stream::empty().boxed()
}
_ => stream::empty().boxed(),
};
(
Self::Active {
config,
stoppers,
schema,
},
reactions,
)
}
Self::Terminated => (self, stream::empty().boxed()),
}
}
async fn start(id: String, params: ExecutionParams<S>) -> BoxStream<'static, Reaction<S>> {
// TODO: This could be made more efficient if `juniper` exposed
// functionality to allow us to parse and validate the query,
// determine whether it's a subscription, and then execute it.
// For now, the query gets parsed and validated twice.
let params = Arc::new(params);
// Try to execute this as a query or mutation.
match juniper::execute(
&params.start_payload.query,
params.start_payload.operation_name.as_deref(),
params.schema.root_node(),
&params.start_payload.variables,
&params.config.context,
)
.await
{
Ok((data, errors)) => {
return Reaction::ServerMessage(ServerMessage::Data {
id: id.clone(),
payload: DataPayload { data, errors },
})
.into_stream();
}
Err(GraphQLError::IsSubscription) => {}
Err(e) => {
return Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: ErrorPayload::new(Box::new(params.clone()), e),
})
.into_stream();
}
}
// Try to execute as a subscription.
SubscriptionStart::new(id, params.clone()).boxed()
}
}
struct InterruptableStream<S> {
stream: S,
rx: oneshot::Receiver<()>,
}
impl<S: Stream + Unpin> Stream for InterruptableStream<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Ready(_) => return Poll::Ready(None),
Poll::Pending => {}
}
Pin::new(&mut self.stream).poll_next(cx)
}
}
/// SubscriptionStartState is the state for a subscription operation.
enum SubscriptionStartState<S: Schema> {
/// Init is the start before being polled for the first time.
Init { id: String },
/// ResolvingIntoStream is the state after being polled for the first time. In this state,
/// we're parsing, validating, and getting the actual event stream.
ResolvingIntoStream {
id: String,
future: BoxFuture<
'static,
Result<juniper_subscriptions::Connection<'static, S::ScalarValue>, GraphQLError>,
>,
},
/// Streaming is the state after we've successfully obtained the event stream for the
/// subscription. In this state, we're just forwarding events back to the client.
Streaming {
id: String,
stream: juniper_subscriptions::Connection<'static, S::ScalarValue>,
},
/// Terminated is the state once we're all done.
Terminated,
}
/// SubscriptionStart is the stream for a subscription operation.
struct SubscriptionStart<S: Schema> {
params: Arc<ExecutionParams<S>>,
state: SubscriptionStartState<S>,
_marker: PhantomPinned,
}
impl<S: Schema> SubscriptionStart<S> {
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Pin<Box<Self>> {
Box::pin(Self {
params,
state: SubscriptionStartState::Init { id },
_marker: PhantomPinned,
})
}
}
impl<S: Schema> Stream for SubscriptionStart<S> {
type Item = Reaction<S>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let (params, state) = unsafe {
// XXX: The execution parameters are referenced by state and must not be modified.
// Modifying state is fine though.
let inner = self.get_unchecked_mut();
(&inner.params, &mut inner.state)
};
loop {
match state {
SubscriptionStartState::Init { id } => {
// XXX: resolve_into_stream returns a Future that references the execution
// parameters, and the returned stream also references them. We can guarantee
// that everything has the same lifetime in this self-referential struct.
let params = Arc::as_ptr(params);
*state = SubscriptionStartState::ResolvingIntoStream {
id: id.clone(),
future: unsafe {
juniper::resolve_into_stream(
&(*params).start_payload.query,
(*params).start_payload.operation_name.as_deref(),
(*params).schema.root_node(),
&(*params).start_payload.variables,
&(*params).config.context,
)
}
.map_ok(|(stream, errors)| {
juniper_subscriptions::Connection::from_stream(stream, errors)
})
.boxed(),
};
}
SubscriptionStartState::ResolvingIntoStream {
ref id,
ref mut future,
} => match future.as_mut().poll(cx) {
Poll::Ready(r) => match r {
Ok(stream) => {
*state = SubscriptionStartState::Streaming {
id: id.clone(),
stream,
}
}
Err(e) => {
return Poll::Ready(Some(Reaction::ServerMessage(
ServerMessage::Error {
id: id.clone(),
payload: ErrorPayload::new(Box::new(params.clone()), e),
},
)));
}
},
Poll::Pending => return Poll::Pending,
},
SubscriptionStartState::Streaming {
ref id,
ref mut stream,
} => match Pin::new(stream).poll_next(cx) {
Poll::Ready(Some(output)) => {
return Poll::Ready(Some(Reaction::ServerMessage(ServerMessage::Data {
id: id.clone(),
payload: DataPayload {
data: output.data,
errors: output.errors,
},
})));
}
Poll::Ready(None) => {
*state = SubscriptionStartState::Terminated;
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
},
SubscriptionStartState::Terminated => return Poll::Ready(None),
}
}
}
}
enum ConnectionSinkState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
Ready {
state: ConnectionState<S, I>,
},
HandlingMessage {
#[allow(clippy::type_complexity)]
result: BoxFuture<'static, (ConnectionState<S, I>, BoxStream<'static, Reaction<S>>)>,
},
Closed,
}
/// Implements the graphql-ws protocol. This is a sink for `TryInto<ClientMessage>` and a stream of
/// `ServerMessage`.
pub struct Connection<S: Schema, I: Init<S::ScalarValue, S::Context>> {
reactions: SelectAll<BoxStream<'static, Reaction<S>>>,
stream_waker: Option<Waker>,
sink_state: ConnectionSinkState<S, I>,
}
impl<S, I> Connection<S, I>
where
S: Schema,
I: Init<S::ScalarValue, S::Context>,
{
/// Creates a new connection, which is a sink for `TryInto<ClientMessage>` and a stream of `ServerMessage`.
///
/// The `schema` argument should typically be an `Arc<RootNode<...>>`.
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `ConnectionConfig` if the context and configuration are already
/// known, or it can be a closure that gets executed asynchronously when the client sends the
/// ConnectionInit message. Using a closure allows you to perform authentication based on the
/// parameters provided by the client.
pub fn new(schema: S, init: I) -> Self {
Self {
reactions: SelectAll::new(),
stream_waker: None,
sink_state: ConnectionSinkState::Ready {
state: ConnectionState::PreInit { init, schema },
},
}
}
}
impl<S, I, T> Sink<T> for Connection<S, I>
where
T: TryInto<ClientMessage<S::ScalarValue>>,
T::Error: Error,
S: Schema,
I: Init<S::ScalarValue, S::Context> + Send,
{
type Error = Infallible;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match &mut self.sink_state {
ConnectionSinkState::Ready { .. } => Poll::Ready(Ok(())),
ConnectionSinkState::HandlingMessage { ref mut result } => {
match Pin::new(result).poll(cx) {
Poll::Ready((state, reactions)) => {
self.reactions.push(reactions);
self.sink_state = ConnectionSinkState::Ready { state };
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
ConnectionSinkState::Closed => panic!("poll_ready called after close"),
}
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let s = self.get_mut();
let state = &mut s.sink_state;
*state = match std::mem::replace(state, ConnectionSinkState::Closed) {
ConnectionSinkState::Ready { state } => {
match item.try_into() {
Ok(msg) => ConnectionSinkState::HandlingMessage {
result: state.handle_message(msg).boxed(),
},
Err(e) => {
// If we weren't able to parse the message, send back an error.
s.reactions.push(
Reaction::ServerMessage(ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: e.to_string(),
},
})
.into_stream(),
);
ConnectionSinkState::Ready { state }
}
}
}
_ => panic!("start_send called when not ready"),
};
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
<Self as Sink<T>>::poll_ready(self, cx)
}
fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.sink_state = ConnectionSinkState::Closed;
if let Some(waker) = self.stream_waker.take() {
// Wake up the stream so it can close too.
waker.wake();
}
Poll::Ready(Ok(()))
}
}
impl<S, I> Stream for Connection<S, I>
where
S: Schema,
I: Init<S::ScalarValue, S::Context>,
{
type Item = ServerMessage<S::ScalarValue>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.stream_waker = Some(cx.waker().clone());
if let ConnectionSinkState::Closed = self.sink_state {
return Poll::Ready(None);
}
// Poll the reactions for new outgoing messages.
if !self.reactions.is_empty() {
match Pin::new(&mut self.reactions).poll_next(cx) {
Poll::Ready(Some(reaction)) => match reaction {
Reaction::ServerMessage(msg) => return Poll::Ready(Some(msg)),
Reaction::EndStream => return Poll::Ready(None),
},
Poll::Ready(None) => {
// In rare cases, the reaction stream may terminate. For example, this will
// happen if the first message we receive does not require any reaction. Just
// recreate it in that case.
self.reactions = SelectAll::new();
}
_ => (),
}
}
Poll::Pending
}
}
#[cfg(test)]
mod test {
use std::{convert::Infallible, io};
use juniper::{
futures::sink::SinkExt,
graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars,
parser::{ParseError, Spanning},
DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode,
};
use super::*;
struct Context(i32);
impl juniper::Context for Context {}
struct Query;
#[graphql_object(context = Context)]
impl Query {
/// context just resolves to the current context.
async fn context(context: &Context) -> i32 {
context.0
}
}
struct Subscription;
#[graphql_subscription(context = Context)]
impl Subscription {
/// never never emits anything.
async fn never(_context: &Context) -> BoxStream<'static, FieldResult<i32>> {
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream()
.boxed()
}
/// context emits the current context once, then never emits anything else.
async fn context(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
stream::once(future::ready(Ok(context.0)))
.chain(
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
.boxed()
}
/// error emits an error once, then never emits anything else.
async fn error(_context: &Context) -> BoxStream<'static, FieldResult<i32>> {
stream::once(future::ready(Err(FieldError::new(
"field error",
graphql_value!(null),
))))
.chain(
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
.boxed()
}
}
type ClientMessage = super::ClientMessage<DefaultScalarValue>;
type ServerMessage = super::ServerMessage<DefaultScalarValue>;
fn new_test_schema() -> Arc<RootNode<'static, Query, EmptyMutation<Context>, Subscription>> {
Arc::new(RootNode::new(Query, EmptyMutation::new(), Subscription))
}
#[tokio::test]
async fn test_query() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "{context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
assert_eq!(
ServerMessage::Complete { id: "foo".into() },
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_subscriptions() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
conn.send(ClientMessage::Start {
id: "bar".into(),
payload: StartPayload {
query: "subscription Bar {context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(
ServerMessage::Data {
id: "bar".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
conn.send(ClientMessage::Stop { id: "foo".into() })
.await
.unwrap();
assert_eq!(
ServerMessage::Complete { id: "foo".into() },
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_init_params_ok() {
let mut conn = Connection::new(new_test_schema(), |params: Variables| async move {
assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar")));
Ok(ConnectionConfig::new(Context(1))) as Result<_, Infallible>
});
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {"foo": "bar"},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
}
#[tokio::test]
async fn test_init_params_error() {
let mut conn = Connection::new(new_test_schema(), |params: Variables| async move {
assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar")));
Err(io::Error::new(io::ErrorKind::Other, "init error"))
});
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {"foo": "bar"},
})
.await
.unwrap();
assert_eq!(
ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: "init error".into(),
},
},
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_max_in_flight_operations() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1))
.with_keep_alive_interval(Duration::from_secs(0))
.with_max_in_flight_operations(1),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {never}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
conn.send(ClientMessage::Start {
id: "bar".into(),
payload: StartPayload {
query: "subscription Bar {never}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Error { id, .. } => {
assert_eq!(id, "bar");
}
msg => panic!("expected error, got: {msg:?}"),
}
}
#[tokio::test]
async fn test_parse_error() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "asd".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Error { id, payload } => {
assert_eq!(id, "foo");
match payload.graphql_error() {
GraphQLError::ParseError(Spanning {
item: ParseError::UnexpectedToken(token),
..
}) => assert_eq!(token, "asd"),
p => panic!("expected graphql parse error, got: {p:?}"),
}
}
msg => panic!("expected error, got: {msg:?}"),
}
}
#[tokio::test]
async fn test_keep_alives() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_millis(20)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
for _ in 0..10 {
assert_eq!(
ServerMessage::ConnectionKeepAlive,
conn.next().await.unwrap()
);
}
}
#[tokio::test]
async fn test_slow_init() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
// If we send the start message before the init is handled, we should still get results.
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "{context}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
assert_eq!(
ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!({"context": 1}),
errors: vec![],
},
},
conn.next().await.unwrap()
);
}
#[tokio::test]
async fn test_subscription_field_error() {
let mut conn = Connection::new(
new_test_schema(),
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
);
conn.send(ClientMessage::ConnectionInit {
payload: graphql_vars! {},
})
.await
.unwrap();
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
conn.send(ClientMessage::Start {
id: "foo".into(),
payload: StartPayload {
query: "subscription Foo {error}".into(),
variables: graphql_vars! {},
operation_name: None,
},
})
.await
.unwrap();
match conn.next().await.unwrap() {
ServerMessage::Data {
id,
payload: DataPayload { data, errors },
} => {
assert_eq!(id, "foo");
assert_eq!(data, graphql_value!({ "error": null }));
assert_eq!(errors.len(), 1);
}
msg => panic!("expected data, got: {msg:?}"),
}
}
}

View file

@ -1,6 +1,7 @@
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use std::sync::Arc;
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
/// Schema defines the requirements for schemas that can be used for operations. Typically this is
/// just an `Arc<RootNode<...>>` and you should not have to implement it yourself.
pub trait Schema: Unpin + Clone + Send + Sync + 'static {

View file

@ -1,53 +1,37 @@
//! Common definitions regarding server messages.
use std::{any::Any, fmt, marker::PhantomPinned};
use juniper::{ExecutionError, GraphQLError, Value};
use juniper::GraphQLError;
use serde::{Serialize, Serializer};
/// The payload for errors that are not associated with a GraphQL operation.
#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionErrorPayload {
/// The error message.
pub message: String,
}
/// Sent after execution of an operation. For queries and mutations, this is sent to the client
/// once. For subscriptions, this is sent for every event in the event stream.
#[derive(Debug, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DataPayload<S> {
/// The result data.
pub data: Value<S>,
/// The errors that have occurred during execution. Note that parse and validation errors are
/// not included here. They are sent via Error messages.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub errors: Vec<ExecutionError<S>>,
}
/// A payload for errors that can happen before execution. Errors that happen during execution are
/// instead sent to the client via `DataPayload`. `ErrorPayload` is a wrapper for an owned
/// `GraphQLError`.
/// Payload for errors that can happen before execution.
///
/// Errors that happen during execution are instead sent to the client via
/// [`graphql_ws::DataPayload`] or [`graphql_transport_ws::NextPayload`]. [`ErrorPayload`] is a
/// wrapper for an owned [`GraphQLError`].
///
/// [`graphql_transport_ws::NextPayload`]: crate::graphql_transport_ws::NextPayload
/// [`graphql_ws::DataPayload`]: crate::graphql_ws::DataPayload
// XXX: Think carefully before deriving traits. This is self-referential (error references
// _execution_params).
pub struct ErrorPayload {
_execution_params: Option<Box<dyn Any + Send>>,
error: GraphQLError,
_marker: PhantomPinned,
_pinned: PhantomPinned,
}
impl ErrorPayload {
/// Creates a new [`ErrorPayload`] out of the provide `execution_params` and
/// [`GraphQLError`].
/// Creates a new [`ErrorPayload`] out of the provide `execution_params` and [`GraphQLError`].
pub(crate) fn new(execution_params: Box<dyn Any + Send>, error: GraphQLError) -> Self {
Self {
_execution_params: Some(execution_params),
error,
_marker: PhantomPinned,
_pinned: PhantomPinned,
}
}
/// Returns the contained GraphQLError.
/// Returns the contained [`GraphQLError`].
pub fn graphql_error(&self) -> &GraphQLError {
&self.error
}
@ -79,108 +63,7 @@ impl From<GraphQLError> for ErrorPayload {
Self {
_execution_params: None,
error,
_marker: PhantomPinned,
_pinned: PhantomPinned,
}
}
}
/// ServerMessage defines the message types that servers can send.
#[derive(Debug, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum ServerMessage<S> {
/// ConnectionError is used for errors that are not associated with a GraphQL operation. For
/// example, this will be used when:
///
/// * The server is unable to parse a client's message.
/// * The client's initialization parameters are rejected.
ConnectionError {
/// The error that occurred.
payload: ConnectionErrorPayload,
},
/// ConnectionAck is sent in response to a client's ConnectionInit message if the server accepted a
/// connection.
ConnectionAck,
/// Data contains the result of a query, mutation, or subscription event.
Data {
/// The id of the operation that the data is for.
id: String,
/// The data and errors that occurred during execution.
payload: DataPayload<S>,
},
/// Error contains an error that occurs before execution, such as validation errors.
Error {
/// The id of the operation that triggered this error.
id: String,
/// The error(s).
payload: ErrorPayload,
},
/// Complete indicates that no more data will be sent for the given operation.
Complete {
/// The id of the operation that has completed.
id: String,
},
/// ConnectionKeepAlive is sent periodically after accepting a connection.
#[serde(rename = "ka")]
ConnectionKeepAlive,
}
#[cfg(test)]
mod test {
use juniper::{graphql_value, DefaultScalarValue};
use super::*;
#[test]
fn test_serialization() {
type ServerMessage = super::ServerMessage<DefaultScalarValue>;
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionError {
payload: ConnectionErrorPayload {
message: "foo".into(),
},
})
.unwrap(),
r#"{"type":"connection_error","payload":{"message":"foo"}}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(),
r#"{"type":"connection_ack"}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Data {
id: "foo".into(),
payload: DataPayload {
data: graphql_value!(null),
errors: vec![],
},
})
.unwrap(),
r#"{"type":"data","id":"foo","payload":{"data":null}}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Error {
id: "foo".into(),
payload: GraphQLError::UnknownOperationName.into(),
})
.unwrap(),
r#"{"type":"error","id":"foo","payload":[{"message":"Unknown operation"}]}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Complete { id: "foo".into() }).unwrap(),
r#"{"type":"complete","id":"foo"}"#,
);
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionKeepAlive).unwrap(),
r#"{"type":"ka"}"#,
);
}
}

View file

@ -1,5 +1,6 @@
use serde::{Deserialize, Deserializer};
/// Deserializes `null`able value by placing the [`Default`] value instead of `null`.
pub(crate) fn default_for_null<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,

View file

@ -1,9 +0,0 @@
use serde::{Deserialize, Deserializer};
pub(crate) fn default_for_null<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de> + Default,
{
Ok(Option::<T>::deserialize(deserializer)?.unwrap_or_default())
}

View file

@ -10,12 +10,6 @@ exactly = 1
search = "juniper_subscriptions = \\{ version = \"[^\"]+\""
replace = "juniper_subscriptions = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "../juniper_graphql_transport_ws/Cargo.toml"
exactly = 1
search = "juniper_subscriptions = \\{ version = \"[^\"]+\""
replace = "juniper_subscriptions = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "CHANGELOG.md"
max = 1

View file

@ -20,7 +20,6 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
subscriptions = [
"dep:juniper_graphql_transport_ws",
"dep:juniper_graphql_ws",
"dep:log",
"warp/websocket",
@ -30,8 +29,7 @@ subscriptions = [
anyhow = "1.0.47"
futures = "0.3.22"
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true }
juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", features = ["graphql-transport-ws", "graphql-ws"], optional = true }
log = { version = "0.4", optional = true }
serde = { version = "1.0.122", features = ["derive"] }
serde_json = "1.0.18"

View file

@ -7,7 +7,7 @@ use juniper::{
graphql_object, graphql_subscription, graphql_value, EmptyMutation, FieldError, GraphQLEnum,
RootNode,
};
use juniper_graphql_transport_ws::ConnectionConfig;
use juniper_graphql_ws::ConnectionConfig;
use warp::{http::Response, Filter};
#[derive(Clone)]

View file

@ -349,13 +349,12 @@ pub mod subscriptions {
},
GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue,
};
use juniper_graphql_transport_ws;
use juniper_graphql_ws;
use juniper_graphql_ws::{graphql_transport_ws, graphql_ws};
use warp::{filters::BoxedFilter, reply::Reply, Filter as _};
struct Message(warp::ws::Message);
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_ws::ClientMessage<S> {
impl<S: ScalarValue> TryFrom<Message> for graphql_ws::ClientMessage<S> {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
@ -367,7 +366,7 @@ pub mod subscriptions {
}
}
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::Input<S> {
impl<S: ScalarValue> TryFrom<Message> for graphql_transport_ws::Input<S> {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
@ -423,11 +422,10 @@ pub mod subscriptions {
/// The `schema` argument is your [`juniper`] schema.
///
/// The `init` argument is used to provide the custom [`juniper::Context`] and additional
/// configuration for connections. This can be a
/// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are
/// already known, or it can be a closure that gets executed asynchronously whenever a client
/// sends the subscription initialization message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
/// configuration for connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the
/// context and configuration are already known, or it can be a closure that gets executed
/// asynchronously whenever a client sends the subscription initialization message. Using a
/// closure allows to perform an authentication based on the parameters provided by a client.
///
/// # Example
///
@ -436,7 +434,7 @@ pub mod subscriptions {
/// #
/// # use futures::Stream;
/// # use juniper::{graphql_object, graphql_subscription, EmptyMutation, RootNode};
/// # use juniper_graphql_transport_ws::ConnectionConfig;
/// # use juniper_graphql_ws::ConnectionConfig;
/// # use juniper_warp::make_graphql_filter;
/// # use warp::Filter as _;
/// #
@ -532,7 +530,7 @@ pub mod subscriptions {
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: juniper_graphql_transport_ws::Init<S, CtxT> + Clone + Send + Sync,
I: juniper_graphql_ws::Init<S, CtxT> + Clone + Send + Sync,
{
let schema = schema.into();
@ -598,8 +596,7 @@ pub mod subscriptions {
{
let (ws_tx, ws_rx) = websocket.split();
let (s_tx, s_rx) =
juniper_graphql_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init)
.split();
graphql_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init).split();
let ws_rx = ws_rx.map(|r| r.map(Message));
let s_rx = s_rx.map(|msg| {
@ -622,10 +619,10 @@ pub mod subscriptions {
/// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context
/// and configuration are already known, or it can be a closure that gets executed
/// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to
/// perform an authentication based on the parameters provided by a client.
/// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the `ConnectionInit` message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
pub async fn serve_graphql_transport_ws<Query, Mutation, Subscription, CtxT, S, I>(
@ -642,21 +639,19 @@ pub mod subscriptions {
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: juniper_graphql_transport_ws::Init<S, CtxT> + Send,
I: juniper_graphql_ws::Init<S, CtxT> + Send,
{
let (ws_tx, ws_rx) = websocket.split();
let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(
juniper_graphql_transport_ws::ArcSchema(root_node),
init,
)
let (s_tx, s_rx) =
graphql_transport_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init)
.split();
let ws_rx = ws_rx.map(|r| r.map(Message));
let s_rx = s_rx.map(|output| match output {
juniper_graphql_transport_ws::Output::Message(msg) => serde_json::to_string(&msg)
graphql_transport_ws::Output::Message(msg) => serde_json::to_string(&msg)
.map(warp::ws::Message::text)
.map_err(Error::Serde),
juniper_graphql_transport_ws::Output::Close { code, message } => {
graphql_transport_ws::Output::Close { code, message } => {
Ok(warp::ws::Message::close_with(code, message))
}
});