Using WebSockets in Dallinger Experiments

Dallinger provides some helpers to facilitate realtime communication between participants and the experiment using the WebSocket protocol.

The experiment server runs a WebSocket service located at the route /chat that implements a Publish-Subscribe pattern. Connecting to that service with a channel argument in the url will subscribe a client to all messages sent to the named channel. If the named “channel” doesn’t already exist it will be created on the server. Both clients (participants) and the experiment instance can subscribe and create channels.

The channel backend publishes all incoming messages to a redis queue. It also looks for new messages on the queue for and relays channel specific messages to all channel subscribers (generally either participants or the experiment itself).

When a client makes a WebSocket connection to the /chat?channel=<channel> route (see The Web API) it opens a persistent connection to the experiment over which it can send messages (to any channel) and will receive all messages published to the channel named in the initial request.

Additionally, Experiment classes can provide a channel attribute which will automatically subscribe the experiment class to the named channel. Experiments that specify a channel will also be subscribed to a control channel named “dallinger_control” to which client connection, disconnection, subscribe, and unsubscribe messages are automatically sent. Such experiments should implement a custom receive_message() method to receive and process incoming WebSocket messages.

Experiments may also send messages to all channel subscribers using the publish_to_subscribers() method.

It’s possible for an experiment class to subscribe to messages on additional WebSocket channels. To avoid duplicate subscriptions it’s generally best to create such subscriptions in your Experiment class’s on_launch() method which is only run at experiment launch time, or using the experiment’s background_tasks(). For example:

def on_launch(self):
    from dallinger.experiment_server.sockets import chat_backend
    chat_backend.subscribe(self, 'my_secondary_channel')

An experiment can create and subscribe to channels after launch, but would need to be careful to ensure each channel is only ever subscribed once per experiment run. This is likely to be difficult because the experiment potentially has many instances running concurrently across multiple processes and servers.

Websocket messages are strings consisting of a channel name followed by a : and then a message payload. The message payload is usually a string representing a JSON object.

Messages are handled asynchronously by the receive_message() method of the experiment class. Experiments which wish to override the default asynchronous handling of WebSocket messages (e.g. because they retain non-persisted state in the experiment instance that is needed to process the message) may override the send() method of the experiment class.

If your experiment implements synchronous handling of messages either using a custom send() or by sending the immediate flag in your message payload, it will need to ensure that it takes care to manage any database sessions. The dallinger.db.scoped_session_decorator can be used to wrap functions and the dallinger.db.sessions_scope contextmanager can provide more granular/repeated session management.

Client Implementation

The default experiment layout includes a basic websocket communication library which implements a ReconnectingWebSocket object that can be used to establish channel subscriptions, send messages to various channels, and receive messages on subscribed channels.

Typically experiments set up a WebSocket connection after completing the initial call to createAgent using code similar to this:

var broadcast_socket;
var open_socket = function (channel_id) {
    var ws_scheme = (window.location.protocol === "https:") ? 'wss://' : 'ws://';
    // Setup a websocket connection to the channel, passing our worker_id and participant_id
    socket = new ReconnectingWebSocket(
        ws_scheme + location.host + "/chat?channel=" + channel_id +"&worker_id=" + dallinger.identity.workerId + '&participant_id=' + dallinger.identity.participantId
    );
    // Once the connection is established, send an initial message to the channel
    socket.onopen(function () {
        socket.send(channel_id + ':{"message": "Hello world!"}');
    });
    // Handle any incoming messages
    socket.onmessage = function (msg) {
        // Ignore messages not from the channel subscribed channel
        if (msg.data.indexOf(channel_id + ':') !== 0) { return; }
        // Parse the payload
        var data = JSON.parse(msg.data.substring(channel_id.length + 1));
        // Example message data
        var type = data.type;
        // Take different actions based on message type
        switch(type) {
           ...
        }
    };
    return socket;
};
// Create the agent.
var create_agent = function() {
    dallinger.createAgent()
        .done(function (resp) {
            ...
            broadcast_socket = open_socket("broadcast_channel");
        })
        .fail(function (rejection) {
            ...
        });
};

When establishing a channel subscription using the /chat route, the client may include worker_id and participant_id values. Those values will be included in the automatically generated JSON messages alerting the experiment to WebSocket connection, disconnection, subscription, and un-subscription events over the “dallinger_control” channel.

Messages sent over the socket connection can be prefixed with any channel name, not just the channel to which the connection is subscribed. Additional subscriptions can be established by opening new websocket connections to the /chat route with different channel values.

Experiment Channel Setup

Many experiment use cases will only need a “broadcast channel” to which all clients subscribe. That subscription can be established when the experiment starts (i.e. when createAgent returns). This “broadcast channel” would be separate from the one set in the Experiment.channel attribute, which we will call the “experiment control channel”.

Clients will receive all messages sent to the “broadcast channel” by either the experiment or other clients. The messages will generally contain JSON payloads that indicate the messages’ purpose. For example, messages may have a type property to differentiate e.g. “state” messages sent by the experiment server from “chat” messages sent by other clients. Additionally, such “chat” messages might have room or recipient properties to allow clients to filter out messages not intended for them.

Generally, clients will send messages about their actions to the “experiment control channel”. Those messages will be processed by the experiment and will not be relayed to other clients, because clients are not generally subscribed to the “experiment control channel”.

The experiment sends messages to all clients over the “broadcast channel”, but generally does not subscribe to the “broadcast channel”. If an experiment needs to handle messages sent by clients over the “broadcast channel”, then it’s generally simplest for clients to send such messages both to the “broadcast channel” and to the “experiment control channel” (perhaps with an additional broadcast flag). It is possible to subscribe the experiment to the “broadcast channel”, but that would also require the experiment to handle/ignore the messages that the experiment itself sends over that channel.

Multiple Client Channels

If it’s important for an experiment to have participant and/or group specific channels, e.g. to ensure messages are only ever seen by their targets, or to reduce the total number of messages sent to or processed by clients, then clients can subscribe to multiple channels.

For example, after launch an experiment could broadcast a create_chatroom type message with a chatroom property set to e.g. “room_1” and an array of partcicpant_ids. Clients could then subscribe to the “room_1” channel using the /chat route only if their participant_id matches one of the values in participant_ids. That way only only the clients with the matching participant_ids would receive messages for “room_1”.

If these chat room messages need to be handled by the experiment code, then the clients could also send these messages to the “experiment control channel”, with an additional chatroom property to specify the channel. Alternatively, if the names of all chatrooms could be determined at experiment launch time, then duplicate messages can be avoided by having the experiment subscribe to all chatrooms in on_launch() or using background_tasks().

Similarly, if the experiment needs to send messages privately to specific participants, then every client could use the /chat route to subscribe to a unique channel like “participant_${participant_id}_channel”, to which the experiment instance could send private messages using self.publish_to_subscribers(payload, channel_name=channel) or redis_conn.publish(f”participant_${participant_id}_channel”, payload).