juniper/docs/book/content/advanced/subscriptions.md
Kai Ren cbf16c5a33
Make interfaces great again! (#682)
* Bootstrap

* Upd

* Bootstrap macro

* Revert stuff

* Correct PoC to compile

* Bootstrap #[graphql_interface] expansion

* Bootstrap #[graphql_interface] meta parsing

* Bootstrap #[graphql_interface] very basic code generation [skip ci]

* Upd trait code generation and fix keywords usage [skip ci]

* Expand trait impls [skip ci]

* Tune up objects [skip ci]

* Finally! Complies at least... [skip ci]

* Parse meta for fields and its arguments [skip ci]

- also, refactor and bikeshed new macros code

* Impl filling fields meta and bootstrap field resolution [skip ci]

* Poking with fields resolution [skip ci]

* Solve Rust's teen async HRTB problems [skip ci]

* Start parsing trait methods [skip ci]

* Finish parsing fields from trait methods [skip ci]

* Autodetect trait asyncness and allow to specify it [skip ci]

* Allow to autogenerate trait object alias via attribute

* Support generics in trait definition and asyncify them correctly

* Temporary disable explicit async

* Cover arguments and custom names/descriptions in tests

* Re-enable tests with explicit async and fix the codegen to satisfy it

* Check implementers are registered in schema and vice versa

* Check argument camelCases

* Test argument defaults, and allow Into coercions for them

* Re-enable markers

* Re-enable markers and relax Sized requirement on IsInputType/IsOutputType marker traits

* Revert 'juniper_actix' fmt

* Fix missing marks for object

* Fix subscriptions marks

* Deduce result type correctly via traits

* Final fixes

* Fmt

* Restore marks checking

* Support custom ScalarValue

* Cover deprecations with tests

* Impl dowcasting via methods

* Impl dowcasting via external functions

* Support custom context, vol. 1

* Support custom context, vol. 2

* Cover fallible field with test

* Impl explicit generic ScalarValue, vol.1

* Impl explicit generic ScalarValue, vol.2

* Allow passing executor into methods

* Generating enum, vol.1

* Generating enum, vol.2

* Generating enum, vol.3

* Generating enum, vol.3

* Generating enum, vol.4

* Generating enum, vol.5

* Generating enum, vol.6

* Generating enum, vol.7

* Generating enum, vol.8

* Refactor juniper stuff

* Fix juniper tests, vol.1

* Fix juniper tests, vol.2

* Polish 'juniper' crate changes, vol.1

* Polish 'juniper' crate changes, vol.2

* Remove redundant stuf

* Polishing 'juniper_codegen', vol.1

* Polishing 'juniper_codegen', vol.2

* Polishing 'juniper_codegen', vol.3

* Polishing 'juniper_codegen', vol.4

* Polishing 'juniper_codegen', vol.5

* Polishing 'juniper_codegen', vol.6

* Polishing 'juniper_codegen', vol.7

* Polishing 'juniper_codegen', vol.8

* Polishing 'juniper_codegen', vol.9

* Fix other crates tests and make Clippy happier

* Fix examples

* Add codegen failure tests, vol. 1

* Add codegen failure tests, vol. 2

* Add codegen failure tests, vol.3

* Fix codegen failure tests accordingly to latest nightly Rust

* Fix codegen when interface has no implementers

* Fix warnings in book tests

* Describing new interfaces in Book, vol.1

Co-authored-by: Christian Legnitto <LegNeato@users.noreply.github.com>
2020-10-05 21:21:01 -10:00

6.8 KiB

Subscriptions

How to achieve realtime data with GraphQL subscriptions

GraphQL subscriptions are a way to push data from the server to clients requesting real-time messages from the server. Subscriptions are similar to queries in that they specify a set of fields to be delivered to the client, but instead of immediately returning a single answer a result is sent every time a particular event happens on the server.

In order to execute subscriptions you need a coordinator (that spawns connections) and a GraphQL object that can be resolved into a stream--elements of which will then be returned to the end user. The juniper_subscriptions crate provides a default connection implementation. Currently subscriptions are only supported on the master branch. Add the following to your Cargo.toml:

[dependencies]
juniper = { git = "https://github.com/graphql-rust/juniper", branch = "master" }
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper", branch = "master" }

Schema Definition

The Subscription is just a GraphQL object, similar to the query root and mutations object that you defined for the operations in your Schema. For subscriptions all fields/operations should be async and should return a Stream.

This example shows a subscription operation that returns two events, the strings Hello and World! sequentially:

# extern crate futures;
# extern crate juniper;
# extern crate juniper_subscriptions;
# extern crate tokio;
# use juniper::FieldError;
# use futures::Stream;
# use std::pin::Pin;
#
# #[derive(Clone)]
# pub struct Database;
# impl juniper::Context for Database {}

# pub struct Query;
# #[juniper::graphql_object(Context = Database)]
# impl Query {
#    fn hello_world() -> &str {
#        "Hello World!"
#    }
# }
pub struct Subscription;

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Database)]
impl Subscription {
    async fn hello_world() -> StringStream {
        let stream = tokio::stream::iter(vec![
            Ok(String::from("Hello")),
            Ok(String::from("World!"))
        ]);
        Box::pin(stream)
    }
}
# fn main () {}

Coordinator

Subscriptions require a bit more resources than regular queries and provide a great vector for DOS attacks. This can can bring down a server easily if not handled correctly. The SubscriptionCoordinator trait provides coordination logic to enable functionality like DOS attack mitigation and resource limits.

The SubscriptionCoordinator contains the schema and can keep track of opened connections, handle subscription start and end, and maintain a global subscription id for each subscription. Each time a connection is established,
the SubscriptionCoordinator spawns a SubscriptionConnection. The SubscriptionConnection handles a single connection, providing resolver logic for a client stream as well as reconnection and shutdown logic.

While you can implement SubscriptionCoordinator yourself, Juniper contains a simple and generic implementation called Coordinator. The subscribe operation returns a Future with an Item value of a Result<Connection, GraphQLError>, where Connection is a Stream of values returned by the operation and GraphQLError is the error when the subscription fails.

# #![allow(dead_code)]
# extern crate futures;
# extern crate juniper;
# extern crate juniper_subscriptions;
# extern crate serde_json;
# extern crate tokio;
# use juniper::http::GraphQLRequest;
# use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator};
# use juniper_subscriptions::Coordinator;
# use futures::{Stream, StreamExt};
# use std::pin::Pin;
# 
# #[derive(Clone)]
# pub struct Database;
# 
# impl juniper::Context for Database {}
# 
# impl Database {
#     fn new() -> Self {
#         Self {}
#     }
# }
# 
# pub struct Query;
# 
# #[juniper::graphql_object(Context = Database)]
# impl Query {
#     fn hello_world() -> &str {
#         "Hello World!"
#     }
# }
# 
# pub struct Subscription;
# 
# type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
# 
# #[juniper::graphql_subscription(Context = Database)]
# impl Subscription {
#     async fn hello_world() -> StringStream {
#         let stream =
#             tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
#         Box::pin(stream)
#     }
# }
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;

fn schema() -> Schema {
    Schema::new(Query {}, EmptyMutation::new(), Subscription {})
}

async fn run_subscription() {
    let schema = schema();
    let coordinator = Coordinator::new(schema);
    let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
        r#"
        {
            "query": "subscription { helloWorld }"
        }
    "#,
    )
        .unwrap();
    let ctx = Database::new();
    let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap();
    while let Some(result) = conn.next().await {
        println!("{}", serde_json::to_string(&result).unwrap());
    }
}

# fn main() { }

Web Integration and Examples

Currently there is an example of subscriptions with warp, but it still in an alpha state. GraphQL over WS is not fully supported yet and is non-standard.