Implementing Publisher-Subscriber Pattern Using JavaScript, NodeJS and WebSockets

Recently I ticked off an item from my bucket list:
Do a quick and dirty implementation of the Publisher-Subscriber pattern.

For the uninitiated, the Publisher-Subscriber pattern is a very common messaging pattern. What follows is a brief explanation of it in plain language. You can find a more detailed explanation, including various possible implementations, here.

In the Publisher-Subscriber pattern, publishers publish messages without caring who receives them. Similarly, subscribers can be blissfully unaware of publishers and simply consume messages from the topics that interest them. This pattern is often confused with similar patterns such as the message queue and the observer pattern. In the observer pattern, however, the subject holds direct references to its observers, and in a typical message queue each message is consumed by a single consumer rather than being fanned out to every interested subscriber.

Decoupling

This decoupling of publishers and subscribers is achieved by introducing an intermediary, also known as a broker. Publishers publish messages to this broker, typically under a certain topic. Subscribers subscribe to specific topics on this broker. Whenever a new message arrives under a topic, the broker delivers it to the subscribers of that topic.
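
To make the role of the broker concrete before any networking is involved, here is a minimal in-memory sketch. The class name Broker and its subscribe and publish methods are hypothetical names chosen for illustration; they are not part of the demo app described later.

```javascript
// Minimal in-memory broker: publishers and subscribers only share topic names.
// Broker, subscribe and publish are hypothetical names used for illustration.
class Broker {
    constructor() {
        // Map of topic name -> Set of subscriber callbacks
        this.topics = new Map();
    }

    subscribe(topic, callback) {
        if (!this.topics.has(topic)) {
            this.topics.set(topic, new Set());
        }
        this.topics.get(topic).add(callback);
        // Return a function that cancels this subscription
        return () => this.topics.get(topic).delete(callback);
    }

    publish(topic, message) {
        const subscribers = this.topics.get(topic);
        if (!subscribers) {
            return 0; // nobody has ever subscribed to this topic
        }
        subscribers.forEach((callback) => callback(message));
        return subscribers.size;
    }
}

const broker = new Broker();
const received = [];

const unsubscribe = broker.subscribe('sports', (msg) => received.push(`viewer A: ${msg}`));
broker.subscribe('sports', (msg) => received.push(`viewer B: ${msg}`));

broker.publish('sports', 'Match starts at 7 pm'); // delivered to both viewers
broker.publish('movies', 'New release tonight');  // no subscribers, so it is dropped
unsubscribe();
broker.publish('sports', 'Half-time');            // only viewer B is left

console.log(received);
```

Note that the publisher never touches a subscriber directly: it only names a topic, and the broker decides who receives the message.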

A real-life example of the Publisher-Subscriber pattern is cable television. Different media companies create content and put it on their respective channels. Viewers then consume content from the channels that interest them, say a sports or movies channel. The cable service acts as the broker.

Now, let’s get our hands dirty by implementing this pattern using NodeJS and client-side JavaScript. We need a couple of extra packages to get the ball rolling:

  • ws – A robust but simple implementation of WebSocket Server/Client for NodeJS
  • express – To serve static HTML content (publishers, subscribers)

Don’t worry if this is the first time you have come across WebSockets. WebSocket is a communication protocol that allows two-way communication between server and client (the browser in our case) over a single long-lived connection, as opposed to the request-response model of HTTP, where the server can only answer requests initiated by the client. In our case, WebSockets let us send messages both from the server to the browser (subscribers) and from the browser to the server (publishers). There are other ways of achieving this kind of two-way communication, such as long polling (Comet) or the Bayeux protocol, but none of them is as simple and straightforward as WebSockets.

const WebSocketServer = require('ws').Server;
const express = require('express');
const path = require('path');
const server = require('http').createServer();
const PubSubManager = require('./pubsub');

const app = express();
const pubSubManager = new PubSubManager();

app.use(express.static(path.join(__dirname, '/public')));
const wss = new WebSocketServer({ server: server });
wss.on('connection', (ws, req) => {
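    console.log(`Connection request from: ${req.socket.remoteAddress}`);
    ws.on('message', (data) => {
        console.log('data: ' + data);
        // Ignore malformed payloads instead of letting JSON.parse throw
        let json;
        try {
            json = JSON.parse(data);
        } catch (err) {
            console.log('Ignoring invalid JSON payload');
            return;
        }
        const request = json.request;
        const message = json.message;
        const channel = json.channel;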

        switch (request) {
            case 'PUBLISH':
                pubSubManager.publish(ws, channel, message);
                break;
            case 'SUBSCRIBE':
                pubSubManager.subscribe(ws, channel);
                break;
        }
    });
    ws.on('close', () => {
        console.log('Stopping client connection.');
    });
});

server.on('request', app);
server.listen(8080, () => {
    console.log('Server listening on http://localhost:8080');
});

We first set up our WebSocket server using ws, which offers multiple ways of creating one. The approach we use here is to run it on top of an existing NodeJS HTTP server, and we run an Express app on that same server. Because a WebSocket connection starts as an ordinary HTTP request that is then upgraded, both can share the same port, and we exploit this by using the same endpoint for our WebSocket server and our Express app. All normal HTTP requests are forwarded to the Express app, which currently serves only static content: the HTML files containing the publisher and subscriber code. All WebSocket-specific requests are handed over to our WebSocket server.
Once a connection is established, the server can receive data from the client and respond. It differentiates between publishers and subscribers on the basis of a key provided in the incoming data (request).
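
The request key is the whole protocol between the pages and the server, so it can help to validate it in one place. The following is a sketch only: parseRequest and KNOWN_REQUESTS are hypothetical names, and the rule that a channel must be a non-empty string is an assumption rather than something the demo enforces.

```javascript
// Hypothetical helper: validates the JSON envelope sent by the publisher and subscriber pages.
const KNOWN_REQUESTS = ['PUBLISH', 'SUBSCRIBE'];

function parseRequest(raw) {
    let json;
    try {
        // String() also handles Buffers, which is how Node usually hands over socket data
        json = JSON.parse(String(raw));
    } catch (err) {
        return { ok: false, error: 'Payload is not valid JSON' };
    }
    if (json === null || typeof json !== 'object') {
        return { ok: false, error: 'Payload must be a JSON object' };
    }
    if (!KNOWN_REQUESTS.includes(json.request)) {
        return { ok: false, error: `Unknown request: ${json.request}` };
    }
    // Assumption: a channel is always a non-empty string
    if (typeof json.channel !== 'string' || json.channel === '') {
        return { ok: false, error: 'Missing channel' };
    }
    return {
        ok: true,
        request: json.request,
        channel: json.channel,
        message: typeof json.message === 'string' ? json.message : ''
    };
}

const good = parseRequest('{"request":"PUBLISH","channel":"weather","message":"Cloudy"}');
const bad = parseRequest('not json');
const unknown = parseRequest('{"request":"DELETE","channel":"weather"}');

console.log(good, bad, unknown);
```

With a helper like this, the message handler could bail out early when ok is false and only reach the switch statement with a well-formed request.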

class PubSubManager {
    constructor() {
        this.channels = {
            weather: {
                message: '',
                subscribers: []
            },
            sports: {
                message: '',
                subscribers: []
            }
        }
        this.brokerId = setInterval(() => { this.broker() }, 1000);
    }
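    subscribe(subscriber, channel) {
        // Guard against channels that do not exist in this.channels
        if (!this.channels.hasOwnProperty(channel)) {
            console.log(`unknown channel: ${channel}`);
            return;
        }
        console.log(`subscribing to ${channel}`);
        this.channels[channel].subscribers.push(subscriber);
    }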

    removeBroker() {
        clearInterval(this.brokerId);
    }

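    publish(publisher, channel, message) {
        // Guard against channels that do not exist in this.channels
        if (!this.channels.hasOwnProperty(channel)) {
            console.log(`unknown channel: ${channel}`);
            return;
        }
        this.channels[channel].message = message;
    }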

    broker() {
        for (const channel in this.channels) {
            if (this.channels.hasOwnProperty(channel)) {
                const channelObj = this.channels[channel];
                if (channelObj.message) {
                    console.log(`found message: ${channelObj.message} in ${channel}`);

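                    channelObj.subscribers.forEach(subscriber => {
                        // Skip sockets that are no longer open (readyState 1 means OPEN)
                        if (subscriber.readyState !== 1) {
                            return;
                        }
                        subscriber.send(JSON.stringify({
                            message: channelObj.message
                        }));
                    });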
                    
                    channelObj.message = '';
                }
            }
        }
    }
}
module.exports = PubSubManager;

The Publisher-Subscriber pattern is implemented here by the PubSubManager class. This class provides methods to publish a message to a channel (topic) and to add subscribers to a channel, a data structure that holds the channels with their messages and subscribers, and finally a broker that continuously polls this data structure (naively, using setInterval) for new messages. When a message is published to a channel, the broker delivers it to that channel’s subscribers on the following poll and immediately removes it from the store. Note that each channel holds only one pending message, so a second message published to the same channel before the next poll overwrites the first.
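
One way around the single-slot limitation is to keep a queue per channel and let the broker drain it on every tick. The sketch below is a hypothetical variant, not the class from the demo: QueueingPubSubManager is an invented name, channels are passed in through the constructor, and the broker is called by hand instead of from setInterval so the behaviour is easy to follow.

```javascript
// Hypothetical variant of PubSubManager: one queue per channel instead of one slot.
class QueueingPubSubManager {
    constructor(channelNames) {
        // Map of channel name -> { queue, subscribers }
        this.channels = new Map();
        channelNames.forEach((name) => {
            this.channels.set(name, { queue: [], subscribers: [] });
        });
    }

    subscribe(subscriber, channel) {
        const channelObj = this.channels.get(channel);
        if (!channelObj) {
            return false; // unknown channel
        }
        channelObj.subscribers.push(subscriber);
        return true;
    }

    publish(channel, message) {
        const channelObj = this.channels.get(channel);
        if (!channelObj) {
            return false; // unknown channel
        }
        channelObj.queue.push(message);
        return true;
    }

    // What a setInterval callback would call on every tick.
    // Returns the number of sends performed.
    broker() {
        let delivered = 0;
        this.channels.forEach((channelObj) => {
            // splice(0) empties the queue and returns its previous contents in order
            channelObj.queue.splice(0).forEach((message) => {
                const payload = JSON.stringify({ message });
                channelObj.subscribers.forEach((subscriber) => {
                    subscriber.send(payload);
                    delivered += 1;
                });
            });
        });
        return delivered;
    }
}

// Stand-in for a WebSocket: anything with a send(string) method works here.
const inbox = [];
const fakeSocket = { send: (payload) => inbox.push(JSON.parse(payload).message) };

const manager = new QueueingPubSubManager(['weather', 'sports']);
manager.subscribe(fakeSocket, 'weather');
manager.publish('weather', 'Cloudy');
manager.publish('weather', 'Brief showers'); // queued behind the first message, not overwriting it

const firstTick = manager.broker();  // delivers both queued messages
const secondTick = manager.broker(); // nothing left to deliver

console.log(inbox, firstTick, secondTick);
```

The trade-off is memory: a channel with no subscribers still has its queue emptied on each tick here, but a real implementation would have to decide whether to hold such messages or drop them.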

<!DOCTYPE html>
<html>

<head>
    <style>
        body {
            font-family: Tahoma, Geneva, sans-serif;
        }

        div {
            display: inline;
        }
    </style>
    <script>
        function publish() {
            var message = document.getElementById('message').value;
            var channel = document.getElementById('channel').value;

            var host = window.document.location.host.replace(/:.*/, '');
            var ws = new WebSocket('ws://' + host + ':8080');
            ws.onopen = function () {
                ws.send(JSON.stringify({
                    request: 'PUBLISH',
                    message: message,
                    channel: channel
                }));
                ws.close();
            };
        }
    </script>
</head>

<body>
    <h1>Publisher</h1>
    <input type="text" id="channel" placeholder="Channel (weather, sports etc.)" />
    <input type="text" id="message" placeholder="What you want to publish?" />
    <button onclick="publish()">Publish</button>
</body>

</html>

In our implementation, the publisher is provided by pub.html, served statically by Express. It can publish a message under any available channel (currently limited to weather and sports). To do so, it opens a connection to our WebSocket server and sends the channel, the message and a request key that identifies it as a publisher. It then immediately closes the connection. The server, in turn, parses this data and publishes the message under the relevant topic through PubSubManager’s publish method.

<!DOCTYPE html>
<html>

<head>
    <style>
        body {
            font-family: Tahoma, Geneva, sans-serif;
        }

        div {
            display: inline;
        }
    </style>
    <script>
        function subscribe() {
            var message = document.getElementById('message');
            var channel = document.getElementById('channel').value;
            var host = window.document.location.host.replace(/:.*/, '');
            var ws = new WebSocket('ws://' + host + ':8080');
            ws.onopen = function () {
                ws.send(JSON.stringify({
                    request: 'SUBSCRIBE',
                    message: '',
                    channel: channel
                }));
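                ws.onmessage = function(event){
                    // Declare data locally instead of leaking an implicit global
                    var data = JSON.parse(event.data);
                    // textContent shows the message as plain text rather than parsing it as HTML
                    message.textContent = data.message;
                };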
            };
        }
    </script>
</head>

<body>
    <h1>Subscriber</h1>
    <input type="text" id="channel" placeholder="Channel (weather, sports etc.)" />
    <button onclick="subscribe()">Subscribe</button>
    <div>
        <h1>Message:</h1>
        <div id="message"></div>
    </div>
</body>

</html>

The subscriber implementation, provided by sub.html, works much like pub.html, with the following differences:

  • No message is sent to the server, only the channel of interest. The server subscribes the client to the relevant channel using the subscribe method of PubSubManager.
  • The connection with the server is kept open indefinitely, so that the WebSocket server can push any incoming update to the subscriber.
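
Because subscriber connections stay open, the server keeps a reference to each socket in its channel lists, and the close handler shown earlier only logs a message. A minimal sketch of the missing cleanup follows, assuming the same channel shape as PubSubManager’s this.channels; removeSubscriber is a hypothetical helper, and the plain objects stand in for real sockets.

```javascript
// Hypothetical helper; `channels` has the same shape as this.channels in PubSubManager.
function removeSubscriber(channels, subscriber) {
    let removed = 0;
    Object.keys(channels).forEach((name) => {
        const before = channels[name].subscribers.length;
        // Keep every subscriber except the one that disconnected
        channels[name].subscribers = channels[name].subscribers.filter((s) => s !== subscriber);
        removed += before - channels[name].subscribers.length;
    });
    return removed;
}

// Plain objects standing in for WebSocket connections
const socketA = { id: 'A' };
const socketB = { id: 'B' };
const channels = {
    weather: { message: '', subscribers: [socketA, socketB] },
    sports: { message: '', subscribers: [socketA] }
};

const removedCount = removeSubscriber(channels, socketA);

console.log(removedCount, channels.weather.subscribers, channels.sports.subscribers);
```

In the demo server, something like this could be called from the close handler of each connection so that the broker stops iterating over sockets that have gone away.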

You can get the complete demo app here. Follow these steps to run the demo:

  • Go to the app folder and get the server running:
    • cd pubsub
    • npm install
    • node index.js
  • By default, the server runs on http://localhost:8080.
  • Open the subscriber at http://localhost:8080/sub.html. Enter the channel to subscribe to (say weather) and click Subscribe.
  • Open the publisher at http://localhost:8080/pub.html. Enter a channel (say weather) and a message (say Cloudy with brief showers), then press Publish.
  • The subscriber shows the message under the Message heading.
  • You can run multiple publishers and subscribers over different channels (currently limited to weather and sports).

There are many possible improvements to this demo implementation, from securing the WebSocket server to persisting the message queue. However, the goal here was to create a quick, working implementation. Please feel free to improve and build upon it. For those who love challenges, I would suggest implementing your own WebSocket server using NodeJS (the next item on my bucket list 😉). You can get started with this MDN link and this RFC.
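
As a taste of what a hand-written WebSocket server involves, the first step is answering the opening handshake: the server has to derive a Sec-WebSocket-Accept value from the Sec-WebSocket-Key header the client sends. The sketch below covers only that step, using Node’s built-in crypto module; computeAcceptKey is a hypothetical helper name, and frame parsing, masking and the rest of the protocol are left out.

```javascript
const crypto = require('crypto');

// Fixed GUID defined by RFC 6455 for the opening handshake
const WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// computeAcceptKey is a hypothetical helper name.
// It appends the GUID to the client's key, hashes with SHA-1 and base64-encodes the digest.
function computeAcceptKey(secWebSocketKey) {
    return crypto
        .createHash('sha1')
        .update(secWebSocketKey + WEBSOCKET_GUID)
        .digest('base64');
}

// Sample key taken from the handshake example in RFC 6455
const acceptKey = computeAcceptKey('dGhlIHNhbXBsZSBub25jZQ==');

console.log(acceptKey); // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The server would send this value back in the Sec-WebSocket-Accept header of its 101 Switching Protocols response.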
