Supporting real-time updates in your mobile app
Introduction to real-time updates
After developing some queries and mutations, you might want to implement real-time updates.
Real-time updates are supported in the GraphQL specification by an operation type called Subscription
.
To support subscriptions in a production environment, Data Sync implements subscriptions using an MQTT PubSub subscription mechanism; however, you might want to use the Apollo PubSub module to develop proof-of-concept applications.
When coding for real-time updates, use the following modules:
-
@aerogear/voyager-server - supports clients that use voyager-client to enable GraphQL queries and mutations
-
@aerogear/voyager-subscriptions - supports clients that use voyager-client to enable GraphQL subscriptions
-
@aerogear/graphql-mqtt-subscriptions - supports GraphQL resolvers connections to a MQTT broker
GraphQL Subscriptions enable clients to subscribe to server events over a websocket connection.
The flow can be summarized as follows:
-
Client connects to the server using websockets, and subscribes to certain events.
-
As events occur, the server notifies the clients that are subscribed to those events.
-
Any currently connected client that is subscribed to a given event receives updates.
-
The client can close the connection at any time and no longer receives updates.
To receive updates, the client must be currently connected to the server. The client does not receive events from subscriptions while offline. To support inactive clients, use Push Notifications.
-
For more information about GraphQL subscriptions, see the Subscriptions documentation.
Implementing real-time updates on a Data Sync server
The follow code shows typical code for a Data Sync Server without subscriptions:
const apolloServer = VoyagerServer({
typeDefs,
resolvers
})
const app = express()
apolloServer.applyMiddleware({ app })
app.listen({ port }, () =>
console.log(`🚀 Server ready at http://localhost:${port}${apolloServer.graphqlPath}`)
)
The following sections outline the steps required to enable real-time updates:
-
Implement a SubscriptionServer
-
Implement a Publish Subscribe Mechanism
-
Define subscriptions in the schema
-
Implement resolvers
Implementing a SubscriptionServer using voyager-subscription
To allow you create GraphQL subscription types in your schema:
-
Install the
@aerogear/voyager-subscriptions
package:$ npm i @aerogear/voyager-subscriptions
-
Configure SubscriptionServer using
@aerogear/voyager-subscriptions
const { createSubscriptionServer } = require('@aerogear/voyager-subscriptions') const apolloServer = VoyagerServer({ typeDefs, resolvers }) const app = express() apolloServer.applyMiddleware({ app }) const port = 4000 const server = app.listen({ port }, () => { console.log(`🚀 Server ready at http://localhost:${port}${apolloServer.graphqlPath}`) createSubscriptionServer({ schema: apolloServer.schema }, { server, path: '/graphql' }) })
The
createSubscriptionServer
code:-
returns a
SubscriptionServer
instance -
installs handlers for
-
managing websocket connections
-
delivering subscriptions on the server
-
-
provides integrations with other modules such as
@aerogear/voyager-keycloak
.
-
-
For more information about arguments and options, see the subscriptions-transport-ws module.
Implementing a Publish Subscribe Mechanism
This procedure describes an in-memory implementation which is useful for prototyping but not suitable for production. {org-name} recommends using MQTT PubSub in production. See Configuring a Publish Subscribe mechanism for more information about all the implementation methods. |
To provide a channel to push updates to the client using the default PubSub
provided by apollo-server
, you implement a Publish Subscribe mechanism, for example:
const { PubSub } = require('apollo-server')
const pubsub = new PubSub()
Subscriptions depend on a publish subscribe mechanism to generate the events that notify a subscription. There are several PubSub implementations available based on the PubSubEngine
interface.
Defining subscriptions in the schema
Subscriptions are a root level type.
They are defined in the schema similar to Query
and Mutation
.
For example, in the following schema, a Task
type is defined and so are mutations and subscriptions.
type Subscription { taskCreated: Task } type Mutation { createTask(title: String!, description: String!): Task } type Task { id: ID! title: String! description: String! }
Implementing resolvers
Inside the resolver map, subscription resolvers return an AsyncIterator,
which listens for events.
To generate an event, call the publish
method.
The pubsub.publish
code is typically located inside a mutation resolver.
In the following example, when a new task is created, the createTask
resolver publishes the result of this mutation to the TaskCreated
channel.
const TASK_CREATED = 'TaskCreated'
const resolvers = {
Subscription: {
taskCreated: {
subscribe: () => pubSub.asyncIterator(TASK_CREATED)
}
},
Mutation: {
createTask: async (obj, args, context, info) => {
const task = tasks.create(args)
pubSub.publish(TASK_CREATED, { taskCreated: task })
return task
}
},
}
This subscription server does not implement authentication or authorization. For information about implementing authenication and authorization, see Supporting authentication and authorization in your mobile app. |
-
For information on how to use subscriptions in your client code, see Realtime Updates.
Configuring a Publish Subscribe mechanism
You can use the Apollo PubSub mechanism for development, but you must use the MQTT PubSub mechanism for production.
Using the Apollo PubSub mechanism
The Implementing real-time updates on a Data Sync server section describes how to set up the default PubSub
provided by apollo-server
. For a production system, use MQTT PubSub.
Using the MQTT PubSub mechanism
The @aerogear/graphql-mqtt-subscriptions
module provides an AsyncIterator
interface used for implementing subscription resolvers
It connects the Data Sync server to an MQTT broker to support horizontally scalable subscriptions.
Initialize an MQTT client and pass that client to the @aerogeaar/graphql-mqtt-subscriptions
module, for example:
const mqtt = require('mqtt')
const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')
const client = mqtt.connect('mqtt://test.mosquitto.org', {
reconnectPeriod: 1000,
})
const pubsub = new MQTTPubSub({
client
})
In the example, an mqtt
client is created using mqtt.connect
and then this client is passed into an MQTTPubSub
instance.
The pubsub
instance can then be used to publish and subscribe to events in the server.
Configuring AMQ Online for MQTT Messaging
Red Hat AMQ supports the MQTT protocol which makes it a suitable PubSub technology for powering GraphQL subscriptions at scale.
This section provides recommendations for:
-
Configuring AMQ Online for MQTT messaging.
-
Connecting to AMQ Online and using it as a pubsub within server applications.
-
AMQ Online is a mechanism that allows developers to consume the features of Red Hat AMQ within OpenShift.
-
Red Hat AMQ provides fast, lightweight, and secure messaging for Internet-scale applications. AMQ Broker supports multiple protocols and fast message persistence.
-
MQTT stands for MQ Telemetry Transport. It is a publish-subscribe, extremely simple and lightweight messaging protocol.
AMQ Online includes many configuration options that address the specific needs of your application. The minimum configuration steps for using AMQ Online for MQTT messaging and enabling GraphQL subscriptions are:
-
Create an
AddressSpace
-
Create an
Address
-
Create a
MessagingUser
Creating an address space
A user can request messaging resources by creating an AddressSpace
. There are two types of address spaces, standard
and brokered
.
You must use the brokered
address space for MQTT based applications.
-
Create an address space. For example, the following resource creates a brokered
AddressSpace
:apiVersion: enmasse.io/v1beta1 kind: AddressSpace metadata: name: myaddressspace spec: type: brokered plan: brokered-single-broker
-
Create the
AddressSpace
.oc create -f brokered-address-space.yaml
-
Check the status of the address space:
oc get <`AddressSpace` name> -o yaml
The output displays information about the address space, including details required for connecting applications.
-
See Creating address spaces using the command line for more information.
Creating an Address
An adress is part of an AddressSpace
and represents a destination for sending and receiving messages.
Use an Address
with type topic
to represent an MQTT topic.
-
Create an address definition:
apiVersion: enmasse.io/v1beta1 kind: Address metadata: name: myaddressspace.myaddress # must have the format <`AddressSpace` name>.<address name> spec: address: myaddress type: topic plan: brokered-topic
-
Create the address:
oc create -f topic-address.yaml
See the Configuring your server for real-time updates guide for more information about using pubsub.asyncIterator() .
Create an Address for each topic name passed into pubsub.asyncIterator() .
|
-
See Creating addresses using the command line for more information.
Creating an AMQ Online user
A messaging client connects using an AMQ Online user, also known as a`MessagingUser`.
A MessagingUser
specifies an authorization policy that controls which addresses can be used and the operations that can be performed on those addresses.
Users are configured as MessagingUser
resources.
Users can be created, deleted, read, updated, and listed.
-
Create a user definition:
apiVersion: user.enmasse.io/v1beta1 kind: MessagingUser metadata: name: myaddressspace.mymessaginguser # must be in the format <`AddressSpace` name>.<username> spec: username: mymessaginguser authentication: type: password password: cGFzc3dvcmQ= # must be Base64 encoded. Password is 'password' authorization: - addresses: ["*"] operations: ["send", "recv"]
-
Create the
MessagingUser
.oc create -f my-messaging-user.yaml
An application can now connect to an AMQ Online address using this user’s credentials.
For more information see the AMQ Online User Model.
Using GraphQL MQTT PubSub with AMQ Online
The following AMQ Online resources are available for MQTT Applications
-
AddressSpace
-
Address
-
MessagingUser
This section describes how to use @aerogear/graphql-mqtt-subscriptions
to connect to an AMQ Online Address
.
-
Retrieve the connection details for the
AddressSpace
you want to use:oc get addressspace <addressspace> -o yaml
-
Determine which method you want to use to connect to the address:
-
Using the service hostname - Allows clients to connect from within the OpenShift cluster.
{org-name} recommends that applications running inside OpenShift connect using the service hostname. The service hostname is only accessible within the OpenShift cluster. This ensures messages routed between your application and AMQ Online stay within the OpenShift cluster and never go onto the public internet.
-
Using the external hostname - Allows clients to connect from outside the OpenShift cluster.
The external hostname allows connections from outside the OpenShift cluster. This is useful for the following cases:
-
Production applications running outside of OpenShift connecting and publishing messages.
-
Quick Prototyping and local development. Create a non-production
AddressSpace
, allowing developers to connect applications from their local environments.
-
-
-
To connect to an AMQ Online
Address
using the service hostname-
Retrieve the service hostname:
oc get addressspace <addressspace name> -o jsonpath='{.status.endpointStatuses[?(@.name=="messaging")].serviceHost
-
Add code to create the connection, for example:
const mqtt = require('mqtt') const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions') const client = mqtt.connect({ host: '<internal host name>', username: '<MessagingUser name>', password: '<MessagingUser password>', port: 5762, }) const pubsub = new MQTTPubSub({ client })
-
To encrypt all messages between your application and the AMQ Online broker, enable TLS, for example:
const mqtt = require('mqtt') const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions') const host = '<internal host name>' const client = mqtt.connect({ host: host, servername: host, username: '<MessagingUser name>', password: '<MessagingUser password>', port: 5761, protocol: 'tls', rejectUnauthorized: false, }) const pubsub = new MQTTPubSub({ client })
-
-
To connect to an AMQ Online
Address
using the external hostname:The external hostname typically accept only accept TLS connections. -
Retrieve the external hostname:
oc get addressspace <addressspace name> -o jsonpath='{.status.endpointStatuses[?(@.name=="messaging")].externalHost
-
Connect to the external hostname, for example:
const mqtt = require('mqtt') const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions') const host = '<internal host name>' const client = mqtt.connect({ host: host, servername: host, username: '<MessagingUser name>', password: '<MessagingUser password>', port: 443, protocol: 'tls', rejectUnauthorized: false, }) const pubsub = new MQTTPubSub({ client })
-
-
If you use TLS, note the following additional
mqtt.connect
options:-
servername
- when connecting to a message broker in OpenShift using TLS, this property must be set otherwise the connection will fail, because the messages are being routed through a proxy resulting in the client being presented with multiple certificates. By setting theservername
, the client will use Server Name Indication (SNI) to request the correct certificate as part of the TLS connection setup. -
protocol
- must be set to'tls'
-
rejectUnauthorizated
- must be set to false, otherwise the connection will fail. This tells the client to ignore certificate errors. Again, this is needed because the client is presented with multiple certificates and one of the certificates is for a different hostname than the one being requested, which normally results in an error. -
port
- must be set to 5761 for service hostname or 443 for external hostname.
-
Using environment variables for configuration
{org-name} recommends that you use environment variables for connection, for example:
const mqtt = require('mqtt')
const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')
const host = process.env.MQTT_HOST || 'localhost'
const client = mqtt.connect({
host: host,
servername: host,
username: process.env.MQTT_USERNAME,
password: process.env.MQTT_PASSWORD,
port: process.env.MQTT_PORT || 1883,
protocol: process.env.MQTT_PROTOCOL || 'mqtt',
rejectUnauthorized: false,
})
const pubsub = new MQTTPubSub({ client })
In this example, the connection options can be configured using environment variables, but sensible defaults for the host
, port
and protocol
are provided for local development.
Troubleshooting MQTT Connection Issues
Troubleshooting MQTT Events
The mqtt
module emits various events during runtime.
It recommended to add listeners for these events for regular operation and for troubleshooting.
client.on('connect', () => {
console.log('client has connected')
})
client.on('reconnect', () => {
console.log('client has reconnected')
})
client.on('offline', () => {
console.log('Client has gone offline')
})
client.on('error', (error) => {
console.log(`an error has occurred ${error}`)
})
Read the MQTT documentation to learn about all of the events and what causes them.
Troubleshooting MQTT Configuration Issues
If your application is experiencing connection errors, the most important thing to check is the configuration being passed into mqtt.connect
. Because your application may run locally or in OpenShift, it may connect using internal or external hostnames, and it may or may not use TLS. It is very easy to accidentally provide the wrong configuration.
The Node.js mqtt
module does not report any errors if parameters such as hostname
or port
are incorrect. Instead, it will silently fail and allow your application to start without messaging capabilities.
It may be necessary to handle this scenario in your application. The following workaround can be used.
const TIMEOUT = 10 // number of seconds to wait before checking if the client is connected
setTimeout(() => {
if (!client.connected) {
console.log(`client not connected after ${TIMEOUT} seconds`)
// process.exit(1) if you wish
}
}, TIMEOUT * 1000)
This code can be used to detect if the MQTT client hasn’t connected. This can be helpful for detecting potential configuration issues and allows your application to respond to that scenario.
Implementing real-time updates on on the client
A core concept of the GraphQL specification is an operation type called Subscription
, they provide a mechanism for real time updates.
For more information on GraphQL subscriptions see the Subscriptions documentation.
To do this GraphQL Subscriptions utilise websockets to enable clients to subscribe to published changes.
The architecture of websockets is as follows:
-
Client connects to websocket server.
-
Upon certain events, the server can publish the results of these events to the websocket.
-
Any currently connected client to that websocket receives these results.
-
The client can close the connection at any time and no longer receives updates.
Websockets are a perfect solution for delivering messages to currently active clients. To receive updates the client must be currently connected to the websocket server, updates made over this websocket while the client is offline are not consumed by the client. For this use case Push Notifications are recommended.
Voyager Client comes with subscription support out of the box including auto-reconnection upon device restart or network reconnect. To enable subscriptions on your client set the following paramater in the Voyager Client config object. A DataSyncConfig interface is also available from Voyager Client if you wish to use it.
Setting up a client to use subscriptions
To set up a client to use subscriptions:
-
Provide a
wsUrl
string in the config object as follows:const config = { wsUrl: "ws://<your_websocket_url>" }
where
<your_websocket_url>
is the full URL of the websocket endpoint of your GraphQL server. -
Use the object from step 1 to initialise Voyager Client:
const { createClient } = require("@aerogear/voyager-client"); const client = createClient(config)
Using Subscriptions
A standard flow to utilise subscriptions is as follows:
-
Make a network query to get data from the server
-
Watch the cache for changes to queries
-
Subscribe to changes pushed from the server
-
Unsubscibe when leaving the view where there is an active subscription
In the three examples below, subscribeToMore
ensures that any further updates received from the server force the updateQuery function to be called with subscriptionData
from the server.
Using subscribeToMore
ensures the cache is easily updated as all GraphQL queries are automatically notified.
For more information, see the subscribeToMore documentation.
getTasks() {
const tasks = client.watchQuery({
query: GET_TASKS
});
tasks.subscribeToMore({
document: TASK_ADDED_SUBSCRIPTION,
updateQuery: (prev, { subscriptionData }) => {
// Update logic here.
}
});
return tasks;
}
To allow Voyager Client to automatically generate the updateQuery
function for you, please see the Cache Update Helpers section.
You can then use this query in our application to subscribe to changes so that the front end is always updated when new data is returned from the server.
this.tasks = [];
this.getTasks().subscribe(result => {
this.tasks = result.data && result.data.allTasks;
})
Note that it is also a good idea to unsubscribe from a query upon leaving a page. This prevents possible memory leaks. This can be done by calling unsubscribe() as shown in the following example. This code should be placed in the appropriate place.
this.getTasks().unsubscribe();
Handling network state changes
When using subscriptions to provide your client with realtime updates it is important to monitor network state because the client will be out of sync if the server if updated when the the client is offline.
To avoid this, Voyager Client provides a NetworkStatus
interface which can be used along with the NetworkInfo
interface to implement custom checks of network status.
Use the following example to re-run a query after a client returns to an online state:
const { CordovaNetworkStatus, NetworkInfo } = require("@aerogear/voyager-client");
const networkStatus = new CordovaNetworkStatus();
networkStatus.onStatusChangeListener({
onStatusChange(networkInfo: NetworkInfo) {
const online = networkInfo.online;
if (online) {
client.watchQuery({
query: GET_TASKS
});
}
}
});