Geeking out on things that excite us

Binary data over Phoenix sockets

23rd November 2016

TL;DR

How to send raw binary data through channels in the Phoenix framework and read the data on the client. We will be using the MessagePack format to deliver the payload, gzipping when it makes sense to do so.

MessagePack you say? Well, what's wrong with JSON?

Nothing! No, really! Except... it could be leaner. You see, over here at Stoiximan we deliver lots and lots of data to our customers, so even minor gains on each payload translate to lots of bytes saved in the end, both on our customers' mobile data plans and on our server loads. Plus all the cool kids use raw bytes and we should too.

You mentioned gzipping above. Everybody and their moms know that gzipped JSON data are smaller than their msgpacked+gzipped counterparts!

True! But our payloads are usually small enough that gzipping does not make sense. So most of the time the data we send will only be msgpacked, and msgpack sizes are smaller (generally speaking) than json sizes. If your data do not fit this description though and your app is chunky rather than chatty, then by all means use JSON. That's right, this does mean less work for you!
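
To make the size difference concrete, here is a small sketch comparing one tiny payload in both formats. The payload itself is made up for illustration, and the MessagePack bytes are assembled by hand from the spec rather than produced by a library:

```javascript
// Hand-assembled MessagePack encoding of {"id": 7, "live": true}:
//   0x82                  fixmap with 2 entries
//   0xA2 'i' 'd'          fixstr of length 2
//   0x07                  positive fixint 7
//   0xA4 'l' 'i' 'v' 'e'  fixstr of length 4
//   0xC3                  true
const msgpackBytes = new Uint8Array([
  0x82,
  0xA2, 0x69, 0x64,
  0x07,
  0xA4, 0x6C, 0x69, 0x76, 0x65,
  0xC3
]);

const json = JSON.stringify({ id: 7, live: true });

// every character is ASCII here, so string length equals byte length
const jsonSize = json.length;
const msgpackSize = msgpackBytes.length;

console.log("json:", jsonSize, "bytes; msgpack:", msgpackSize, "bytes");
// prints: json: 20 bytes; msgpack: 11 bytes
```

The saving comes mostly from dropping quotes, colons, commas and braces, which is why it matters most for small, chatty messages.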

Anything else I should know before I do this?

The techniques described here only work with fairly modern browsers, i.e. those supporting WebSockets (which allow sending and receiving raw bytes) and TypedArrays, so IE 10 and above is what we're targeting here. If you need to support older browsers, you will have to connect with the longpolling transport and send JSON data (possibly gzipped and base64'd, i.e. text data, but take the 4:3 base64 expansion into account before deciding to gzip), and do some capability detection work to find out what's supported. None of that will be shown here, to keep this post focused.
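
As a quick aside on that 4:3 figure, the arithmetic is easy to sanity-check. The helper below is purely illustrative (the name is ours) and assumes standard base64 with "=" padding:

```javascript
// Length of the base64 text produced from `byteCount` raw bytes:
// every group of up to 3 input bytes becomes 4 output characters.
function base64Length(byteCount) {
  return 4 * Math.ceil(byteCount / 3);
}

console.log(base64Length(1024)); // prints 1368
```

So gzipping only pays off on the text transport if it shrinks the payload by more than the roughly 33% that base64 adds back.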

OK, you got me somewhat interested and I'm not scared by the extra work, how do I do it?

Go ahead and create a new folder named binary_data_over_phoenix_sockets/, cd inside and create a sample project by running mix phoenix.new . --no-ecto from the command line. Press Y when asked to fetch dependencies. Open up mix.exs and add msgpack-elixir [1] as a dependency:

# file: mix.exs

defp deps do  
  [
    # ...other deps go here
    {:message_pack, "~> 0.2.0"}
  ]
end  

Then run mix deps.get to download it.

First thing we need to do now is create our own msgpack serializer that we'll use with phoenix transports, looking of course at the default WebSocketSerializer.ex implementation for inspiration. Create folder web/transports and add new file message_pack_serializer.ex

# file: web/transports/message_pack_serializer.ex

defmodule BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer do  
  @moduledoc false

  @behaviour Phoenix.Transports.Serializer

  alias Phoenix.Socket.Reply
  alias Phoenix.Socket.Message
  alias Phoenix.Socket.Broadcast

  # only gzip data above 1K
  @gzip_threshold 1024

  def fastlane!(%Broadcast{} = msg) do
    {:socket_push, :binary, pack_data(%{
      topic: msg.topic,
      event: msg.event,
      payload: msg.payload
    })}
  end

  def encode!(%Reply{} = reply) do
    packed = pack_data(%{
      topic: reply.topic,
      event: "phx_reply",
      ref: reply.ref,
      payload: %{status: reply.status, response: reply.payload}
    })
    {:socket_push, :binary, packed}
  end

  def encode!(%Message{} = msg) do
    # We need to convert the Message struct into a plain map for MessagePack to work properly.
    # Alternatively we could have implemented the Enumerable protocol. Pick your poison :)
    {:socket_push, :binary, pack_data(Map.from_struct msg)}
  end

  # messages received from the clients are still in json format;
  # for our use case clients are mostly passive listeners, so it made no sense
  # to optimize incoming traffic
  def decode!(message, _opts) do
    message
    |> Poison.decode!()
    |> Phoenix.Socket.Message.from_map!()
  end

  defp pack_data(data) do
    msgpacked = MessagePack.pack!(data, enable_string: true)
    gzip_data(msgpacked, byte_size(msgpacked))
  end

  defp gzip_data(data, size) when size < @gzip_threshold, do: data
  defp gzip_data(data, _size), do: :zlib.gzip(data)
end  

Anyone who bothered to click on the default implementation link will immediately see that the code is essentially the same, except now we're using MessagePack to serialize the payload (duh) and telling phoenix that payloads are :binary not :text data (which relates to websocket frame handling).

Now open the web/channels/user_socket.ex file and override the default serializer with our own

# file: web/channels/user_socket.ex
...
  transport :websocket, Phoenix.Transports.WebSocket,
    serializer: BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer
...

We will also need a channel to do the actual talking, so go ahead and run in the command line mix phoenix.gen.channel Test which will create the file web/channels/test_channel.ex. Follow the suggestion and add this channel to user_socket.ex

# file: web/channels/user_socket.ex

# add this line above the transport
channel "test:lobby", BinaryDataOverPhoenixSockets.TestChannel  

Note: depending on your phoenix version, you may need to run mix phoenix.gen.channel Test lobby instead.

We should now add two callbacks that return some sample responses to showcase both plain and gzipped payload delivery. Open up test_channel.ex and add the following lines inside the TestChannel module:

# file: web/channels/test_channel.ex

def handle_in("small", _payload, socket) do  
  push socket, "small_reply", %{"small response that will only be msgpacked" => true}
  {:noreply, socket}
end

def handle_in("large", _payload, socket) do  
  push socket, "large_reply", %{"large response that will be msgpacked+gzipped" => List.duplicate(1000, 1000)}
  {:noreply, socket}
end  
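
Why does the second reply get gzipped while the first does not? A back-of-the-envelope estimate helps. The sketch below is only an approximation, assuming the encoder picks the smallest MessagePack format for each value (as the spec encourages); the function name is made up for illustration:

```javascript
// Rough MessagePack size of a list holding `count` copies of the integer 1000.
function estimateIntListSize(count) {
  // fixarray (1 byte), array16 (3 bytes) or array32 (5 bytes) header
  const arrayHeader = count <= 15 ? 1 : count <= 0xFFFF ? 3 : 5;
  // 1000 does not fit in one byte, so it needs uint16: 0xCD marker + 2 bytes
  const perElement = 3;
  return arrayHeader + count * perElement;
}

const GZIP_THRESHOLD = 1024; // mirrors @gzip_threshold in the serializer

const listSize = estimateIntListSize(1000);
console.log(listSize, listSize >= GZIP_THRESHOLD); // prints 3003 true
```

The list alone is about three times the threshold, so the "large" reply is gzipped, whereas the "small" reply is a few dozen bytes and goes out as plain msgpack.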

And that wraps up the server portion of things! Run mix phoenix.server to fire up the app, open your modern browser and navigate to http://localhost:4000

The client

Time to get crackin' on the client. First things first: we need to gunzip data sent from the server (if gzipped), and we need to unpack the msgpacked message. Unfortunately there is no native browser support for any of these things, so we'll need to get a little creative.

Unzipping:

We will be using a stripped down version of imaya's zlib.js, since we only need the unzipping part. Go ahead and create the file gunzip.js under web/static/js/vendor/, adding the contents from here.

// file: web/static/js/vendor/gunzip.js

// Paste in here the contents from https://github.com/StoiximanServices/blog/blob/master/binary_data_over_phoenix_sockets/web/static/js/vendor/gunzip.js
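
Since the server only gzips some frames, the client has to tell the two kinds apart. A gzip stream always starts with the magic bytes 0x1F 0x8B, while a msgpacked Phoenix message is a map and therefore starts with one of MessagePack's map markers, so the two cannot be confused. A minimal sketch of that check (helper names and sample bytes are ours, for illustration only):

```javascript
// True when `bytes` starts with the gzip magic number (0x1F 0x8B).
function looksGzipped(bytes) {
  return bytes.length > 2 && bytes[0] === 0x1F && bytes[1] === 0x8B;
}

// True when the first byte is a MessagePack map marker:
// fixmap (0x80-0x8F), map16 (0xDE) or map32 (0xDF).
function looksLikeMsgpackMap(bytes) {
  const first = bytes[0];
  return (first >= 0x80 && first <= 0x8F) || first === 0xDE || first === 0xDF;
}

// start of a gzip header (0x08 = deflate compression method)
const gzippedFrame = new Uint8Array([0x1F, 0x8B, 0x08, 0x00]);
// start of a msgpacked message: fixmap with 3 entries, then a fixstr
const plainFrame = new Uint8Array([0x83, 0xA5, 0x74, 0x6F]);

console.log(looksGzipped(gzippedFrame), looksGzipped(plainFrame)); // prints true false
```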

Unpacking:

Again, we only need to unpack messages on the client; there is no need for packing. Searching around did not produce anything that fit our needs [2], so we get to roll our own MessagePack decoder. Yay! Don't worry, the MessagePack spec is pretty straightforward (no need for lookahead/lookbehind), so modeling the decoder as a state machine should not pose too much of a problem (not to mention that it should be fast as well!).
Create file web/static/js/msgpack.js and put the following inside:

// file: web/static/js/msgpack.js

let formats = {  
  positiveFixIntStart: 0x00,
  positiveFixIntEnd: 0x07F,
  fixMapStart: 0x80,
  fixMapEnd: 0x8F,
  fixArrStart: 0x90,
  fixArrEnd: 0x9F,
  fixStrStart: 0xA0,
  fixStrEnd: 0xBF,
  nil: 0xC0,
  none: 0xC1,
  bFalse: 0xC2,
  bTrue: 0xC3,
  bin8: 0xC4,
  bin16: 0xC5,
  bin32: 0xC6,
  ext8: 0xC7,
  ext16: 0xC8,
  ext32: 0xC9,
  float32: 0xCA,
  float64: 0xCB,
  uint8: 0xCC,
  uint16: 0xCD,
  uint32: 0xCE,
  uint64: 0xCF,
  int8: 0xD0,
  int16: 0xD1,
  int32: 0xD2,
  int64: 0xD3,
  fixExt1: 0xD4,
  fixExt2: 0xD5,
  fixExt4: 0xD6,
  fixExt8: 0xD7,
  fixExt16: 0xD8,
  str8: 0xD9,
  str16: 0xDA,
  str32: 0xDB,
  array16: 0xDC,
  array32: 0xDD,
  map16: 0xDE,
  map32: 0xDF,
  negativeFixIntStart: 0xE0,
  negativeFixIntEnd: 0xFF
}

/*
Decode returns two element [pos, data] arrays: index 0 holds the new position of the parser, and index 1 contains the parsed data. We carry around the original  
binary data array to avoid copying to new slices, while updating the parser position and recursively calling decode until we've consumed all the buffer.  
Missing from this implementation is extension support- add it if you need it.  
*/
let decode = function(binaryData, start){

  start = start || 0;
  let format = binaryData[start];

  if(format <= formats.positiveFixIntEnd){
    return [start + 1, format - formats.positiveFixIntStart];
  }
  if(format <= formats.fixMapEnd){
    let keyCount = format - formats.fixMapStart;
    return parseMap(binaryData, keyCount, start + 1);
  }
  if(format <= formats.fixArrEnd){
    let len = format - formats.fixArrStart;
    return parseArray(binaryData, len, start + 1);
  }
  if(format <= formats.fixStrEnd){
    let len = format - formats.fixStrStart;
    return parseUtf8String(binaryData, len, start + 1);
  }

  let pos, len;

  switch(format){
    case formats.nil:
      return [start + 1, null];
    case formats.bFalse:
      return [start + 1, false];
    case formats.bTrue:
      return [start + 1, true];
    case formats.bin8:
      [pos, len] = parseUint(binaryData, 8, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.bin16:
      [pos, len] = parseUint(binaryData, 16, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.bin32:
      [pos, len] = parseUint(binaryData, 32, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.float32:
      return parseFloat(binaryData, 32, start + 1);
    case formats.float64:
      return parseFloat(binaryData, 64, start + 1);
    case formats.uint8:
      return parseUint(binaryData, 8, start + 1);
    case formats.uint16:
      return parseUint(binaryData, 16, start + 1);
    case formats.uint32:
      return parseUint(binaryData, 32, start + 1);
    case formats.uint64:
      return parseUint(binaryData, 64, start + 1);
    case formats.int8:
      return parseInt(binaryData, 8, start + 1);
    case formats.int16:
      return parseInt(binaryData, 16, start + 1);
    case formats.int32:
      return parseInt(binaryData, 32, start + 1);
    case formats.int64:
      return parseInt(binaryData, 64, start + 1);
    case formats.str8:
      [pos, len] = parseUint(binaryData, 8, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.str16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.str32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.array16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseArray(binaryData, len, pos);
    case formats.array32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseArray(binaryData, len, pos);
    case formats.map16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseMap(binaryData, len, pos);
    case formats.map32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseMap(binaryData, len, pos);
  }

  if(format >= formats.negativeFixIntStart && format <= formats.negativeFixIntEnd){
    return [start + 1, - (formats.negativeFixIntEnd - format + 1)]
  }

  throw new Error("I don't know how to decode format ["+format+"]");
}

function parseMap(binaryData, keyCount, start){  
  let ret = {};
  let pos = start;
  for(let i = 0; i < keyCount; i++){
    let [keypos, key] = decode(binaryData, pos);
    pos = keypos;
    let [valpos, value] = decode(binaryData, pos)
    pos = valpos;
    ret[key] = value;
  }
  return [pos, ret];
}

function parseArray(binaryData, length, start){  
  let ret = [];
  let pos = start;
  for(let i = 0; i < length; i++){
    let [newpos, data] = decode(binaryData, pos)
    pos = newpos;
    ret.push(data);
  }
  return [pos, ret];
}

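function parseUint(binaryData, length, start){
  let num = 0;
  let pos = start;
  let end = start + length / 8;
  // Multiply instead of shifting: JS bitwise operators work on 32-bit signed
  // integers, so `<<` would corrupt uint32/uint64 values.
  // Results are exact up to 2^53 - 1.
  while (pos < end){
    num = num * 256 + binaryData[pos];
    pos++;
  }
  return [pos, num];
}

function parseInt(binaryData, length, start){
  if(length === 64){
    // signed high word + unsigned low word; exact only within +/- 2^53
    let [, hi] = parseInt(binaryData, 32, start);
    let [pos, lo] = parseUint(binaryData, 32, start + 4);
    return [pos, hi * 4294967296 + lo];
  }
  let [pos, unum] = parseUint(binaryData, length, start);
  // two's complement: values with the top bit set are negative
  let limit = Math.pow(2, length - 1);
  return [pos, unum >= limit ? unum - 2 * limit : unum];
}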

function parseBinaryArray(binaryData, length, start){  
  let m = binaryData.subarray || binaryData.slice;
  let pos = start + length;
  return [pos, m.call(binaryData, start, pos)];
}

function parseFloat(binaryData, length, start){
  let bytecount = length / 8;
  // copy the float's bytes into a scratch buffer so DataView can read them
  let view = new DataView(new ArrayBuffer(bytecount));
  for(let i = 0; i < bytecount; i++){
    view.setUint8(i, binaryData[start + i]);
  }
  let fnName = "getFloat"+length;
  // MessagePack stores floats big-endian, hence littleEndian = false
  let result = view[fnName](0, false);
  return [start + bytecount, result];
}

function parseUtf8String(data, length, start){  
  //from https://gist.github.com/boushley/5471599
  var result = [];
  var i = start;
  var c = 0;
  var c2 = 0;
  var c3 = 0;

  // If we have a BOM skip it
  if (length >= 3 && data[i] === 0xef && data[i+1] === 0xbb && data[i+2] === 0xbf) {
    i += 3;
  }

  let mark = length + start;
  while (i < mark) {
    c = data[i];
    if (c < 128) {
      result.push(String.fromCharCode(c));
      i++;
    } else if (c > 191 && c < 224) {
      if( i+1 >= data.length ) {
        throw "UTF-8 Decode failed. Two byte character was truncated.";
      }
      c2 = data[i+1];
      result.push(String.fromCharCode( ((c&31)<<6) | (c2&63) ));
      i += 2;
    } else {
      if (i+2 >= data.length) {
        throw "UTF-8 Decode failed. Multi byte character was truncated.";
      }
      c2 = data[i+1];
      c3 = data[i+2];
      result.push(String.fromCharCode( ((c&15)<<12) | ((c2&63)<<6) | (c3&63) ));
      i += 3;
    }
  }
  return [mark, result.join('')];
}

let msgpack = {  
  decode: function(binaryArray){
    return decode(binaryArray)[1];
  }
}

export default msgpack  
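
To get a feel for the wire format the decoder walks through, here is a minimal standalone sketch (helper names are ours, not part of the module above). It classifies the single-byte "fix" headers, where type and length/value share one byte, and reads a big-endian unsigned integer the way the multi-byte formats store their lengths and values:

```javascript
// Classify a MessagePack header byte from the single-byte "fix" families.
// Returns [kind, value], where value is the embedded integer or length.
function describeHeader(byte) {
  if (byte <= 0x7F) return ["positive fixint", byte];
  if (byte <= 0x8F) return ["fixmap", byte - 0x80];
  if (byte <= 0x9F) return ["fixarray", byte - 0x90];
  if (byte <= 0xBF) return ["fixstr", byte - 0xA0];
  if (byte >= 0xE0) return ["negative fixint", byte - 0x100];
  return ["multi-byte format", null];
}

// Read a big-endian unsigned integer spanning `byteCount` bytes.
// Multiplication is used instead of `<<` because JavaScript's bitwise
// operators work on 32-bit signed integers.
function readUintBE(bytes, offset, byteCount) {
  let value = 0;
  for (let i = 0; i < byteCount; i++) {
    value = value * 256 + bytes[offset + i];
  }
  return value;
}

console.log(describeHeader(0x82)); // [ 'fixmap', 2 ]
console.log(describeHeader(0xFF)); // [ 'negative fixint', -1 ]
console.log(readUintBE([0xFF, 0xFF, 0xFF, 0xFF], 0, 4)); // 4294967295
console.log(0xFF << 24); // -16777216, which is why shifting is avoided
```

The last line is the pitfall to keep in mind for uint32 and 64-bit values: a shift that reaches bit 31 flips the sign, and shift counts of 32 or more wrap around.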

Phew! That was a mouthful. We're now ready to intercept the socket's incoming messages and use our decoder for parsing the data.
Again, we're going to look to the default implementation for inspiration.
We'll start by abstracting the msgpack parsing into its own module for reusability and then use it inside socket.js. So start by creating a new file web/static/js/binarySocket.js and
add the following:

//  file: web/static/js/binarySocket.js

import msgpack from "./msgpack"

/*lots of console.log() statements for educational purposes in this file, don't forget to remove them in production*/

function convertToBinary(socket){

  let parentOnConnOpen = socket.onConnOpen;

  socket.onConnOpen = function(){
    //setting this to arraybuffer saves us from having to deal with blobs
    this.conn.binaryType = 'arraybuffer';
    parentOnConnOpen.apply(this, arguments);
  }

  //we also need to override the onConnMessage function, where we'll be checking
  //for binary data, and delegate to the default implementation if it's not what we expected
  let parentOnConnMessage = socket.onConnMessage;

  socket.onConnMessage = function (rawMessage){
    if(!(rawMessage.data instanceof window.ArrayBuffer)){
      return parentOnConnMessage.apply(this, arguments);
    }
    let msg = decodeMessage(rawMessage.data);
    let topic = msg.topic;
    let event = msg.event;
    let payload = msg.payload;
    let ref = msg.ref;

    this.log("receive", (payload.status || "") + " " + topic + " " + event + " " + (ref && "(" + ref + ")" || ""), payload);
    this.channels.filter(function (channel) {
      return channel.isMember(topic);
    }).forEach(function (channel) {
      return channel.trigger(event, payload, ref);
    });
    this.stateChangeCallbacks.message.forEach(function (callback) {
      return callback(msg);
    });
  }

  return socket;
}

function decodeMessage(rawdata){  
  if(!rawdata){
    return;
  }

  let binary = new Uint8Array(rawdata);
  let data;
  //check for gzip magic bytes
  if(binary.length > 2 && binary[0] === 0x1F && binary[1] === 0x8B){
    let inflate = new window.Zlib.Gunzip(binary);
    data = inflate.decompress();
    console.log('received', binary.length, 'Bytes of gzipped data,', data.length, 'Bytes after inflating');
  }
  else{
    console.log('received', binary.length, 'Bytes of plain msgpacked data');
    data = binary;
  }
  let msg = msgpack.decode(data);
  return msg;
}

export default {  
  convertToBinary
}
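
The override technique used in convertToBinary (keep a reference to the original method, replace it, then delegate back to it) can be seen in isolation in the sketch below. It uses a stand-in object rather than a real Phoenix socket, and wrapMethod is a hypothetical helper of ours:

```javascript
// Stand-in for a Phoenix socket; only the pieces needed to show the pattern.
const fakeSocket = {
  conn: { binaryType: "blob" },
  opened: false,
  onConnOpen() { this.opened = true; }
};

// Wrap `obj[name]` so that `before` runs first, then the original method,
// both with the same `this` and arguments.
function wrapMethod(obj, name, before) {
  const original = obj[name];
  obj[name] = function (...args) {
    before.apply(this, args);
    return original.apply(this, args);
  };
  return obj;
}

wrapMethod(fakeSocket, "onConnOpen", function () {
  this.conn.binaryType = "arraybuffer";
});

fakeSocket.onConnOpen();
console.log(fakeSocket.conn.binaryType, fakeSocket.opened); // prints arraybuffer true
```

Because the original method still runs, the socket keeps its stock behavior; the trade-off is that this relies on the internal method names of the phoenix.js version in use.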

Then let's head over to web/static/js/socket.js and replace all contents with the following

// file: web/static/js/socket.js

import {Socket} from "phoenix"  
import binarySocket from "./binarySocket"

/*the type=msgpack param is only added to distinguish this connection
from the phoenix live reload connection in the browser's network tab*/  
let socket = new Socket("/socket", {params: {type: "msgpack"}})

socket = binarySocket.convertToBinary(socket);

socket.connect()

//lets join the lobby
let channel = socket.channel("test:lobby", {})

channel.on("small_reply", function(data){  
  console.log("small reply: server responded with", data);
})

channel.on("large_reply", function(data){  
  console.log("large reply: server responded with", data);
})

channel.join()  
  .receive("ok", resp => {
    console.log("Joined successfully", resp)
    channel.push("small")
    channel.push("large")
  })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket  

Lastly we'll need to import socket.js contents into our app.js file. Open up web/static/js/app.js and uncomment the following line:

// file: web/static/js/app.js

import socket from "./socket"  

Your browser should have refreshed quite a lot of times by now; let's open the dev tools (F12), head over to the Network tab, switch to websocket frames debugging while keeping the console window open and then refresh your page once more. Make sure you're debugging the type=msgpack websocket connection (the other one is for phoenix's own live reload feature).
What you should be seeing, if everything worked, is the binary frames sent from the server, the text (JSON) frames sent from the client (remember that all the work we did was for the server to be able to send msgpacked data), and the messages in the console happily notifying us every time a response is received (heartbeats for the most part).


network tools: websocket debugging

That's all folks! We hope you've enjoyed the post and we wish you lots of happy elixir coding!

P.S. All code and tests are of course on GitHub.


  1. We've identified some perf gains that could be had with minor (bitstring appending instead of splicing) and not-so-minor (encode to iodata) tweaks and we've put those in our fork. If you decide to use this fork, change the mix dependency to {:message_pack, github: "StoiximanServices/msgpack-elixir"} and make sure to use MessagePack.pack_iodata!/2 instead of MessagePack.pack!/2 in the message_pack_serializer.ex module.

  2. The official js implementation is msgpack-lite which we found a little hard to follow and use: adding it as an npm dependency to the project didn't work because of its reliance on the Buffer class that is only available on the Node.js env. It was however used for reference testing our hand-rolled implementation.
