Event-Driven Architecture #2: Our First App

Hello everyone! This article continues our series about Event-Driven Architecture (you can find the first one here).
Since we already had a basic introduction to EDA, we should now be comfortable starting our first app. To do this, we will use a technology called RabbitMQ, so let’s talk a little about it first.

A little about RabbitMQ

RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It allows us to handle message traffic simply and reliably.

In RabbitMQ, a message is basically composed of:

  • Payload: the content sent by the producer to the consumers. It supports various data formats, such as JSON, strings, and MPEG;

  • Properties: information about the message and its destination. With it, RabbitMQ knows who will receive the message, among other things such as the payload description.
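To make the two parts concrete, here is a minimal sketch in plain Ruby. `Message` is a hypothetical struct used only for illustration, not a class from RabbitMQ or Bunny; `content_type` and `delivery_mode` are standard AMQP message properties, and the values chosen here are just examples.

```ruby
require 'json'

# Hypothetical toy model of a message: a payload plus a hash of properties.
Message = Struct.new(:payload, :properties)

message = Message.new(
  { 'data' => { 'subject' => 'hi, I am an event' } }.to_json, # payload as a JSON string
  { content_type: 'application/json', delivery_mode: 2 }      # 2 marks the message as persistent
)

puts message.payload
puts message.properties[:content_type]
```

The payload is opaque to the broker, which is why a property such as `content_type` is useful: it tells the consumer how to interpret the bytes it receives.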

With this information and what we covered in our previous post, we are able to start a basic app.

Project Setup

For our first project with RabbitMQ, we will use:

The repository can be found 👉 here, and you can use the README steps to start the project.

Presenting the Project

If your setup is successful, you will be able to access the RabbitMQ panel (you can read more about the RabbitMQ management plugin here) at http://localhost:15672/ (the default login and password are both guest) and see a panel like this one:

The panel shows a lot of information, such as queues and message rates, which can be used to monitor message traffic while the app is in use.

Starting the implementation

First, let’s use the Bunny gem to help us create a connection with the RabbitMQ service:

class RabbitMq::Connection
  attr_reader :connection, :channel

  # hostname: the RabbitMQ host to connect to (in our case, the one configured in Docker)
  # automatically_recover: whether Bunny should try to recover the connection after a failure

  def initialize
    @connection = Bunny.new(hostname: ENV['RABBITMQ_HOST'], automatically_recover: false)
  end

  # Opens the connection to RabbitMQ
  def start
    connection.start
  end

  def close
    connection.close
  end

  # Stores the channel in an instance variable so it is created only once
  def create_channel
    @channel ||= connection.create_channel
  end
end

And now you have a connection with RabbitMQ, and you are able to start and close connections and create a channel.
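The connection class reads the hostname from `ENV['RABBITMQ_HOST']`, which returns `nil` when the variable is not set. A small sketch of resolving the host with an explicit fallback follows; the `rabbitmq_host` helper and the `localhost` default are assumptions made for this example, not part of the project.

```ruby
# Hypothetical helper: resolve the broker hostname from the environment,
# falling back to an assumed default when the variable is missing.
def rabbitmq_host(env = ENV)
  env.fetch('RABBITMQ_HOST', 'localhost')
end

puts rabbitmq_host({})                              # no variable set, falls back
puts rabbitmq_host('RABBITMQ_HOST' => 'rabbitmq')   # value taken from the environment
```

Passing the environment as an argument keeps the helper easy to exercise in tests without touching the real `ENV`.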

If you read our first post, you know the concepts of producer and consumer, so let’s create our first producer client, which connects to RabbitMQ and sends messages to our consumers through RabbitMQ queues.

To help us with code reuse, let’s create a Client base class with the commonly used resources:

class RabbitMq::Client::Base
  attr_reader :queue, :channel

  def initialize
    @channel = connection.channel
    @queue = @channel.queue(ENV['DEFAULT_ROUTING_KEY'], durable: true)
  end

  private

  def connection
    @connection ||= RabbitMq::Connection.new.start
  end
end

And now we can create the Producer/Publisher class with our publisher method:

Publisher

class RabbitMq::Client::Publisher < RabbitMq::Client::Base

  def publish_message(msg_payload)
    queue.publish(
      msg_payload, 
      routing_key: ENV['DEFAULT_ROUTING_KEY']
    )
    p "published message #{msg_payload}"
  end

end

In this basic structure, you can see our queue instance from the parent class and the publish method being called to send the message to the queue. Looking at the parameters, our publish method receives the message payload and the routing key, which helps the exchange choose the queue that will receive the message.

Exchanges are a subject that will be covered in our next post, but to put it simply, exchanges are responsible for routing messages to different queues, depending on the headers, bindings, etc. In our case, the exchange is not chosen explicitly, so RabbitMQ will use the default one.
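With the default exchange, every queue is automatically bound under its own name, so a message reaches the queue whose name equals the routing key. The sketch below models that rule in plain Ruby; `ToyDefaultExchange` and its methods are hypothetical names for illustration and are not Bunny’s API.

```ruby
# Toy in-memory model of the default exchange (not Bunny's API).
class ToyDefaultExchange
  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  # Declaring a queue implicitly binds it under its own name.
  def declare_queue(name)
    @queues[name]
  end

  # Delivers the payload to the queue whose name equals the routing key.
  # Returns false when no such queue exists.
  def publish(payload, routing_key:)
    return false unless @queues.key?(routing_key)

    @queues[routing_key] << payload
    true
  end
end

exchange = ToyDefaultExchange.new
events = exchange.declare_queue('events')

exchange.publish('hello', routing_key: 'events')   # routed to the 'events' queue
exchange.publish('lost', routing_key: 'unknown')   # no queue with that name
puts events.inspect
```

This is why the publisher can pass the queue name as the routing key without declaring any exchange or binding of its own.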

Now that we have a producer class, let’s build our consumer class to create one or more instances to listen/consume the producer messages:

Consumer

class RabbitMq::Client::Receiver < RabbitMq::Client::Base
  def listen_messages
    puts "Consuming messages"

    queue.subscribe(manual_ack: false, block: true) do |_, _, payload|
      puts "📢 Received '#{payload}'"
    end
  end
end

In this class, you can see a basic implementation of a consumer. In this post, we implement the consumer in the same app as the producer to see how the basic features of RabbitMQ work; in the next post, we will see these two classes communicating between two different apps.

The method used to receive messages uses the same queue to which the producer sends messages (through class inheritance), so it can access the messages in the queue. The subscribe method allows us to listen for messages and, in our case, receives two parameters:

  • manual_ack: (ack means acknowledgment) set to false, which makes the confirmation of message receipt to RabbitMQ automatic.

  • block: keeps the thread alive, listening for messages.
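The difference between automatic and manual acknowledgment can be sketched with a toy queue in plain Ruby. `ToyQueue`, `deliver`, and `ack` are hypothetical names for this illustration and do not mirror Bunny’s API; the point is only that, with manual acknowledgment, a delivered message stays tracked until the consumer confirms it.

```ruby
# Toy model of acknowledgments (not Bunny's API).
class ToyQueue
  attr_reader :ready, :unacked

  def initialize
    @ready = []     # messages waiting for delivery
    @unacked = {}   # delivery tag => message awaiting acknowledgment
    @next_tag = 0
  end

  def publish(payload)
    @ready << payload
  end

  # Delivers every ready message to the block.
  # With manual_ack: false the message is forgotten as soon as it is delivered;
  # with manual_ack: true it is kept until #ack is called with its tag.
  def deliver(manual_ack:)
    until @ready.empty?
      payload = @ready.shift
      tag = (@next_tag += 1)
      @unacked[tag] = payload if manual_ack
      yield tag, payload
    end
  end

  def ack(tag)
    @unacked.delete(tag)
  end
end

queue = ToyQueue.new
queue.publish('first')
queue.deliver(manual_ack: false) { |_tag, payload| puts "auto-acked #{payload}" }

queue.publish('second')
pending_tag = nil
queue.deliver(manual_ack: true) { |tag, _payload| pending_tag = tag }
unacked_before_ack = queue.unacked.size # the message is still tracked here
queue.ack(pending_tag)
puts "unacked before ack: #{unacked_before_ack}, after ack: #{queue.unacked.size}"
```

Automatic acknowledgment is simpler, as in our Receiver, but a message handed to a consumer that crashes mid-processing is lost; manual acknowledgment trades a little code for that safety.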

Tests

For the tests, we will use the RSpec test suite and the bunny-mock gem to help us recreate Bunny’s behavior.

Testing RabbitMq::Connection methods and attributes:

require 'rails_helper'

RSpec.describe RabbitMq::Connection, type: :service do
  bunny_instance = BunnyMock.new

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  describe '#connection' do
    it 'have a Bunny connection instance' do
      expect(described_class.new.connection.class).to eq(bunny_instance.class)
    end
  end

  describe '#channel' do
    it 'has a Bunny channel instance' do
      instance_with_channel = described_class.new.start.channel
      bunny_with_channel = bunny_instance.start.channel

      expect(instance_with_channel.class).to eq(bunny_with_channel.class)
    end
  end

  describe '.create_channel' do
    it 'return a Bunny channel instance' do
      result = described_class.new.create_channel

      expect(result.class).to eq(BunnyMock::Channel)
    end
  end

  describe '.close' do
    it 'close the Bunny connection' do
      result = described_class.new
      result.close

      expect(result.connection.status).to eq(:closed)
    end
  end

  describe '.start' do
    it 'start a Bunny connection' do
      result = described_class.new
      result.start

      expect(result.connection.status).to eq(:connected)
    end
  end
end

Testing the RabbitMq::Client::Base methods and attributes:

require 'rails_helper'

RSpec.describe RabbitMq::Client::Base, type: :service do
  bunny_instance = BunnyMock.new

  before do
    allow(RabbitMq::Connection)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  describe '#channel' do
    it 'has a Bunny channel instance' do
      instance = RabbitMq::Client::Base.new

      expect(instance.channel.class).to eq(bunny_instance.start.channel.class)
    end
  end

  describe '#queue' do
    it 'has a Bunny queue instance' do
      expect(RabbitMq::Client::Base.new.queue.class).to eq(bunny_instance.channel.queue.class)
    end

    it 'has correct params' do
      queue = RabbitMq::Client::Base.new.queue

      expect(queue.name).to eq(ENV['DEFAULT_ROUTING_KEY'])
      expect(queue.opts[:durable]).to eq(true)
    end
  end
end

Testing the RabbitMq::Client::Publisher methods and attributes:

require 'rails_helper'

RSpec.describe RabbitMq::Client::Publisher, type: :service do
  bunny_instance = BunnyMock.new

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  it 'has Client::Base inheritance' do
    expect(described_class).to be < RabbitMq::Client::Base
  end

  it 'has a Bunny channel instance' do
    expect(described_class.new.channel.class)
      .to eq(bunny_instance.start.channel.class)
  end

  describe '.publish_message' do
    let(:msg_payload) { double(:message) }

    before do
      ENV['DEFAULT_ROUTING_KEY'] = 'foo'
    end

    it 'calls queue.publish with correct params' do
      instance = described_class.new

      expect(instance.queue)
        .to receive(:publish)
        .with(
          msg_payload,
          routing_key: ENV['DEFAULT_ROUTING_KEY']
        )

      instance.publish_message(msg_payload)
    end

    it 'add the message to the queue' do
      instance = described_class.new
      instance.publish_message 'foo'

      expect(instance.queue.message_count).to eq(1)
    end
  end
end

Testing the RabbitMq::Client::Receiver methods and attributes:

require 'rails_helper'

RSpec.describe RabbitMq::Client::Receiver, type: :service do
  bunny_instance = BunnyMock.new

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  it 'has Client::Base inheritance' do
    expect(described_class).to be < RabbitMq::Client::Base
  end

  context '.listen_messages' do

    it 'calls with correct params' do
      instance = described_class.new
      expect(instance.queue)
        .to receive(:subscribe)
        .with(manual_ack: false, block: true)

      instance.listen_messages
    end

    it 'listen and remove sent messages from the queue' do
      instance = described_class.new

      instance.queue.publish 'foo'
      instance.listen_messages

      expect(instance.queue.message_count).to eq(0)
    end
  end
end

Using the implementation

Now is a good time to see our implementation working, so open the Rails console and execute these two commands to produce two messages in a row:

RabbitMq::Client::Publisher.new.publish_message({ "data" => { "subject" => "hi, im an event" } }.to_json)

RabbitMq::Client::Publisher.new.publish_message({ "data" => { "subject" => "hi, im another event" } }.to_json)

After this, the messages will be present in the queue and ready to be received by one or more listeners. Let’s create an instance of a receiver to see the two messages present in the queue:

RabbitMq::Client::Receiver.new.listen_messages

With this command, you will see the following output in the terminal:

You can also access the RabbitMQ panel and see information about the message traffic 🙂.
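Since the payloads published in the console are JSON strings, a consumer will usually want to parse them back into a hash. Here is a minimal sketch, assuming a hypothetical `extract_subject` helper that could be called from inside a subscribe block; it is not part of the project’s code.

```ruby
require 'json'

# Same payload shape as the one published from the console.
payload = { 'data' => { 'subject' => 'hi, im an event' } }.to_json

# Hypothetical handler: turns the raw string back into data.
def extract_subject(raw_payload)
  JSON.parse(raw_payload).dig('data', 'subject')
rescue JSON::ParserError
  nil # payloads that are not valid JSON are ignored in this sketch
end

puts extract_subject(payload)
puts extract_subject('not json').inspect
```

Returning `nil` for malformed input is only one option; depending on the app, logging the bad payload or raising may be a better choice.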

Conclusion

In this post, we built our first app, able to send messages to a queue, and implemented a listener class. With this, you’ll be able to create a simple application with real-time messages between services and build a resilient app with the message queue.

In our next post, we will build a small, but complete system using these concepts. See you 🙂.

References

What is RabbitMQ?

RabbitMQ documentation

RabbitMQ: What is and how to use?

RabbitMQ Exchanges, routing keys and bindings
