add juniper_graphql_transport_ws crate for new subscription protocol (#1158)

Co-authored-by: Christian Legnitto <LegNeato@users.noreply.github.com>
This commit is contained in:
Chris 2023-08-24 21:02:32 -04:00 committed by GitHub
parent 1474da53f6
commit 27430bf60c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1759 additions and 8 deletions

View file

@ -12,6 +12,7 @@ members = [
"juniper_rocket",
"juniper_subscriptions",
"juniper_graphql_ws",
"juniper_graphql_transport_ws",
"juniper_warp",
"juniper_actix",
"tests/codegen",

View file

@ -0,0 +1,16 @@
`juniper_graphql_transport_ws` changelog
==============================
All user visible changes to `juniper_graphql_transport_ws` crate will be documented in this file. This project uses [Semantic Versioning 2.0.0].
## master
[`juniper` crate]: https://docs.rs/juniper
[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions
[Semantic Versioning 2.0.0]: https://semver.org

View file

@ -0,0 +1,24 @@
[package]
name = "juniper_graphql_transport_ws"
version = "0.4.0-dev"
edition = "2021"
rust-version = "1.65"
description = "GraphQL over WebSocket Protocol implementation for `juniper` crate."
license = "BSD-2-Clause"
authors = ["Christopher Brown <ccbrown112@gmail.com>"]
documentation = "https://docs.rs/juniper_graphql_transport_ws"
homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_transport_ws"
repository = "https://github.com/graphql-rust/juniper"
readme = "README.md"
categories = ["asynchronous", "web-programming", "web-programming::http-server"]
keywords = ["apollo", "graphql", "graphql-ws", "subscription", "websocket"]
exclude = ["/release.toml"]
[dependencies]
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" }
serde = { version = "1.0.122", features = ["derive"], default-features = false }
tokio = { version = "1.0", features = ["macros", "rt", "time"], default-features = false }
[dev-dependencies]
serde_json = "1.0.18"

View file

@ -0,0 +1,25 @@
BSD 2-Clause License
Copyright (c) 2018-2022, Christopher Brown
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -0,0 +1,24 @@
`juniper_graphql_transport_ws` crate
==========================
[![Crates.io](https://img.shields.io/crates/v/juniper_graphql_transport_ws.svg?maxAge=2592000)](https://crates.io/crates/juniper_graphql_transport_ws)
[![Documentation](https://docs.rs/juniper_graphql_transport_ws/badge.svg)](https://docs.rs/juniper_graphql_transport_ws)
[![CI](https://github.com/graphql-rust/juniper/workflows/CI/badge.svg?branch=master "CI")](https://github.com/graphql-rust/juniper/actions?query=workflow%3ACI+branch%3Amaster)
[![Rust 1.65+](https://img.shields.io/badge/rustc-1.65+-lightgray.svg "Rust 1.65+")](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html)
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/CHANGELOG.md)
This crate contains an implementation of the [graphql-transport-ws WebSocket subprotocol], as used by [Apollo].
## License
This project is licensed under [BSD 2-Clause License](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/LICENSE).
[Apollo]: https://www.apollographql.com
[graphql-transport-ws WebSocket subprotocol]: https://github.com/enisdenjo/graphql-ws/blob/fbb763a662802a6a2584b0cbeb9cf1bde38158e0/PROTOCOL.md

View file

@ -0,0 +1,24 @@
[[pre-release-replacements]]
file = "../juniper_actix/Cargo.toml"
exactly = 1
search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\""
replace = "juniper_graphql_transport_ws = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "../juniper_warp/Cargo.toml"
exactly = 1
search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\""
replace = "juniper_graphql_transport_ws = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "CHANGELOG.md"
max = 1
min = 0
search = "## master"
replace = "## [{{version}}] · {{date}}\n[{{version}}]: /../../tree/{{crate_name}}-v{{version}}/{{crate_name}}"
[[pre-release-replacements]]
file = "README.md"
exactly = 2
search = "graphql-rust/juniper/blob/[^/]+/"
replace = "graphql-rust/juniper/blob/{{crate_name}}-v{{version}}/"

View file

@ -0,0 +1,155 @@
use juniper::Variables;
use serde::Deserialize;
use crate::utils::default_for_null;
/// The payload for a client's "start" message. This triggers execution of a query, mutation, or
/// subscription.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(bound(deserialize = "S: Deserialize<'de>"))]
#[serde(rename_all = "camelCase")]
pub struct SubscribePayload<S> {
/// The document body.
pub query: String,
/// The optional variables.
#[serde(default, deserialize_with = "default_for_null")]
pub variables: Variables<S>,
/// The optional operation name (required if the document contains multiple operations).
pub operation_name: Option<String>,
/// The optional extension data.
#[serde(default, deserialize_with = "default_for_null")]
pub extensions: Variables<S>,
}
/// ClientMessage defines the message types that clients can send.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(bound(deserialize = "S: Deserialize<'de>"))]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum ClientMessage<S> {
/// ConnectionInit is sent by the client upon connecting.
ConnectionInit {
/// Optional parameters of any type sent from the client. These are often used for
/// authentication.
#[serde(default, deserialize_with = "default_for_null")]
payload: Variables<S>,
},
/// Ping is used for detecting failed connections, displaying latency metrics or other types of network probing.
Ping {
/// Optional parameters of any type used to transfer additional details about the ping.
#[serde(default, deserialize_with = "default_for_null")]
payload: Variables<S>,
},
/// The response to the `Ping` message.
Pong {
/// Optional parameters of any type used to transfer additional details about the pong.
#[serde(default, deserialize_with = "default_for_null")]
payload: Variables<S>,
},
/// Requests an operation specified in the message payload.
Subscribe {
/// The id of the operation. This can be anything, but must be unique. If there are other
/// in-flight operations with the same id, the message will cause an error.
id: String,
/// The query, variables, and operation name.
payload: SubscribePayload<S>,
},
/// Indicates that the client has stopped listening and wants to complete the subscription.
Complete {
/// The id of the operation to stop.
id: String,
},
}
#[cfg(test)]
mod test {
use juniper::{graphql_vars, DefaultScalarValue};
use super::*;
#[test]
fn test_deserialization() {
type ClientMessage = super::ClientMessage<DefaultScalarValue>;
assert_eq!(
ClientMessage::ConnectionInit {
payload: graphql_vars! {"foo": "bar"},
},
serde_json::from_str(r##"{"type": "connection_init", "payload": {"foo": "bar"}}"##)
.unwrap(),
);
assert_eq!(
ClientMessage::ConnectionInit {
payload: graphql_vars! {},
},
serde_json::from_str(r##"{"type": "connection_init"}"##).unwrap(),
);
assert_eq!(
ClientMessage::Subscribe {
id: "foo".into(),
payload: SubscribePayload {
query: "query MyQuery { __typename }".into(),
variables: graphql_vars! {"foo": "bar"},
operation_name: Some("MyQuery".into()),
extensions: Default::default(),
},
},
serde_json::from_str(
r##"{"type": "subscribe", "id": "foo", "payload": {
"query": "query MyQuery { __typename }",
"variables": {
"foo": "bar"
},
"operationName": "MyQuery"
}}"##
)
.unwrap(),
);
assert_eq!(
ClientMessage::Subscribe {
id: "foo".into(),
payload: SubscribePayload {
query: "query MyQuery { __typename }".into(),
variables: graphql_vars! {},
operation_name: None,
extensions: Default::default(),
},
},
serde_json::from_str(
r##"{"type": "subscribe", "id": "foo", "payload": {
"query": "query MyQuery { __typename }"
}}"##
)
.unwrap(),
);
assert_eq!(
ClientMessage::Complete { id: "foo".into() },
serde_json::from_str(r##"{"type": "complete", "id": "foo"}"##).unwrap(),
);
}
#[test]
fn test_deserialization_of_null() -> serde_json::Result<()> {
let payload = r#"{"query":"query","variables":null}"#;
let payload: SubscribePayload<DefaultScalarValue> = serde_json::from_str(payload)?;
let expected = SubscribePayload {
query: "query".into(),
variables: graphql_vars! {},
operation_name: None,
extensions: Default::default(),
};
assert_eq!(expected, payload);
Ok(())
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,131 @@
use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue};
use std::sync::Arc;
/// Schema defines the requirements for schemas that can be used for operations. Typically this is
/// just an `Arc<RootNode<...>>` and you should not have to implement it yourself.
pub trait Schema: Unpin + Clone + Send + Sync + 'static {
/// The context type.
type Context: Unpin + Send + Sync;
/// The scalar value type.
type ScalarValue: ScalarValue + Send + Sync;
/// The query type info.
type QueryTypeInfo: Send + Sync;
/// The query type.
type Query: GraphQLTypeAsync<Self::ScalarValue, Context = Self::Context, TypeInfo = Self::QueryTypeInfo>
+ Send;
/// The mutation type info.
type MutationTypeInfo: Send + Sync;
/// The mutation type.
type Mutation: GraphQLTypeAsync<
Self::ScalarValue,
Context = Self::Context,
TypeInfo = Self::MutationTypeInfo,
> + Send;
/// The subscription type info.
type SubscriptionTypeInfo: Send + Sync;
/// The subscription type.
type Subscription: GraphQLSubscriptionType<
Self::ScalarValue,
Context = Self::Context,
TypeInfo = Self::SubscriptionTypeInfo,
> + Send;
/// Returns the root node for the schema.
fn root_node(
&self,
) -> &RootNode<'static, Self::Query, Self::Mutation, Self::Subscription, Self::ScalarValue>;
}
/// This exists as a work-around for this issue: https://github.com/rust-lang/rust/issues/64552
///
/// It can be used in generators where using Arc directly would result in an error.
// TODO: Remove this once that issue is resolved.
#[doc(hidden)]
pub struct ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>(
pub Arc<RootNode<'static, QueryT, MutationT, SubscriptionT, S>>,
)
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static;
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Clone
for ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Schema
for ArcSchema<QueryT, MutationT, SubscriptionT, CtxT, S>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
{
type Context = CtxT;
type ScalarValue = S;
type QueryTypeInfo = QueryT::TypeInfo;
type Query = QueryT;
type MutationTypeInfo = MutationT::TypeInfo;
type Mutation = MutationT;
type SubscriptionTypeInfo = SubscriptionT::TypeInfo;
type Subscription = SubscriptionT;
fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> {
&self.0
}
}
impl<QueryT, MutationT, SubscriptionT, CtxT, S> Schema
for Arc<RootNode<'static, QueryT, MutationT, SubscriptionT, S>>
where
QueryT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
QueryT::TypeInfo: Send + Sync,
MutationT: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
MutationT::TypeInfo: Send + Sync,
SubscriptionT: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
SubscriptionT::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync,
S: ScalarValue + Send + Sync + 'static,
{
type Context = CtxT;
type ScalarValue = S;
type QueryTypeInfo = QueryT::TypeInfo;
type Query = QueryT;
type MutationTypeInfo = MutationT::TypeInfo;
type Mutation = MutationT;
type SubscriptionTypeInfo = SubscriptionT::TypeInfo;
type Subscription = SubscriptionT;
fn root_node(&self) -> &RootNode<'static, QueryT, MutationT, SubscriptionT, S> {
self
}
}

View file

@ -0,0 +1,158 @@
use std::{any::Any, fmt, marker::PhantomPinned};
use juniper::{ExecutionError, GraphQLError, Value};
use serde::{Serialize, Serializer};
/// Sent after execution of an operation. For queries and mutations, this is sent to the client
/// once. For subscriptions, this is sent for every event in the event stream.
#[derive(Debug, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct NextPayload<S> {
/// The result data.
pub data: Value<S>,
/// The errors that have occurred during execution. Note that parse and validation errors are
/// not included here. They are sent via Error messages.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub errors: Vec<ExecutionError<S>>,
}
/// A payload for errors that can happen before execution. Errors that happen during execution are
/// instead sent to the client via `NextPayload`. `ErrorPayload` is a wrapper for an owned
/// `GraphQLError`.
// XXX: Think carefully before deriving traits. This is self-referential (error references
// _execution_params).
pub struct ErrorPayload {
_execution_params: Option<Box<dyn Any + Send>>,
error: GraphQLError,
_marker: PhantomPinned,
}
impl ErrorPayload {
/// Creates a new [`ErrorPayload`] out of the provide `execution_params` and
/// [`GraphQLError`].
pub(crate) fn new(execution_params: Box<dyn Any + Send>, error: GraphQLError) -> Self {
Self {
_execution_params: Some(execution_params),
error,
_marker: PhantomPinned,
}
}
/// Returns the contained GraphQLError.
pub fn graphql_error(&self) -> &GraphQLError {
&self.error
}
}
impl fmt::Debug for ErrorPayload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.error.fmt(f)
}
}
impl PartialEq for ErrorPayload {
fn eq(&self, other: &Self) -> bool {
self.error.eq(&other.error)
}
}
impl Serialize for ErrorPayload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.error.serialize(serializer)
}
}
impl From<GraphQLError> for ErrorPayload {
fn from(error: GraphQLError) -> Self {
Self {
_execution_params: None,
error,
_marker: PhantomPinned,
}
}
}
/// ServerMessage defines the message types that servers can send.
#[derive(Debug, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum ServerMessage<S> {
/// ConnectionAck is sent in response to a client's ConnectionInit message if the server accepted a
/// connection.
ConnectionAck,
/// The response to the `Ping` message.
Pong,
/// Data contains the result of a query, mutation, or subscription event.
Next {
/// The id of the operation that the data is for.
id: String,
/// The data and errors that occurred during execution.
payload: NextPayload<S>,
},
/// Error contains an error that occurs before execution, such as validation errors.
Error {
/// The id of the operation that triggered this error.
id: String,
/// The error(s).
payload: ErrorPayload,
},
/// Complete indicates that no more data will be sent for the given operation.
Complete {
/// The id of the operation that has completed.
id: String,
},
}
#[cfg(test)]
mod test {
use juniper::{graphql_value, DefaultScalarValue};
use super::*;
#[test]
fn test_serialization() {
type ServerMessage = super::ServerMessage<DefaultScalarValue>;
assert_eq!(
serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(),
r##"{"type":"connection_ack"}"##,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Pong).unwrap(),
r##"{"type":"pong"}"##,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Next {
id: "foo".into(),
payload: NextPayload {
data: graphql_value!(null),
errors: vec![],
},
})
.unwrap(),
r##"{"type":"next","id":"foo","payload":{"data":null}}"##,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Error {
id: "foo".into(),
payload: GraphQLError::UnknownOperationName.into(),
})
.unwrap(),
r##"{"type":"error","id":"foo","payload":[{"message":"Unknown operation"}]}"##,
);
assert_eq!(
serde_json::to_string(&ServerMessage::Complete { id: "foo".into() }).unwrap(),
r##"{"type":"complete","id":"foo"}"##,
);
}
}

View file

@ -0,0 +1,9 @@
use serde::{Deserialize, Deserializer};
pub(crate) fn default_for_null<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de> + Default,
{
Ok(Option::<T>::deserialize(deserializer)?.unwrap_or_default())
}

View file

@ -8,7 +8,7 @@
- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_ws/CHANGELOG.md)
This crate contains an implementation of the [GraphQL over WebSocket Protocol][1], as used by [Apollo].
This crate contains an implementation of the [graphql-ws WebSocket subprotocol], as formerly used by [Apollo]. It has now been deprecated in favor of the protocol implemented by the [`juniper_graphql_transport_ws` crate].
@ -21,5 +21,5 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql
[Apollo]: https://www.apollographql.com
[1]: https://github.com/apollographql/subscriptions-transport-ws/blob/0ce7a1e1eb687fe51214483e4735f50a2f2d5c79/PROTOCOL.md
[graphql-ws WebSocket subprotocol]: https://github.com/apollographql/subscriptions-transport-ws/blob/0ce7a1e1eb687fe51214483e4735f50a2f2d5c79/PROTOCOL.md
[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws

View file

@ -10,6 +10,12 @@ exactly = 1
search = "juniper_subscriptions = \\{ version = \"[^\"]+\""
replace = "juniper_subscriptions = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "../juniper_graphql_transport_ws/Cargo.toml"
exactly = 1
search = "juniper_subscriptions = \\{ version = \"[^\"]+\""
replace = "juniper_subscriptions = { version = \"{{version}}\""
[[pre-release-replacements]]
file = "CHANGELOG.md"
max = 1

View file

@ -19,13 +19,14 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[features]
subscriptions = ["dep:juniper_graphql_ws", "warp/websocket"]
subscriptions = ["dep:juniper_graphql_ws", "dep:juniper_graphql_transport_ws", "warp/websocket"]
[dependencies]
anyhow = "1.0.47"
futures = "0.3.22"
juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false }
juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true }
juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true }
serde = { version = "1.0.122", features = ["derive"] }
serde_json = "1.0.18"
thiserror = "1.0"

View file

@ -355,11 +355,20 @@ pub mod subscriptions {
},
GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue,
};
use juniper_graphql_ws::{ArcSchema, ClientMessage, Connection, Init};
use juniper_graphql_transport_ws;
use juniper_graphql_ws;
struct Message(warp::ws::Message);
impl<S: ScalarValue> TryFrom<Message> for ClientMessage<S> {
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_ws::ClientMessage<S> {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
serde_json::from_slice(msg.0.as_bytes())
}
}
impl<S: ScalarValue> TryFrom<Message> for juniper_graphql_transport_ws::ClientMessage<S> {
type Error = serde_json::Error;
fn try_from(msg: Message) -> serde_json::Result<Self> {
@ -408,6 +417,9 @@ pub mod subscriptions {
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
///
/// This protocol has been deprecated in favor of the `graphql-transport-ws` protocol, which is
/// provided by the `serve_graphql_transport_ws` function.
pub async fn serve_graphql_ws<Query, Mutation, Subscription, CtxT, S, I>(
websocket: warp::ws::WebSocket,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
@ -422,10 +434,12 @@ pub mod subscriptions {
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
I: juniper_graphql_ws::Init<S, CtxT> + Send,
{
let (ws_tx, ws_rx) = websocket.split();
let (s_tx, s_rx) = Connection::new(ArcSchema(root_node), init).split();
let (s_tx, s_rx) =
juniper_graphql_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init)
.split();
let ws_rx = ws_rx.map(|r| r.map(Message));
let s_rx = s_rx.map(|msg| {
@ -444,6 +458,57 @@ pub mod subscriptions {
Either::Right((r, _)) => r,
}
}
/// Serves the graphql-transport-ws protocol over a WebSocket connection.
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_transport_ws::ConnectionConfig` if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
pub async fn serve_graphql_transport_ws<Query, Mutation, Subscription, CtxT, S, I>(
websocket: warp::ws::WebSocket,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<(), Error>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: juniper_graphql_transport_ws::Init<S, CtxT> + Send,
{
let (ws_tx, ws_rx) = websocket.split();
let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(
juniper_graphql_transport_ws::ArcSchema(root_node),
init,
)
.split();
let ws_rx = ws_rx.map(|r| r.map(Message));
let s_rx = s_rx.map(|output| match output {
juniper_graphql_transport_ws::Output::Message(msg) => serde_json::to_string(&msg)
.map(warp::ws::Message::text)
.map_err(Error::Serde),
juniper_graphql_transport_ws::Output::Close { code, message } => {
Ok(warp::ws::Message::close_with(code, message))
}
});
match future::select(
ws_rx.forward(s_tx.sink_err_into()),
s_rx.forward(ws_tx.sink_err_into()),
)
.await
{
Either::Left((r, _)) => r.map_err(|e| e.into()),
Either::Right((r, _)) => r,
}
}
}
#[cfg(test)]