Sapan Diwakar

Software developer

Follow me on Twitter Check out my code on GitHub View some of my designs on Dribbble Take a look at my Linked In profile

Custom WebSocket implementation with ActionCable on Electron/Node

Are you using an ActionCable client on Electron app? You want some extra compression with messagepack? Read on!

NOTE: If you are looking for adding support for compression to your Rails backend, see the previous post.

Chromium already handles compressed websocket frames with the permessage-deflate protocol. But if you have a specific use case which warrants a custom websocket extension for compression, this post describes a way to use faye-websocket seamlessly with ActionCable supporting custom compression.

The first thing we need is to create a custom WebSocket wrapper than uses faye-websocket instead of the native WebSocket implementation.

import * as url from 'url';  
import * as _ from 'lodash';  
import FayeWebSocket = require('faye-websocket');  
import net = require('net');  
import PermessageDeflate = require('permessage-deflate');

export default class WebSocket {  
  public onmessage: (event: { data: string }) => void;
  public onopen: (event: any) => void;
  public onclose: (event: any) => void;
  public onerror: (event: any) => void;

  private socket: any;

  constructor(urlString, protocols) {
    this.socket = new FayeWebSocket.Client(urlString, protocols, {
      extensions: [PermessageDeflate]
    });
    this.socket.on('open', (event) => this.onopen && this.onopen(event));
    this.socket.on('message', event => this.onmessage && this.onmessage(event));
    this.socket.on('close', event => this.onclose && this.onclose(event));
  }

  close() {
    this.socket.close();
  }

  send(message: string) {
    this.socket.send(message);
  }

  public get protocol(): string { return this.socket.protocol; }
  public get readyState(): number { return this.socket.readyState; }
}

To use this implementation with ActionCable, do this before creating any connections:

ActionCable.WebSocket = WebSocket;  

The next step is to create and add our custom extension to support messagepack compression. If you followed the previous post, you already know the basics for creating an extension. We need a ClientSession, a ServerSession and a Session for generating offer, generating response and processing messages respectively.

Most of the boilerplate is from the permessage-deflate extension. The basic session class would look something like this:

export default class Session {  
  private options: any;
  protected acceptNoContextTakeover: boolean;
  protected requestNoContextTakeover: boolean;

  constructor(options: any) {
    this.options = options;
    this.acceptNoContextTakeover  = common.fetch(options, 'noContextTakeover', false);
    this.requestNoContextTakeover = common.fetch(options, 'requestNoContextTakeover', false);
  }

  close() {
    // Nothing to do
  }
}

Let's get the easy bits out. Processing incoming and outgoing messages is going to be very straightforward. So, we can go forward and add our processing code to the Session.

processIncomingMessage(message, callback) {  
  if (!msgpack) { return callback(null, message); }
  console.log('[PermessageMessagePack] Process incoming message', message.data);
  if (!message.rsv2) { return callback(null, message); }
  const decoded = _.attempt(() => JSON.stringify(msgpack.decode(message.data)));
  if (_.isError(decoded)) { return callback(null, message); }
  message.data = decoded;
  message.opcode = 2; // Mark data as binary
  callback(null, message);
}

processOutgoingMessage(message, callback) {  
  if (!msgpack) { return callback(null, message); }
  console.log('[PermessageMessagePack] Process outgoing message', message.data);
  const json = _.attempt(() => JSON.parse(message.data));
  if (_.isError(json)) { return callback(null, message); }
  message.data = msgpack.encode(json);
  message.rsv2 = true;
  callback(null, message);
}

The tricky bit here is understanding the rsv2 flag which needs to be set true when processing outgoing message. This flag indicates to the receiver that the message has passed through this protocol. Similarly, when processing incoming message, we need to make sure this flag is true which would tell us that the message was compressed with this protocol. The WebSocket protocol has three flags, rsv1, rsv2 and rsv3 for use for extensions. The rsv1 is already used by permessage-deflate so we need to use the second one here.

Next up, generating offers from a ClientSession:

export default class ClientSession extends Session {  
  private ownContextTakeover: boolean;
  private peerContextTakeover: boolean;

  generateOffer() {
    const offer: any = {};
    if (this.acceptNoContextTakeover) { offer.client_no_context_takeover = true; }
    if (this.requestNoContextTakeover) { offer.server_no_context_takeover = true; }
    return offer;
  }

  activate(params) {
    if (!common.validParams(params)) { return false; }
    if (this.requestNoContextTakeover && !params.server_no_context_takeover) { return false; }
    this.ownContextTakeover = !(this.acceptNoContextTakeover || params.client_no_context_takeover);
    this.peerContextTakeover = !params.server_no_context_takeover;
    return true;
  }
}

The final step for the extension, creating a server session.

export default class ServerSession extends Session {  
  private ownContextTakeover: boolean;
  private peerContextTakeover: boolean;

  private params: any;

  constructor(options, params) {
    super(options);
    this.params = params;
  }

  generateResponse() {
    var response: any = {};

    // https://tools.ietf.org/html/rfc7692#section-7.1.1.1
    this.ownContextTakeover = !this.acceptNoContextTakeover &&
                               !this.params.server_no_context_takeover;
    if (!this.ownContextTakeover) { response.server_no_context_takeover = true; }

    // https://tools.ietf.org/html/rfc7692#section-7.1.1.2
    this.peerContextTakeover = !this.requestNoContextTakeover &&
                                !this.params.client_no_context_takeover;
    if (!this.peerContextTakeover) { response.client_no_context_takeover = true; }

    return response;
  }
}

With this out of the way, we can now go forward and create the extension that handles creation of client and server sessions. All boilerplate code except that we now mark that this extension uses rsv2 field.

let Extension;

export class PermessageMessagePack {  
  private options: any;

  constructor(options) {
    _.extend(this, Extension);
    this.options = options;
  }
}

Extension = {  
  name: 'permessage-message-pack',
  type: 'permessage',
  rsv1: false,
  rsv2: true,
  rsv3: false,
  options: {},

  configure: function(options = {}) {
    common.validateOptions(_.extend(this.options, options), VALID_OPTIONS);
    return new PermessageMessagePack(options);
  },

  createClientSession: function() {
    return new ClientSession(this.options || {});
  },

  createServerSession: function(offers) {
    for (var i = 0, n = offers.length; i < n; i++) {
      if (common.validParams(offers[i])) { return new ServerSession(this.options || {}, offers[i]); }
    }
    return null;
  }
};

We can then use this Extension inside the custom WebSocket implementation. Just pass another parameter to the extensions option.

this.socket = new FayeWebSocket.Client(urlString, protocols, {  
  extensions: [
    PermessageMessagePack.configure({ noContextTakeover: true }),
    PermessageDeflate,
  ],
});