- consider `juniper_graphql_transport_ws` crate on CI - implement auto-selection of protocol in `juniper_warp` crate - support `graphql-transport-ws` protocol in `juniper_actix` crate - implement auto-selection of protocol in `juniper_actix` crate Additionally: - move `examples/warp_subscriptions` into `juniper_warp/examples/subscription.rs` - move `examples/actix_subscriptions` into `juniper_actix/examples/subscription.rs` - move `examples/basic_subscriptions` into `juniper_subscriptions/examples/basic.rs` - bump up MSRV of `juniper_actix` crate to 1.68
152 lines
4.9 KiB
152 lines
4.9 KiB
//! This example demonstrates asynchronous subscriptions with [`actix_web`].
use std::{env, pin::Pin, time::Duration};
use actix_cors::Cors;
use actix_web::{
web::{self, Data},
App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
use juniper::{
graphql_subscription, graphql_value,
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, FieldError, GraphQLObject, RootNode,
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler, subscriptions};
use juniper_graphql_ws::ConnectionConfig;
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
fn schema() -> Schema {
Schema::new(Query, EmptyMutation::<Database>::new(), Subscription)
async fn playground() -> Result<HttpResponse, Error> {
playground_handler("/graphql", Some("/subscriptions")).await
async fn graphiql() -> Result<HttpResponse, Error> {
graphiql_handler("/graphql", Some("/subscriptions")).await
async fn graphql(
req: HttpRequest,
payload: web::Payload,
schema: Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
graphql_handler(&schema, &context, req, payload).await
async fn homepage() -> impl Responder {
.insert_header(("content-type", "text/html"))
"<html><h1>juniper_actix/subscription example</h1>\
<div>visit <a href=\"/graphiql\">GraphiQL</a></div>\
<div>visit <a href=\"/playground\">GraphQL Playground</a></div>\
async fn subscriptions(
req: HttpRequest,
stream: web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
// set the keep alive interval to 15 secs so that it doesn't timeout in playground
// playground has a hard-coded timeout set to 20 secs
let config = config.with_keep_alive_interval(Duration::from_secs(15));
subscriptions::ws_handler(req, stream, schema, config).await
struct Subscription;
struct RandomHuman {
id: String,
name: String,
type RandomHumanStream =
Pin<Box<dyn futures::Stream<Item = Result<RandomHuman, FieldError>> + Send>>;
#[graphql_subscription(context = Database)]
impl Subscription {
description = "A random humanoid creature in the Star Wars universe every 3 seconds. \
Second result will be an error."
async fn random_human(context: &Database) -> RandomHumanStream {
let mut counter = 0;
let context = (*context).clone();
use rand::{rngs::StdRng, Rng, SeedableRng};
let mut rng = StdRng::from_entropy();
let mut interval = tokio::time::interval(Duration::from_secs(5));
let stream = async_stream::stream! {
counter += 1;
loop {
if counter == 2 {
yield Err(FieldError::new(
"some field error from handler",
graphql_value!("some additional string"),
} else {
let random_id = rng.gen_range(1000..1005).to_string();
let human = context.get_human(&random_id).unwrap().clone();
yield Ok(RandomHuman {
id: human.id().into(),
name: human.name().unwrap().into(),
async fn main() -> std::io::Result<()> {
env::set_var("RUST_LOG", "info");
HttpServer::new(move || {
.allowed_methods(vec!["POST", "GET"])
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT])