In this post, we will explore how event-driven architecture can make your app more responsive to users and decouple your modules for a better developer experience.


We will also look into several methods to implement event-driven architecture with Elixir. Elixir is particularly good for this because of the advanced and concise message-passing APIs that it offers and BEAM’s outstanding support for concurrency.

But first: what is event-driven architecture, exactly?

Event-Driven Architecture: An Introduction

Event-driven architecture is a design in which events control the behavior and flow of your application. Its main components are event producers, event buses, and event consumers.

An event could be anything that represents a change of state in the system. For example, in an e-commerce application, the purchase of a product by a user could produce a sold event, which a consumer can then process to update the inventory.
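To make this concrete, here is a minimal sketch of such an event modeled as an Elixir struct. This is the Orders.Event struct the code samples later in this post rely on; the exact fields (a type and a payload) are an assumption for illustration.

defmodule Orders.Event do
  @moduledoc "A minimal event: what happened (:type) and the data it carries (:payload)."
  @enforce_keys [:type, :payload]
  defstruct [:type, :payload]
end

# The purchase above, expressed as an event (the order map is made up):
# %Orders.Event{type: :new_order, payload: %{id: 42, product: "book", quantity: 1}}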

An event-based architecture allows applications to act on events as they occur. In a well-crafted event-based design, different parts of an application can be built and evolved relatively independently. Organizations can assign separate teams to focused parts of the application and streamline the workflow. This also creates a clear boundary between different parts of an application, assisting in future scalability exercises.

Event-driven architecture has mainly gained popularity with microservice-based products but can also be used for a monolith.

Things are always clearer with an example, so let’s look at one now.

Building Blocks of Event-Driven Architecture

Let’s discuss each building block in detail, using an e-commerce application as an example.

Imagine that a user makes a new purchase on a website. The new order is an event generated by the part that controls the ordering: the event producer.

The event can be pushed onto an event bus. The event bus could be anything, such as:

  • A table in the database.
  • An in-memory event queue inside the app.
  • An external tool like RabbitMQ or Apache Kafka.

The event consumers interested in this type of event can subscribe to the event bus. The event is delivered to them, and they do some processing on top of it.

For example, an inventory management system would subscribe to the new order event and update the inventory of the product. Another system in the application could also pick up the same event - for example, a fulfillment service might process it and create a delivery route for the product.

Benefits of Event-Driven Architecture

There are several advantages of using an event-driven architecture instead of one that processes everything sequentially.

Event-driven architecture allows us to build several independent parts of an application to work off the same event and do different focused tasks. This can be advantageous for a couple of reasons:

  • Each team only needs to focus on one part.
  • The application code for each small part can stay simple.

Another advantage of the design is that it allows us to deliver a snappy interface to the user. In the application above, for example, the only thing the user has to wait for is their order being placed. The application can then process all other non-interactive tasks, like updating its inventory and interacting with the delivery application, without holding the user’s attention.

This also makes adding new processing steps to the event pipeline very easy. For example, let’s say we need an additional task to be performed whenever an event occurs. We only need to add a new consumer to handle the event, without touching any other parts of the application.

Finally, it is much easier to scale each individual module if they are decoupled than to scale a whole application together. This is even more beneficial when you have a part that takes much more resources than its counterparts in the event processing pipeline.

But event-driven architecture does not come without disadvantages when not properly thought out. If applied to very simple problems, it can lead to complex workflows that are slow and difficult to debug.

Let’s explore some simple ways event-driven architecture can be implemented with Elixir, without the need to write complex pieces of code.

Synchronous Event-Driven Architecture in Elixir

The simplest (and the most inefficient) way to run the above flow would be to do everything synchronously in the user request.

So if you have an Orders module that processes a user’s order request, the synchronous implementation could look like this:

defmodule Orders do
  def create_order(attrs) do
    {:ok, order} = save_order(attrs)
    {:ok, _inventory} = update_inventory(order)
    {:ok, _delivery} = create_delivery(order)
    {:ok, order}
  end
end

We can improve this to more easily scale for new event consumers:

defmodule Orders do
  # Each {module, function} pair listed here is called with every new event.
  @event_consumers [
    {Inventory, :handle_event},
    {Delivery, :handle_event}
  ]

  def create_order(attrs) do
    {:ok, order} = save_order(attrs)

    event = %Orders.Event{type: :new_order, payload: order}
    @event_consumers
    |> Enum.each(fn {module, func} ->
      apply(module, func, [event])
    end)

    {:ok, order}
  end
end

With this implementation, all we need to do to add a new consumer is add an entry to the @event_consumers list, and those consumers can work independently.

While the synchronous approach works well for a small number of consumers, it has a disadvantage. Creating an order might take a long time because you will need to wait for the inventory to update and the delivery to be created.

These are all internal tasks that can be performed without user interaction and can be moved out of the synchronous chain.

Let’s see how we can streamline event-driven architecture further using GenServer.

Using GenServer for Event-Driven Architecture in Elixir

For this implementation, we will run a separate process for each consumer. These will subscribe to an event from a producer and run their tasks concurrently.

For the event bus, we can use Phoenix.PubSub. Note that Phoenix.PubSub is a standalone library that works outside Phoenix applications too; apps can also use Registry directly as a local PubSub.
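For these snippets to work, a Phoenix.PubSub process has to be running under the name the code passes around (:my_app below). Here is a minimal sketch of starting it in an application's supervision tree; MyApp.Application and MyApp.Supervisor are made-up names for this example.

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # The PubSub process acting as our event bus; :my_app is the name
      # the snippets below pass to broadcast/3 and subscribe/2.
      {Phoenix.PubSub, name: :my_app}
      # The consumer GenServers (Inventory, Delivery) defined below would
      # also be supervised here.
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end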

Let’s look at the producer first.

defmodule Orders do
  def create_order(attrs) do
    {:ok, order} = save_order(attrs)
    event = %Orders.Event{type: :new_order, payload: order}
    Phoenix.PubSub.broadcast(:my_app, "new_order", event)
    {:ok, order}
  end
end

On a new order, we create the event struct and use Phoenix.PubSub.broadcast/3 to broadcast that event on the bus. As you can see, it is much simpler than the previous implementation, where the Orders module called the other modules serially.

The consumers can then subscribe to the new_order topic and implement the handle_info/2 callback to be notified every time a new event is published by the producer.

defmodule Inventory do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def init(_opts) do
    # Subscribe to the topic, then return the initial state.
    Phoenix.PubSub.subscribe(:my_app, "new_order")
    {:ok, %{}}
  end

  def handle_info(%Orders.Event{type: :new_order, payload: order}, state) do
    state = consume(state, order.product)
    {:noreply, state}
  end
end

The Delivery module will be very similar to the above, so I am skipping it here.

As you can see, this is much better than the previous implementations. The Inventory and Delivery modules can independently subscribe to the new_order topic. The Orders module broadcasts to this topic on new orders, and the events are delivered to the subscribed processes.

You could even distribute this across multiple nodes, with Phoenix.PubSub (using the PG, Redis, or another adapter) spreading the events to all of them.
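With the default PG adapter, no extra configuration is needed for this: as long as every node starts a PubSub process under the same name and the nodes are connected, a broadcast on one node reaches subscribers on all of them. A rough sketch (the node names are made up):

# Started on every node, e.g. in each node's supervision tree:
# {Phoenix.PubSub, name: :my_app}

# Connect the nodes, for example from an IEx session on :a@host:
Node.connect(:"b@host")

# This broadcast (event being an %Orders.Event{} as in create_order/1 above)
# now also reaches subscribers running on :b@host.
Phoenix.PubSub.broadcast(:my_app, "new_order", event)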

Great, right? Not really. There are several issues with this approach:

  • PubSub provides real-time broadcasts without message queueing, so if one of the subscriber processes is down, it will miss those broadcasts.
  • If the subscriber does some heavy work, it might not be able to keep up with the incoming messages; its mailbox keeps growing, eventually leading to memory problems or a crashed process.
  • If the subscriber experiences an error when processing a message, it is considered consumed and won’t be retried later.

So this approach is a bad one to follow for our current use case.

However, this approach still has its use cases. It works for tasks that aren’t critical, or that can be corrected by the next message: for example, a task that computes suggestions for a user’s next purchases based on their last purchase. While this would also be triggered on a new order, it isn’t exactly critical (for a traditional e-commerce website), and the suggestions can simply be recomputed on the user’s next purchase.

Event-Driven Implementation Using GenStage in Elixir

In the previous section, we saw a simple implementation of our event-driven system using GenServer, but it didn’t come without its limitations. Let’s see how GenStage fares.

GenStage makes a clear distinction between producers, consumers, and producer_consumers, and each process has to pick one of these roles when it starts (in its init/1). In our case, both Inventory and Delivery are consumers, and Orders is a producer.

This is where things start to get a little complicated. GenStage has a concept of demand: each consumer tells its producer how many events it can handle, and the producer dispatches at most that many events to it. Let’s see a basic producer in action.

defmodule Orders do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    {:producer, :some_state_which_does_not_currently_matter}
  end

  def create_order(pid, attrs) do
    GenStage.cast(pid, {:create_order, attrs})
  end

  def handle_cast({:create_order, attrs}, state) do
    {:ok, order} = save_order(attrs)
    {:noreply, [%Orders.Event{type: :new_order, payload: order}], state}
  end

  # We only emit events as orders come in, so incoming demand is ignored.
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

The meat of our code is in handle_cast, where we save the order and return a tuple like {:noreply, events, new_state}. The new events are stored in an internal GenStage buffer and dispatched to the consumers as they make new demands (or immediately, if there are consumers with unmet demand).

Let’s check out a sample implementation of the consumer:

defmodule Inventory do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    {:consumer, [], subscribe_to: [Orders]}
  end

  def handle_events(events, _from, state) do
    state = Enum.reduce(events, state, &handle_event/2)
    {:noreply, [], state}
  end

  # Update the inventory for the new order and return the new state.
  def handle_event(%Orders.Event{type: :new_order, payload: order}, _state) do
    update_inventory(order)
  end
end

In the consumer, first notice that we have a subscribe_to inside init/1. This automatically subscribes Inventory to any events published by Orders. Please check the GenStage documentation for additional options available in init.
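A commonly used pair of those options bounds how many events the consumer receives at a time; here is a sketch of passing them in the subscription (the numbers are arbitrary):

def init(_opts) do
  # Receive at most 10 events at a time from Orders, and ask for more
  # once fewer than 5 remain to be processed.
  {:consumer, [], subscribe_to: [{Orders, max_demand: 10, min_demand: 5}]}
end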

Most of the work happens inside handle_events/3, which GenStage calls automatically as soon as new events become available. We handle the new_order event here, updating the inventory and returning a new state.

With this simple implementation, we get several benefits over the GenServer implementation:

  • Automatic buffering of events inside GenStage’s internal buffer when the producer has new events but no pending consumer demand.

Even if consumers are down when some events are produced, they will still receive those events once they come back up (as long as the producer stays up and its buffer doesn’t overflow).

Check out GenStage’s guide on buffering demand for advanced buffering logic.

  • Automatic distribution of work on multiple consumers.

If you have heavy consumer tasks, you can start multiple consumer processes. The default DemandDispatcher for GenStage will distribute the work evenly across all processes.

See GenStage.Dispatcher for other dispatch strategies to distribute events to all consumers or partition distribution to consumers based on a hash function.
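If, as in our example, both Inventory and Delivery should see every new_order event (rather than having events divided between subscribers, which is what the default DemandDispatcher does), GenStage.BroadcastDispatcher can be set in the producer’s init/1. A sketch of that change:

def init(_opts) do
  # Send every event to every subscribed consumer (Inventory and Delivery)
  # instead of dividing the events among them.
  {:producer, :some_state_which_does_not_currently_matter,
   dispatcher: GenStage.BroadcastDispatcher}
end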

But, as with the GenServer and synchronous implementations, using GenStage does not come without its problems.

If a consumer crashes while it is processing an event, GenStage will consider the event delivered and will not send it again when the consumer comes back up.

So make sure that you properly track crashes by using a monitoring service like AppSignal. AppSignal is quick to install for your Elixir app and helps you monitor performance as well as track errors through its error tracking dashboard.

You can set up notifications for crashes, and cache such events in a persistent store once they are delivered to the consumer, so that you can recover them after a crash.
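One way to do that is to persist each event as soon as the consumer receives it and only mark it as processed afterwards, so anything still pending can be re-processed after a restart. Here is a rough sketch of the Inventory consumer’s handle_events/3, where persist_event/1 and mark_processed/1 are hypothetical helpers backed by a durable store:

def handle_events(events, _from, state) do
  # Persist each event before doing any work, so a crash while
  # processing does not lose it.
  Enum.each(events, &persist_event/1)

  state =
    Enum.reduce(events, state, fn event, acc ->
      new_state = handle_event(event, acc)
      # Only mark the event as done once it has been handled successfully.
      mark_processed(event)
      new_state
    end)

  {:noreply, [], state}
end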

Be very cautious about producing too many events without enough consumers, though. While GenStage offers automatic buffering of events, this buffer has a (configurable) maximum size and a practical maximum size constrained by the server’s memory.
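In GenStage, that maximum is the :buffer_size option returned from the producer’s init/1 (the default is 10,000 events). A sketch of raising it for our Orders producer:

def init(_opts) do
  # Buffer up to 50,000 events while there is no consumer demand;
  # beyond that, the oldest buffered events are discarded.
  {:producer, :some_state_which_does_not_currently_matter, buffer_size: 50_000}
end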

If you don’t control the frequency with which events are produced, consider using an external data store like Redis or Postgres to buffer events.
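In that setup, the producer no longer pushes events from its own memory; instead, it fetches pending events from the external store when consumers ask for them, inside handle_demand/2. A rough sketch, where Orders.Producer is a hypothetical producer and fetch_pending_orders/1 a hypothetical helper that pops up to demand unprocessed orders from Postgres or Redis:

defmodule Orders.Producer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts, name: __MODULE__)

  def init(_opts), do: {:producer, %{}}

  def handle_demand(demand, state) when demand > 0 do
    # Pull at most `demand` unprocessed orders from the external store
    # and wrap them as events for the consumers.
    events =
      demand
      |> fetch_pending_orders()
      |> Enum.map(fn order -> %Orders.Event{type: :new_order, payload: order} end)

    {:noreply, events, state}
  end
end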

Wrap Up: Event-Driven Architecture in Elixir - Going Beyond GenStage

In this post, we examined three approaches to implementing an event-driven system in Elixir: synchronous, using GenServer, and finally, using GenStage. We took a look at some of the advantages and disadvantages of each approach.

The simple GenStage example can be a starting point for implementing complex event-driven data processing pipelines that span multiple nodes. I suggest you read the great GenStage documentation for more information.

If you are looking for an even higher-level abstraction, Broadway is a good starting point. It is built on top of GenStage and offers several additional features, including consuming data from external queues like Amazon SQS, Apache Kafka, and RabbitMQ.

Until next time, happy coding!


This article was originally posted on AppSignal Blog