From 7e8724751f1264215068d66c91cc384884142caf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jord=C3=A3o=20Rodrigues=20Oliveira=20Rosario?= Date: Fri, 17 Apr 2020 03:16:00 -0300 Subject: [PATCH] Initial implementation of Subscription Docs (#609) Co-authored-by: Christian Legnitto --- docs/book/content/SUMMARY.md | 1 + docs/book/content/advanced/index.md | 1 + docs/book/content/advanced/subscriptions.md | 176 ++++++++++++++++++ .../content/schema/schemas_and_mutations.md | 16 +- docs/book/tests/Cargo.toml | 5 +- examples/basic_subscriptions/.gitignore | 1 + examples/basic_subscriptions/Cargo.toml | 16 ++ examples/basic_subscriptions/src/main.rs | 65 +++++++ 8 files changed, 274 insertions(+), 7 deletions(-) create mode 100644 docs/book/content/advanced/subscriptions.md create mode 100644 examples/basic_subscriptions/.gitignore create mode 100644 examples/basic_subscriptions/Cargo.toml create mode 100644 examples/basic_subscriptions/src/main.rs diff --git a/docs/book/content/SUMMARY.md b/docs/book/content/SUMMARY.md index 947d9e03..49c542f5 100644 --- a/docs/book/content/SUMMARY.md +++ b/docs/book/content/SUMMARY.md @@ -32,6 +32,7 @@ - [Objects and generics](advanced/objects_and_generics.md) - [Multiple operations per request](advanced/multiple_ops_per_request.md) - [Dataloaders](advanced/dataloaders.md) + - [Subscriptions](advanced/subscriptions.md) # - [Context switching] diff --git a/docs/book/content/advanced/index.md b/docs/book/content/advanced/index.md index a9704030..e46978b3 100644 --- a/docs/book/content/advanced/index.md +++ b/docs/book/content/advanced/index.md @@ -7,3 +7,4 @@ The chapters below cover some more advanced scenarios. - [Objects and generics](objects_and_generics.md) - [Multiple operations per request](multiple_ops_per_request.md) - [Dataloaders](dataloaders.md) +- [Subscriptions](subscriptions.md) \ No newline at end of file diff --git a/docs/book/content/advanced/subscriptions.md b/docs/book/content/advanced/subscriptions.md new file mode 100644 index 00000000..d687c666 --- /dev/null +++ b/docs/book/content/advanced/subscriptions.md @@ -0,0 +1,176 @@ +# Subscriptions +### How to achieve realtime data with GraphQL subscriptions + +GraphQL subscriptions are a way to push data from the server to clients requesting real-time messages +from the server. Subscriptions are similar to queries in that they specify a set of fields to be delivered to the client, +but instead of immediately returning a single answer, a result is sent every time a particular event happens on the +server. + +In order to execute subscriptions you need a coordinator (that spawns connections) +and a GraphQL object that can be resolved into a stream--elements of which will then +be returned to the end user. The [juniper_subscriptions][juniper_subscriptions] crate +provides a default connection implementation. Currently subscriptions are only supported on the `master` branch. Add the following to your `Cargo.toml`: +```toml +[dependencies] +juniper = { git = "https://github.com/graphql-rust/juniper", branch = "master" } +juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper", branch = "master" } +``` + +### Schema Definition + +The Subscription is just a GraphQL object, similar to the Query root and Mutations object that you defined for the +operations in your [Schema][Schema], the difference is that all the operations defined there should be async and the return of it +should be a [Stream][Stream]. + +This example shows a subscription operation that returns two events, the strings `Hello` and `World!` +sequentially: + +```rust +# use juniper::http::GraphQLRequest; +# use juniper::{DefaultScalarValue, FieldError, SubscriptionCoordinator}; +# use juniper_subscriptions::Coordinator; +# use futures::{Stream, StreamExt}; +# use std::pin::Pin; +# #[derive(Clone)] +# pub struct Database; +# impl juniper::Context for Database {} +# impl Database { +# fn new() -> Self { +# Self {} +# } +# } +# pub struct Query; +# #[juniper::graphql_object(Context = Database)] +# impl Query { +# fn hello_world() -> &str { +# "Hello World!" +# } +# } +pub struct Subscription; + +type StringStream = Pin> + Send>>; + +#[juniper::graphql_subscription(Context = Database)] +impl Subscription { + async fn hello_world() -> StringStream { + let stream = tokio::stream::iter(vec![ + Ok(String::from("Hello")), + Ok(String::from("World!")) + ]); + Box::pin(stream) + } +} +# fn main () {} +``` + + + +### Coordinator + +Subscriptions require a bit more resources than regular queries, since they can provide a great vector +for DOS attacks and can bring down a server easily if not handled right. [SubscriptionCoordinator][SubscriptionCoordinator] trait provides the coordination logic. +It contains the schema and can keep track of opened connections, handle subscription +start and maintains a global subscription id. Once connection is established, subscription +coordinator spawns a [SubscriptionConnection][SubscriptionConnection], which handles a +single connection, provides resolver logic for a client stream and can provide re-connection +and shutdown logic. + + +The [Coordinator][Coordinator] struct is a simple implementation of the trait [SubscriptionCoordinator][SubscriptionCoordinator] +that is responsible for handling the execution of subscription operation into your schema. The execution of the `subscribe` +operation returns a [Future][Future] with a Item value of a Result<[Connection][Connection], [GraphQLError][GraphQLError]>, +where the connection is the Stream of values returned by the operation and the GraphQLError is the error that occurred in the +resolution of this connection, which means that the subscription failed. + +```rust +# use juniper::http::GraphQLRequest; +# use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator}; +# use juniper_subscriptions::Coordinator; +# use futures::{Stream, StreamExt}; +# use std::pin::Pin; +# use tokio::runtime::Runtime; +# use tokio::task; +# +# #[derive(Clone)] +# pub struct Database; +# +# impl juniper::Context for Database {} +# +# impl Database { +# fn new() -> Self { +# Self {} +# } +# } +# +# pub struct Query; +# +# #[juniper::graphql_object(Context = Database)] +# impl Query { +# fn hello_world() -> &str { +# "Hello World!" +# } +# } +# +# pub struct Subscription; +# +# type StringStream = Pin> + Send>>; +# +# #[juniper::graphql_subscription(Context = Database)] +# impl Subscription { +# async fn hello_world() -> StringStream { +# let stream = +# tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]); +# Box::pin(stream) +# } +# } +type Schema = RootNode<'static, Query, EmptyMutation, Subscription>; + +fn schema() -> Schema { + Schema::new(Query {}, EmptyMutation::new(), Subscription {}) +} + +async fn run_subscription() { + let schema = schema(); + let coordinator = Coordinator::new(schema); + let req: GraphQLRequest = serde_json::from_str( + r#" + { + "query": "subscription { helloWorld }" + } + "#, + ) + .unwrap(); + let ctx = Database::new(); + let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap(); + while let Some(result) = conn.next().await { + println!("{}", serde_json::to_string(&result).unwrap()); + } +} + +# fn main() { } +``` + +### Web Integration and Examples + +Currently there is an example of subscriptions with [warp][warp], but it still in an alpha state. +GraphQL over [WS][WS] is not fully supported yet and is non-standard. + +- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/examples/warp_subscriptions) +- [Small Example](https://github.com/graphql-rust/juniper/tree/master/examples/basic_subscriptions) + + + + +[juniper_subscriptions]: https://github.com/graphql-rust/juniper/tree/master/juniper_subscriptions +[Stream]: https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html + +[Coordinator]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Coordinator.html +[SubscriptionCoordinator]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionCoordinator.html +[Connection]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Connection.html +[SubscriptionConnection]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionConnection.html + +[Future]: https://docs.rs/futures/0.3.4/futures/future/trait.Future.html +[warp]: https://github.com/graphql-rust/juniper/tree/master/juniper_warp +[WS]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md +[GraphQLError]: https://docs.rs/juniper/0.14.2/juniper/enum.GraphQLError.html +[Schema]: ../schema/schemas_and_mutations.md diff --git a/docs/book/content/schema/schemas_and_mutations.md b/docs/book/content/schema/schemas_and_mutations.md index b3ba9a74..124b7a89 100644 --- a/docs/book/content/schema/schemas_and_mutations.md +++ b/docs/book/content/schema/schemas_and_mutations.md @@ -1,12 +1,13 @@ # Schemas -A schema consists of two types: a query object and a mutation object (Juniper -does not support subscriptions yet). These two define the root query fields -and mutations of the schema, respectively. +A schema consists of three types: a query object, a mutation object, and a subscription object. +These three define the root query fields, mutations and subscriptions of the schema, respectively. + +The usage of subscriptions is a little different from the mutation and query objects, so there is a specific [section][section] that discusses them. Both query and mutation objects are regular GraphQL objects, defined like any -other object in Juniper. The mutation object, however, is optional since schemas -can be read-only. +other object in Juniper. The mutation and subscription object, however, is optional since schemas +can be read-only and without subscriptions as well. If mutations/subscriptions functionality is not needed, consider using [EmptyMutation][EmptyMutation]/[EmptySubscription][EmptySubscription]. In Juniper, the `RootNode` type represents a schema. You usually don't have to create this object yourself: see the framework integrations for [Iron](../servers/iron.md) @@ -58,3 +59,8 @@ impl Mutations { # fn main() { } ``` + +[section]: ../advanced/subscriptions.md +[EmptyMutation]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptyMutation.html + +[EmptySubscription]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptySubscription.html diff --git a/docs/book/tests/Cargo.toml b/docs/book/tests/Cargo.toml index 04f07daf..d7c4d0bc 100644 --- a/docs/book/tests/Cargo.toml +++ b/docs/book/tests/Cargo.toml @@ -8,8 +8,9 @@ build = "build.rs" [dependencies] juniper = { path = "../../../juniper" } juniper_iron = { path = "../../../juniper_iron" } -futures = "0.3.1" - +juniper_subscriptions = { path = "../../../juniper_subscriptions" } +futures = "0.3" +tokio = { version = "0.2", features = ["rt-core", "blocking", "stream", "rt-util"] } iron = "0.5.0" mount = "0.4.0" diff --git a/examples/basic_subscriptions/.gitignore b/examples/basic_subscriptions/.gitignore new file mode 100644 index 00000000..eb5a316c --- /dev/null +++ b/examples/basic_subscriptions/.gitignore @@ -0,0 +1 @@ +target diff --git a/examples/basic_subscriptions/Cargo.toml b/examples/basic_subscriptions/Cargo.toml new file mode 100644 index 00000000..e6ac93a2 --- /dev/null +++ b/examples/basic_subscriptions/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "basic_subscriptions" +version = "0.1.0" +edition = "2018" +authors = ["Jordao Rosario "] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "0.2", features = ["rt-core", "macros", "stream"] } + +juniper = { git = "https://github.com/graphql-rust/juniper" } +juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper" } diff --git a/examples/basic_subscriptions/src/main.rs b/examples/basic_subscriptions/src/main.rs new file mode 100644 index 00000000..3f3cbe0d --- /dev/null +++ b/examples/basic_subscriptions/src/main.rs @@ -0,0 +1,65 @@ +#![deny(warnings)] + +use futures::{Stream, StreamExt}; +use juniper::http::GraphQLRequest; +use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator}; +use juniper_subscriptions::Coordinator; +use std::pin::Pin; + +#[derive(Clone)] +pub struct Database; + +impl juniper::Context for Database {} + +impl Database { + fn new() -> Self { + Self {} + } +} + +pub struct Query; + +#[juniper::graphql_object(Context = Database)] +impl Query { + fn hello_world() -> &str { + "Hello World!" + } +} + +pub struct Subscription; + +type StringStream = Pin> + Send>>; + +#[juniper::graphql_subscription(Context = Database)] +impl Subscription { + async fn hello_world() -> StringStream { + let stream = + tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]); + Box::pin(stream) + } +} + +type Schema = RootNode<'static, Query, EmptyMutation, Subscription>; + +fn schema() -> Schema { + Schema::new(Query {}, EmptyMutation::new(), Subscription {}) +} + +#[tokio::main] +async fn main() { + let schema = schema(); + let coordinator = Coordinator::new(schema); + let req: GraphQLRequest = serde_json::from_str( + r#" + { + "query": "subscription { helloWorld }" + } + "#, + ) + .unwrap(); + let ctx = Database::new(); + let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap(); + while let Some(result) = conn.next().await { + println!("{}", serde_json::to_string(&result).unwrap()); + } +}