
























RabbitMQ is an enterprise-grade, open-source messaging and streaming broker. In this blog, you will learn some basic concepts of RabbitMQ and how to use it in a Spring Boot application. Enjoy!
Before diving into the programmatic details, some concepts need to be explained first. Do realize that this blog only scratches the surface of what is possible with RabbitMQ. A detailed overview can be found in the official RabbitMQ documentation.
RabbitMQ supports several protocols. In this blog, the AMQP 0-9-1 protocol will be used. AMQP stands for Advanced Message Queuing Protocol. RabbitMQ receives messages from publishers, the producing applications, and routes them to consumers, the applications which process the messages.
A publisher publishes messages to an exchange (like a mailbox). The exchange then routes the messages to queues using bindings. RabbitMQ then delivers the messages to the consumers which are subscribed to the queues.
The process is shown in the figure below.

In the examples in the remainder of this blog, you will make use of a Topic Exchange. There are different exchange types, but for the sake of simplicity, only one will be used. A topic exchange routes messages to one or many queues by matching the routing key of the message against the pattern each queue is bound with. Topic exchanges are commonly used for multicast routing of messages.
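To get a feeling for how a routing key is matched against a binding pattern, the sketch below re-implements the matching rules in plain Java: words are separated by dots, * matches exactly one word and # matches zero or more words. The class TopicPattern and its matches method are made up for this illustration and are not part of RabbitMQ or Spring AMQP; the broker does this matching for you.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative re-implementation of topic matching; hypothetical, not RabbitMQ's actual code. */
final class TopicPattern {

    private TopicPattern() {
    }

    /** Returns true when the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        List<String> patternWords = Arrays.asList(pattern.split("\\."));
        List<String> keyWords = Arrays.asList(routingKey.split("\\."));
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(List<String> p, int pi, List<String> k, int ki) {
        if (pi == p.size()) {
            // Pattern exhausted: only a match when the key is exhausted too
            return ki == k.size();
        }
        String word = p.get(pi);
        if (word.equals("#")) {
            // '#' absorbs zero or more words: try every possible continuation point
            for (int next = ki; next <= k.size(); next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.size()) {
            // Pattern still expects a word, but the key has none left
            return false;
        }
        if (word.equals("*") || word.equals(k.get(ki))) {
            // '*' matches exactly one word, a literal must be equal
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("event.general.*", "event.general.message"));  // true
        System.out.println(matches("event.general.*", "event.specific.message")); // false
        System.out.println(matches("event.general.*", "event.general"));          // false
        System.out.println(matches("event.#", "event.specific.message"));         // true
    }
}
```

The patterns event.general.* and event.specific.* are the ones used for the bindings later in this blog. The pattern event.# is only added here to show the difference between both wildcards.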
Sources used in this blog are available at GitHub in module topics.
Prerequisites for reading this blog are:
In order to get started, you navigate to the Spring Initializr and add the following dependencies:
You will build the following:
In order to send a general and a specific message, two HTTP endpoints are created in the MessageController.
@RestController
public class MessageController {

    private final MessageService messageService;

    public MessageController(MessageService messageService) {
        this.messageService = messageService;
    }

    @RequestMapping(
            method = RequestMethod.POST,
            value = "send-general"
    )
    public ResponseEntity<Void> sendGeneralMessage(@RequestBody String message) {
        messageService.sendMessage("event.general.message", message);
        return new ResponseEntity<>(HttpStatus.CREATED);
    }

    @RequestMapping(
            method = RequestMethod.POST,
            value = "send-specific"
    )
    public ResponseEntity<Void> sendSpecificMessage(@RequestBody String message) {
        messageService.sendMessage("event.specific.message", message);
        return new ResponseEntity<>(HttpStatus.CREATED);
    }
}
The requests are forwarded to the MessageService.sendMessage method, which takes a routingKey and the message as arguments. The message is taken from the HTTP request body, the routingKey is hardcoded. Remember that the routingKey determines to which queues the message will be routed. In the service, you make use of the RabbitTemplate, which is auto-configured by Spring Boot, in order to send the message to RabbitMQ.
@Service
public class MessageService {

    private final RabbitTemplate rabbitTemplate;

    public MessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE_NAME, routingKey, message);
    }
}
Consumer A will consume general messages. The queue needs to be bound to the Topic Exchange with the routing key.
Create a RabbitMqConfig class with:
- a TopicExchange bean with name events.exchange;
- a Queue bean for consumer A with name consumer-a.queue;
- a Binding bean which binds the queue to the TopicExchange with the routing key for the general messages.
Do note that the name of the Queue parameter in method bindingConsumerA needs to match the queueConsumerA bean name.
@Configuration
public class RabbitMqConfig {

    public static final String QUEUE_CONSUMER_A = "consumer-a.queue";
    public static final String TOPIC_EXCHANGE_NAME = "events.exchange";
    public static final String ROUTING_KEY_GENERAL_MESSAGE = "event.general.*";

    @Bean
    TopicExchange eventsExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    public Queue queueConsumerA() {
        // The second argument marks the queue as non-durable
        return new Queue(QUEUE_CONSUMER_A, false);
    }

    @Bean
    Binding bindingConsumerA(Queue queueConsumerA, TopicExchange exchange) {
        return BindingBuilder.bind(queueConsumerA).to(exchange).with(ROUTING_KEY_GENERAL_MESSAGE);
    }
}
The next thing to do is to consume the messages from queue A. Create a Component named ReceiverA. Annotate the method for processing the messages with @RabbitListener and connect it to queue A. When a message is received, just print it to the console.
@Component
public class ReceiverA {

    @RabbitListener(queues = RabbitMqConfig.QUEUE_CONSUMER_A)
    public void receiveMessage(String message) {
        System.out.println("Queue Consumer A received <" + message + ">");
    }
}
In order to run the application, you will need RabbitMQ. Since you have added the Docker Compose Support to the project earlier, you can just add a compose.yaml in the root of the repository.
services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine # Stable, lightweight, includes management UI
    container_name: rabbitmq
    ports:
      - "5672:5672" # AMQP
      - "15672:15672" # Management console
    environment:
      RABBITMQ_DEFAULT_USER: secret
      RABBITMQ_DEFAULT_PASS: myuser
Also add the connection parameters for RabbitMQ to the application.properties file.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=secret
spring.rabbitmq.password=myuser
Start the application.
You will notice that RabbitMQ is started automatically.
Send a general message.
curl -X POST http://localhost:8080/send-general \
  -H "Content-Type: text/plain" \
  -d "This is a general message"
The console log will print the following.
Queue Consumer A received <This is a general message>
Stop the application.
Consumer B will process general messages, but also specific messages. Add the queue for consumer B to the RabbitMqConfig and bind it to the exchange twice: once with the general message routing key and once with the specific message routing key.
@Configuration
public class RabbitMqConfig {

    public static final String QUEUE_CONSUMER_A = "consumer-a.queue";
    public static final String QUEUE_CONSUMER_B = "consumer-b.queue";
    public static final String TOPIC_EXCHANGE_NAME = "events.exchange";
    public static final String ROUTING_KEY_GENERAL_MESSAGE = "event.general.*";
    public static final String ROUTING_KEY_SPECIFIC_MESSAGE = "event.specific.*";

    ...

    @Bean
    public Queue queueConsumerB() {
        return new Queue(QUEUE_CONSUMER_B, false);
    }

    @Bean
    Binding bindingConsumerBGeneral(Queue queueConsumerB, TopicExchange exchange) {
        return BindingBuilder.bind(queueConsumerB).to(exchange).with(ROUTING_KEY_GENERAL_MESSAGE);
    }

    @Bean
    Binding bindingConsumerBSpecific(Queue queueConsumerB, TopicExchange exchange) {
        return BindingBuilder.bind(queueConsumerB).to(exchange).with(ROUTING_KEY_SPECIFIC_MESSAGE);
    }
}
Consumer B is created just like consumer A. Create a ReceiverB class in order to receive the queue B messages.
@Component
public class ReceiverB {

    @RabbitListener(queues = RabbitMqConfig.QUEUE_CONSUMER_B)
    public void receiveMessage(String message) {
        System.out.println("Queue Consumer B received <" + message + ">");
    }
}
Start the application.
Send a general message.
curl -X POST http://localhost:8080/send-general \
  -H "Content-Type: text/plain" \
  -d "This is a general message"
The message is now received by Consumer A and Consumer B.
Queue Consumer B received <This is a general message>
Queue Consumer A received <This is a general message>
Send a specific message.
curl -X POST http://localhost:8080/send-specific \
  -H "Content-Type: text/plain" \
  -d "This is a specific message"
The message is only received by Consumer B.
Queue Consumer B received <This is a specific message>
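The results above can also be reasoned about without a broker. The following sketch is a simplified stand-in for the exchange, not how RabbitMQ is implemented: the class RoutingSketch and its methods are hypothetical, only the * wildcard is supported, and a Set is used because a queue receives at most one copy of a message, even when several of its bindings match.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** In-memory stand-in for the topic exchange of this blog; hypothetical, no broker involved. */
final class RoutingSketch {

    /** One binding: a compiled routing pattern pointing at a queue name. */
    private static final class Binding {
        final Pattern regex;
        final String queue;

        Binding(Pattern regex, String queue) {
            this.regex = regex;
            this.queue = queue;
        }
    }

    private final List<Binding> bindings = new ArrayList<>();

    /** Registers a binding; only the '*' wildcard (exactly one word) is supported here. */
    void bind(String queue, String pattern) {
        StringBuilder regex = new StringBuilder();
        for (String word : pattern.split("\\.")) {
            if (regex.length() > 0) {
                regex.append("\\."); // literal dot between two words
            }
            regex.append(word.equals("*") ? "[^.]+" : Pattern.quote(word));
        }
        bindings.add(new Binding(Pattern.compile(regex.toString()), queue));
    }

    /** Returns the queues which would receive a message with this routing key. */
    Set<String> route(String routingKey) {
        Set<String> queues = new LinkedHashSet<>();
        for (Binding binding : bindings) {
            if (binding.regex.matcher(routingKey).matches()) {
                queues.add(binding.queue);
            }
        }
        return queues;
    }

    /** Recreates the three bindings defined in RabbitMqConfig. */
    static RoutingSketch blogBindings() {
        RoutingSketch exchange = new RoutingSketch();
        exchange.bind("consumer-a.queue", "event.general.*");
        exchange.bind("consumer-b.queue", "event.general.*");
        exchange.bind("consumer-b.queue", "event.specific.*");
        return exchange;
    }

    public static void main(String[] args) {
        RoutingSketch exchange = blogBindings();
        System.out.println(exchange.route("event.general.message"));  // [consumer-a.queue, consumer-b.queue]
        System.out.println(exchange.route("event.specific.message")); // [consumer-b.queue]
    }
}
```

A routing key which matches none of the bindings, for example event.other.message, yields an empty set in this sketch.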
Do also take a look at the RabbitMQ management console which is accessible at http://localhost:15672/.
Here you can see the exchanges, the queues, the bindings, etc.
In this blog, you learned some basics of RabbitMQ using the AMQP 0-9-1 protocol. You learned how easy it is to integrate this within your Spring Boot application.