Hello everyone! This article is a continuation of the series about Event-Driven Architecture (you can find the first one here).
Since we already had a basic introduction to EDA, we should now be comfortable starting our first app. To do this, we will use a technology called RabbitMQ, so let’s talk a little about it first.
A little about RabbitMQ
RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It allows us to handle message traffic simply and reliably.
In RabbitMQ, a message is basically composed of:
- Payload: the content sent by the producer to the consumers. RabbitMQ treats it as opaque bytes, so it can carry various data formats, like JSON, plain strings, or even MPEG;
- Properties: information about the message and its destination. With that, RabbitMQ knows who should receive the message, and consumers get metadata such as a description of the payload.
With this information and what we covered in our previous post, we are able to start a basic app.
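To make the two parts concrete, here is a minimal sketch in plain Ruby. `MessageSketch` and the `'events'` routing key are hypothetical names used only for illustration; with Bunny, the payload is the first argument to publish, and options such as content_type and routing_key are passed alongside it.

```ruby
require 'json'

# Hypothetical value object for illustration only; it is not part of Bunny.
MessageSketch = Struct.new(:payload, :properties, keyword_init: true)

message = MessageSketch.new(
  # From the broker's point of view, the payload is just a string of bytes.
  payload: JSON.generate('data' => { 'subject' => 'hi' }),
  # Properties describe the payload and where it should go ('events' is assumed).
  properties: { content_type: 'application/json', routing_key: 'events' }
)

# A consumer can use the properties to decide how to decode the payload.
if message.properties[:content_type] == 'application/json'
  puts JSON.parse(message.payload).dig('data', 'subject')
end
```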
Project Setup
For our first project with RabbitMQ we will use:
- The Rails framework (v 7.0.4);
- The RSpec test suite;
- The gem bunny as our RabbitMQ client;
- The gem bunny-mock for helping us with tests;
- The RabbitMQ service.
The repository can be found 👉 here, and you can use the README steps to start the project.
Presenting the Project
If your setup is successful, you will be able to access the RabbitMQ panel (you can read more about the RabbitMQ management plugin here) at http://localhost:15672/ (the default login and password are both guest) and see a panel like this one:
The panel shows a lot of information, like queues, message rates, etc., which can be used to monitor message traffic while the app is in use.
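The management plugin also exposes this information over an HTTP API under /api on the same port. As a rough sketch, assuming a local broker and the default guest credentials, a request for the queue list could be built like this. `build_management_request` is a hypothetical helper, and the request is only built here, not sent, so the snippet runs without a broker.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper: builds (but does not send) an authenticated GET
# request for the RabbitMQ management HTTP API.
def build_management_request(path, host: 'localhost', port: 15_672,
                             user: 'guest', password: 'guest')
  uri = URI("http://#{host}:#{port}#{path}")
  request = Net::HTTP::Get.new(uri)
  request.basic_auth(user, password)
  [uri, request]
end

uri, request = build_management_request('/api/queues')
puts "#{request.method} #{uri}"

# To send it against a running broker:
# response = Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
# puts response.body
```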
Starting the implementation
First, let’s use the Bunny gem to create a connection with the RabbitMQ service:
class RabbitMq::Connection
  attr_reader :connection, :channel

  # hostname: the RabbitMQ host to connect to (in our case, the one configured in Docker)
  # automatically_recover: whether Bunny should try to recover after a connection failure
  def initialize
    @connection = Bunny.new(hostname: ENV['RABBITMQ_HOST'], automatically_recover: false)
  end

  def start
    connection.start
  end

  def close
    connection.close
  end

  # Memoized in an instance variable so the same channel is reused
  def create_channel
    @channel ||= connection.create_channel
  end
end
And now you have a connection with RabbitMQ, being able to start and close connections and create a channel.
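Since a connection that is started should eventually be closed, one possible pattern is a small block helper that always closes it. This is only a sketch: `with_connection` is a hypothetical helper, and `FakeSession` is a stand-in that mimics the start/close/status behavior of a session so the example runs without a broker.

```ruby
# Hypothetical stand-in for a broker session, used only to keep the sketch runnable.
class FakeSession
  attr_reader :status

  def initialize
    @status = :not_connected
  end

  def start
    @status = :connected
    self
  end

  def close
    @status = :closed
  end
end

# Hypothetical helper: starts the session, yields it, and closes it even if
# the block raises.
def with_connection(session)
  session.start
  yield session
ensure
  session.close
end

session = FakeSession.new
with_connection(session) { |s| puts "inside block: #{s.status}" }
puts "after block: #{session.status}"
```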
If you read our first post, you know the concepts of producer and consumer, so let’s create our first producer client to connect and send messages to our consumers through RabbitMQ queues.
To help us with code reuse, let’s create a Client base class with the commonly used resources:
class RabbitMq::Client::Base
  attr_reader :queue, :channel

  def initialize
    @channel = connection.channel
    @queue = @channel.queue(ENV['DEFAULT_ROUTING_KEY'], durable: true)
  end

  private

  # RabbitMq::Connection#start returns the started Bunny session
  def connection
    @connection ||= RabbitMq::Connection.new.start
  end
end
And now we can create the Producer/Publisher class with our publisher method:
Publisher
class RabbitMq::Client::Publisher < RabbitMq::Client::Base
  def publish_message(msg_payload)
    queue.publish(
      msg_payload,
      routing_key: ENV['DEFAULT_ROUTING_KEY']
    )
    puts "published message #{msg_payload}"
  end
end
In this basic structure, you can see our queue instance from the parent class and the publish method being called to send the message to the queue. Looking at the parameters, our publish method receives the message payload and a routing key that helps the exchange choose the queue to send the message to.
Exchanges are a subject that will be covered in our next post, but to put it simply, exchanges are responsible for routing messages to different queues, depending on the headers, bindings, etc. In our case, no exchange is chosen explicitly, so RabbitMQ will use the default one.
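The default exchange routes a message to the queue whose name equals the routing key, because every queue is automatically bound to it under its own name. The following in-memory model illustrates that rule; `DefaultExchangeSketch` and the `'events'` queue name are hypothetical, and this is not how RabbitMQ is implemented internally.

```ruby
# In-memory model of the default exchange, for illustration only.
class DefaultExchangeSketch
  def initialize
    @queues = {}
  end

  # Declaring a queue implicitly binds it to the exchange under its own name.
  def declare_queue(name)
    @queues[name] ||= []
  end

  # Routes the payload to the queue named by the routing key.
  # Returns false when no such queue exists (this sketch simply drops the message).
  def publish(payload, routing_key:)
    queue = @queues[routing_key]
    return false unless queue

    queue << payload
    true
  end

  # Returns a copy of the messages currently held by the named queue.
  def messages_in(name)
    @queues.fetch(name, []).dup
  end
end

exchange = DefaultExchangeSketch.new
exchange.declare_queue('events')
exchange.publish('hello', routing_key: 'events')  # routed to the 'events' queue
exchange.publish('lost', routing_key: 'unknown')  # no queue with that name
p exchange.messages_in('events')
```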
Now that we have a producer class, let’s build our consumer class to create one or more instances to listen/consume the producer messages:
Consumer
class RabbitMq::Client::Receiver < RabbitMq::Client::Base
  def listen_messages
    puts "Consuming messages"
    queue.subscribe(manual_ack: false, block: true) do |_, _, payload|
      puts "📢 Received '#{payload}'"
    end
  end
end
In this class, you can see a basic implementation of a consumer. In this post, we implement the consumer in the same app as the producer to see how the basic features of RabbitMQ work, so that in the next post we can see these two classes communicating between two different apps.
The method used to receive messages uses the same queue the producer sends messages to (through class inheritance), so it can access the messages in the queue. The subscribe method allows us to listen to the messages and receives, in our case, 2 parameters:
- manual_ack: (ack means acknowledgment) with the value false, the confirmation to RabbitMQ that a message was received is sent automatically;
- block: keeps the thread alive, listening to the messages.
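To see why the manual_ack choice matters, here is a small in-memory model of the two modes. `AckQueueSketch` is hypothetical and is not Bunny's API; it only mimics the idea that an automatically acknowledged message is forgotten on delivery, while a manually acknowledged one is kept until the consumer confirms it and returns to the queue if the consumer goes away first.

```ruby
# In-memory model of acknowledgement modes, for illustration only.
class AckQueueSketch
  def initialize
    @ready = []     # messages waiting to be delivered
    @unacked = {}   # delivery tag => message delivered but not yet confirmed
    @next_tag = 0
  end

  def publish(payload)
    @ready << payload
  end

  # Delivers the next message as [tag, payload], or nil when the queue is empty.
  # With manual_ack: true the message is tracked until #ack is called.
  def deliver(manual_ack:)
    payload = @ready.shift
    return nil if payload.nil?

    tag = (@next_tag += 1)
    @unacked[tag] = payload if manual_ack
    [tag, payload]
  end

  def ack(tag)
    @unacked.delete(tag)
  end

  # Models a consumer disconnecting: unconfirmed messages go back to the queue.
  def consumer_lost
    @ready.unshift(*@unacked.values)
    @unacked.clear
  end

  def ready_count
    @ready.size
  end
end

auto = AckQueueSketch.new
auto.publish('a')
auto.deliver(manual_ack: false)
auto.consumer_lost
puts auto.ready_count   # 0: the message was considered handled on delivery

manual = AckQueueSketch.new
manual.publish('b')
manual.deliver(manual_ack: true)
manual.consumer_lost
puts manual.ready_count # 1: the unconfirmed message was put back
```

With manual_ack: false, as in our receiver, a message that fails during processing is not delivered again, which is fine for this demo but worth keeping in mind for real workloads.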
Tests
For the tests, we will use the RSpec test suite and the gem bunny-mock to help us recreate the Bunny behavior.
Testing RabbitMq::Connection methods and attributes:
require 'rails_helper'

RSpec.describe RabbitMq::Connection, type: :service do
  # A fresh mock per example avoids leaking connection state between tests
  let(:bunny_instance) { BunnyMock.new }

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  describe '#connection' do
    it 'has a Bunny connection instance' do
      expect(described_class.new.connection.class).to eq(bunny_instance.class)
    end
  end

  describe '#channel' do
    it 'has a Bunny channel instance' do
      instance_with_channel = described_class.new.start.channel
      bunny_with_channel = bunny_instance.start.channel

      expect(instance_with_channel.class).to eq(bunny_with_channel.class)
    end
  end

  describe '#create_channel' do
    it 'returns a Bunny channel instance' do
      result = described_class.new.create_channel

      expect(result.class).to eq(BunnyMock::Channel)
    end
  end

  describe '#close' do
    it 'closes the Bunny connection' do
      result = described_class.new
      result.close

      expect(result.connection.status).to eq(:closed)
    end
  end

  describe '#start' do
    it 'starts a Bunny connection' do
      result = described_class.new
      result.start

      expect(result.connection.status).to eq(:connected)
    end
  end
end
Testing the RabbitMq::Client::Base methods and attributes:
require 'rails_helper'

RSpec.describe RabbitMq::Client::Base, type: :service do
  let(:bunny_instance) { BunnyMock.new }

  before do
    # The mock session stands in for RabbitMq::Connection: it also responds to start and channel
    allow(RabbitMq::Connection)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  describe '#channel' do
    it 'has a Bunny channel instance' do
      instance = RabbitMq::Client::Base.new

      expect(instance.channel.class).to eq(bunny_instance.start.channel.class)
    end
  end

  describe '#queue' do
    it 'has a Bunny queue instance' do
      expect(RabbitMq::Client::Base.new.queue.class).to eq(bunny_instance.channel.queue.class)
    end

    it 'has correct params' do
      queue = RabbitMq::Client::Base.new.queue

      expect(queue.name).to eq(ENV['DEFAULT_ROUTING_KEY'])
      expect(queue.opts[:durable]).to eq(true)
    end
  end
end
Testing the RabbitMq::Client::Publisher methods and attributes:
require 'rails_helper'

RSpec.describe RabbitMq::Client::Publisher, type: :service do
  let(:bunny_instance) { BunnyMock.new }

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  it 'has Client::Base inheritance' do
    expect(described_class).to be < RabbitMq::Client::Base
  end

  it 'has a Bunny channel instance' do
    expect(described_class.new.channel.class)
      .to eq(bunny_instance.start.channel.class)
  end

  describe '#publish_message' do
    let(:msg_payload) { double(:message) }

    # Plain RSpec hook; sets the queue name used by the client
    before do
      ENV['DEFAULT_ROUTING_KEY'] = 'foo'
    end

    it 'calls queue.publish with correct params' do
      instance = described_class.new

      expect(instance.queue)
        .to receive(:publish)
        .with(
          msg_payload,
          routing_key: ENV['DEFAULT_ROUTING_KEY']
        )

      instance.publish_message(msg_payload)
    end

    it 'adds the message to the queue' do
      instance = described_class.new
      instance.publish_message 'foo'

      expect(instance.queue.message_count).to eq(1)
    end
  end
end
Testing the RabbitMq::Client::Receiver methods and attributes:
require 'rails_helper'

RSpec.describe RabbitMq::Client::Receiver, type: :service do
  let(:bunny_instance) { BunnyMock.new }

  before do
    allow(Bunny)
      .to receive(:new)
      .and_return(bunny_instance)
  end

  it 'has Client::Base inheritance' do
    expect(described_class).to be < RabbitMq::Client::Base
  end

  describe '#listen_messages' do
    it 'calls subscribe with correct params' do
      instance = described_class.new

      expect(instance.queue)
        .to receive(:subscribe)
        .with(manual_ack: false, block: true)

      instance.listen_messages
    end

    it 'listens and removes sent messages from the queue' do
      instance = described_class.new
      instance.queue.publish 'foo'
      instance.listen_messages

      expect(instance.queue.message_count).to eq(0)
    end
  end
end
Using the implementation
Now is a good time to see our implementation working, so open the Rails console and execute these two commands to produce two messages in a row:
RabbitMq::Client::Publisher.new.publish_message({ "data" => { "subject" => "hi, im an event" } }.to_json)
RabbitMq::Client::Publisher.new.publish_message({ "data" => { "subject" => "hi, im another event" } }.to_json)
After this, the messages will be present in the queue and ready to be received by one or more listeners. Let’s create an instance of a receiver to see the two messages present in the queue:
RabbitMq::Client::Receiver.new.listen_messages
With this command, you will see the output in the terminal:
You can also access the RabbitMQ panel and see the information about the message traffic 🙂.
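Because the payloads published above are JSON strings, a receiver will often want to parse them instead of printing the raw text. A minimal sketch of that step, where `subject_from` is a hypothetical helper that a subscribe block could delegate to:

```ruby
require 'json'

# Hypothetical helper: extracts the subject from a JSON payload shaped like
# the ones published above. Returns nil when the payload is not a JSON object.
def subject_from(payload)
  data = JSON.parse(payload)
  data.is_a?(Hash) ? data.dig('data', 'subject') : nil
rescue JSON::ParserError
  nil
end

payload = { 'data' => { 'subject' => 'hi, im an event' } }.to_json
puts subject_from(payload)
```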
Conclusion
In this post, we built our first app, able to send messages to a queue, and implemented a listener class. With this, you’ll be able to create a simple application with real-time messages between services and build a resilient app with the message queue.
In our next post, we will build a small, but complete system using these concepts. See you 🙂.
References
RabbitMQ: What is and how to use?
RabbitMQ Exchanges, routing keys and bindings