For a recent project, I had the need of communicating with an external server over WebSocket using the STOMP protocol. After searching and failing to find any libraries in Elixir that could do this, I was contemplating falling back to Python’s stomp.py and using ErlPort to communicate with the process. While this would definitely have been an acceptable solution, it would also be an overly complex solution, especially since STOMP is a very easy protocol and there are already some libraries in Elixir that can parse and create STOMP messages.
STOMP Messages
As it turns out it was really easy to roll out my own WebSocket client implementation that communicated using STOMP. Each STOMP message has a simple structure like this:
COMMAND
header1:value1
header2:value2
Body^@
The header and the body could be anything depending on the interaction between the server and the client. And there are only a few client commands that STOMP supports:
SEND
SUBSCRIBE
UNSUBSCRIBE
BEGIN
COMMIT
ABORT
ACK
NACK
DISCONNECT
With that, while it is definitely possible to parse and create messages on your own, I suggest using an existing tested library that already does this out of the box. I used Elephant
, but you could use any other library or even roll out your own implementation.
WebSocket Client
Again, for creating a WebSocket client in Elixir, there are a couple of popular implementations. WebSockex
is the easiest to get started with, especially if you do not know Erlang. But if you really need a fully battle tested library, there is also gun
. I will be assuming the use of WebSockex
which is what I used and which will also be a lot easier to follow in a blog. Starting a Web Socket client is very easy:
defmodule MyApp.WebSocketClient do
use WebSockex
@doc """
Pass debug: true inside state if you want to enable traces
"""
def start_link(%{url: url} = state) do
opts = if(Map.get(state, :debug, false), do: [debug: [:trace]], else: [])
WebSockex.start_link(url, __MODULE__, state, opts)
end
end
CONNECT
The first step in the STOMP client is to send a frame to connect to the server. We can use handle_connect
for that:
@impl WebSockex
def handle_connect(_conn, state) do
WebSockex.cast(self(), {:send_message, %Elephant.Message{command: :connect, headers: ["accept-version": "1.2"]}})
{:ok, state}
end
On a successful connection, the server sends us a CONNECTED
response. Let us first set up our client to receive and parse STOMP messages using handle_frame
:
@impl WebSockex
def handle_frame({:text, text}, state) do
Logger.debug("Handle frame - #{to_string(text)}")
case Elephant.Message.parse(text) do
{:ok, %Elephant.Message{} = message, _more} ->
ack(message, state)
handle_stomp_message(message, state)
{:error, :invalid} ->
Logger.warn("Invalid frame received. Ignoring...")
{:ok, state}
{:incomplete, message} ->
Logger.warn("Incomplete frame received. Parsed so far - #{inspect(message)}")
{:ok, state}
end
end
def handle_frame(any, state) do
Logger.warn("Ignoring unknown frame - #{inspect(any)}")
{:ok, state}
end
Pretty self explanatory, we receive the frame, parse is using Elephant
and call our custom handle_stop_message
to handle this message. I am ignoring all incomplete and invalid frames, but you could also update this to terminate the WebSocket connection if you prefer.
CONNECTED
and SUBSCRIBE
With our setup in handle_frame
, the code to handle CONNECTED
is very simple. We will also subscribe to two channels as soon as the subscription opens. You will need to customize this step as per your requirements.
defp handle_stomp_message(%Elephant.Message{command: :connected}, state) do
subscribe(@subscription_id_something, "/something")
subscribe(@subscription_id_something_else, "/something/else")
{:ok, state}
end
defp subscribe(id, destination),
do: WebSockex.cast(self(), {:send_message, %Elephant.Message{command: :subscribe, headers: [destination: destination, id: id]}})
What is the WebSockex.cast
you ask? It is just like a regular GenServer.cast
that sends an asynchronous message to our process which could decide to handle it as it likes inside handle_cast
. Here, we are handling two kinds of messages, :send_messages
and :close.
@impl WebSockex
def handle_cast({:send_message, %Elephant.Message{} = message}, state), do: {:reply, frame(message), state}
def handle_cast(:close, state), do: {:close, state |> Map.put(:close, true)}
defp frame(%Elephant.Message{} = message), do: {:text, Elephant.Message.format(message)}
MESSAGE
The messages from our subscriptions on the server will be received through the MESSAGE
frames. Handling it will be very specific to each application, so here is a very simple generic block that assumes that body is JSON encoded and calls a custom handle_subscription_message
to handle that message. I will leave the implementation of handle_subscription_message
up to you.
defp handle_stomp_message(%Elephant.Message{command: :message, body: body} = message, state) do
case Elephant.Message.get_header(message, "subscription") do
{:ok, @subscription_id_something} ->
handle_subscription_message(:something, message, Jason.decode!(body), state)
{:ok, @subscription_id_something_else} ->
handle_subscription_message(:something_else, message, Jason.decode!(body), state)
other ->
Logger.warn("Unhandled subscription #{other} - #{inspect(message)}")
{:ok, state}
end
end
To reply to any of the frames, all we need is to either return {:reply, frame, state}
from the handle methods or use WebSockex.cast
with the {send_message, frame}
which will reach our custom handle_cast
implementation to send the message to the server.
ACK
You might have noticed an ack
in handle_frame
. This is just to inform the server that we have received a message. Here is the implementation:
defp ack(%Elephant.Message{} = message, state) do
case Elephant.Message.get_header(message, "messageId") do
{:ok, id} ->
WebSockex.cast(self(), {:send_message, message_send(%{destination: "acknowledge", id: id}, state)})
_else -> {:error, :id_not_found}
end
end
defp message_send(%{destination: destination, id: id, body: body}, state) do
%Elephant.Message{
command: :send,
headers: [
destination: destination,
messageId: id
],
body: if(body, do: Jason.encode!(body), else: "")
}
end
The important thing to note about this is that the acknowledge destination and the messageId
header field would probably be different depending on your server implementation.
DISCONNECT
Finally, once you are done with the connection, you will need to send a DISCONNECT
frame to the server to inform of your intention to close. Based on the STOMP protocol, after sending a DISCONNECT
, you should also wait for a RECEIPT
of the disconnection from the server before actually closing the connection.
# Disconnect
WebSockex.cast(self(), {:send_message, %Elephant.Message{
command: :disconnect,
headers: [receipt: @some_unique_id]
}})
# Handle receipt
defp handle_stomp_message(%Elephant.Message{command: :receipt} = message, state) do
id = to_string(@some_unique_id)
case Elephant.Message.get_header(message, "receipt-id") do
{:ok, ^id} ->
{:close, state |> Map.put(:close, true)}
other ->
Logger.warn("Invalid receipt message - #{other} - #{inspect(message)}")
{:ok, state}
end
end
Close
The reply of {:close, state}
from the above implementation is enough to initiate a closure of the socket. We also put a value of true for the close
key inside the state to mark that we have initiated a closure of the socket. This is required if you would like to trap exits and perform specific behaviour based on whether this was a normal or an abnormal close.
@impl WebSockex
def terminate(_reason, %{close: true}), do: exit(:normal)
def terminate(reason, state) do
# If you need to trap abnormal exits for any reason
Logger.debug("Socket Terminating: - #{inspect reason} - #{inspect state}")
exit(:normal)
end
Given that this is a very custom implementation of the WebSocket client, it is easy to follow this to create your own STOMP client.