For a recent project, I had the need of communicating with an external server over WebSocket using the STOMP protocol. After searching and failing to find any libraries in Elixir that could do this, I was contemplating falling back to Python’s stomp.py and using ErlPort to communicate with the process. While this would definitely have been an acceptable solution, it would also be an overly complex solution, especially since STOMP is a very easy protocol and there are already some libraries in Elixir that can parse and create STOMP messages.

STOMP Messages

As it turns out it was really easy to roll out my own WebSocket client implementation that communicated using STOMP. Each STOMP message has a simple structure like this:

COMMAND
header1:value1
header2:value2

Body^@

The header and the body could be anything depending on the interaction between the server and the client. And there are only a few client commands that STOMP supports:

  • SEND
  • SUBSCRIBE
  • UNSUBSCRIBE
  • BEGIN
  • COMMIT
  • ABORT
  • ACK
  • NACK
  • DISCONNECT

With that, while it is definitely possible to parse and create messages on your own, I suggest using an existing tested library that already does this out of the box. I used Elephant, but you could use any other library or even roll out your own implementation.

WebSocket Client

Again, for creating a WebSocket client in Elixir, there are a couple of popular implementations. WebSockex is the easiest to get started with, especially if you do not know Erlang. But if you really need a fully battle tested library, there is also gun. I will be assuming the use of WebSockex which is what I used and which will also be a lot easier to follow in a blog. Starting a Web Socket client is very easy:

defmodule MyApp.WebSocketClient do
  use WebSockex

  @doc """
  Pass debug: true inside state if you want to enable traces
  """
  def start_link(%{url: url} = state) do
    opts = if(Map.get(state, :debug, false), do: [debug: [:trace]], else: [])
    WebSockex.start_link(url, __MODULE__, state, opts)
  end
end

CONNECT

The first step in the STOMP client is to send a frame to connect to the server. We can use handle_connect for that:

@impl WebSockex
def handle_connect(_conn, state) do
  WebSockex.cast(self(), {:send_message, %Elephant.Message{command: :connect, headers: ["accept-version": "1.2"]}})
  {:ok, state}
end

On a successful connection, the server sends us a CONNECTED response. Let us first set up our client to receive and parse STOMP messages using handle_frame:

@impl WebSockex
def handle_frame({:text, text}, state) do
  Logger.debug("Handle frame - #{to_string(text)}")
  case Elephant.Message.parse(text) do
    {:ok, %Elephant.Message{} = message, _more} ->
      ack(message, state)
      handle_stomp_message(message, state)
    {:error, :invalid} ->
      Logger.warn("Invalid frame received. Ignoring...")
      {:ok, state}
    {:incomplete, message} ->
      Logger.warn("Incomplete frame received. Parsed so far - #{inspect(message)}")
      {:ok, state}
  end
end
def handle_frame(any, state) do
  Logger.warn("Ignoring unknown frame - #{inspect(any)}")
  {:ok, state}
end

Pretty self explanatory, we receive the frame, parse is using Elephant and call our custom handle_stop_message to handle this message. I am ignoring all incomplete and invalid frames, but you could also update this to terminate the WebSocket connection if you prefer.

CONNECTED and SUBSCRIBE

With our setup in handle_frame, the code to handle CONNECTED is very simple. We will also subscribe to two channels as soon as the subscription opens. You will need to customize this step as per your requirements.

defp handle_stomp_message(%Elephant.Message{command: :connected}, state) do
  subscribe(@subscription_id_something, "/something")
  subscribe(@subscription_id_something_else, "/something/else")
  {:ok, state}
end

defp subscribe(id, destination),
    do: WebSockex.cast(self(), {:send_message, %Elephant.Message{command: :subscribe, headers: [destination: destination, id: id]}})

What is the WebSockex.cast you ask? It is just like a regular GenServer.cast that sends an asynchronous message to our process which could decide to handle it as it likes inside handle_cast. Here, we are handling two kinds of messages, :send_messages and :close.

@impl WebSockex
def handle_cast({:send_message, %Elephant.Message{} = message}, state), do: {:reply, frame(message), state}
def handle_cast(:close, state), do: {:close, state |> Map.put(:close, true)}

defp frame(%Elephant.Message{} = message), do: {:text, Elephant.Message.format(message)}

MESSAGE

The messages from our subscriptions on the server will be received through the MESSAGE frames. Handling it will be very specific to each application, so here is a very simple generic block that assumes that body is JSON encoded and calls a custom handle_subscription_message to handle that message. I will leave the implementation of handle_subscription_message up to you.

defp handle_stomp_message(%Elephant.Message{command: :message, body: body} = message, state) do
  case Elephant.Message.get_header(message, "subscription") do
    {:ok, @subscription_id_something} ->
      handle_subscription_message(:something, message, Jason.decode!(body), state)
    {:ok, @subscription_id_something_else} ->
      handle_subscription_message(:something_else, message, Jason.decode!(body), state)
    other ->
      Logger.warn("Unhandled subscription #{other} - #{inspect(message)}")
      {:ok, state}
  end
end

To reply to any of the frames, all we need is to either return {:reply, frame, state} from the handle methods or use WebSockex.cast with the {send_message, frame} which will reach our custom handle_cast implementation to send the message to the server.

ACK

You might have noticed an ack in handle_frame. This is just to inform the server that we have received a message. Here is the implementation:

defp ack(%Elephant.Message{} = message, state) do
  case Elephant.Message.get_header(message, "messageId") do
    {:ok, id} ->
      WebSockex.cast(self(), {:send_message, message_send(%{destination: "acknowledge", id: id}, state)})
    _else -> {:error, :id_not_found}
  end
end

defp message_send(%{destination: destination, id: id, body: body}, state) do
  %Elephant.Message{
    command: :send,
    headers: [
      destination: destination,
      messageId: id
    ],
    body: if(body, do: Jason.encode!(body), else: "")
  }
end

The important thing to note about this is that the acknowledge destination and the messageId header field would probably be different depending on your server implementation.

DISCONNECT

Finally, once you are done with the connection, you will need to send a DISCONNECT frame to the server to inform of your intention to close. Based on the STOMP protocol, after sending a DISCONNECT, you should also wait for a RECEIPT of the disconnection from the server before actually closing the connection.

# Disconnect
WebSockex.cast(self(), {:send_message, %Elephant.Message{
  command: :disconnect,
  headers: [receipt: @some_unique_id]
}})

# Handle receipt 
defp handle_stomp_message(%Elephant.Message{command: :receipt} = message, state) do
  id = to_string(@some_unique_id)
  case Elephant.Message.get_header(message, "receipt-id") do
    {:ok, ^id} ->
      {:close, state |> Map.put(:close, true)}
    other ->
      Logger.warn("Invalid receipt message - #{other} - #{inspect(message)}")
      {:ok, state}
  end
end

Close

The reply of {:close, state} from the above implementation is enough to initiate a closure of the socket. We also put a value of true for the close key inside the state to mark that we have initiated a closure of the socket. This is required if you would like to trap exits and perform specific behaviour based on whether this was a normal or an abnormal close.

@impl WebSockex
def terminate(_reason, %{close: true}), do: exit(:normal)
def terminate(reason, state) do
  # If you need to trap abnormal exits for any reason 
  Logger.debug("Socket Terminating: - #{inspect reason} -  #{inspect state}")
  exit(:normal)
end

Given that this is a very custom implementation of the WebSocket client, it is easy to follow this to create your own STOMP client.