


















Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:
AMQP 是 Advanced Message Queuing Protocol 的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP定义了这些特性:
RabbitMQ 是以 AMQP 协议实现的一种中间件产品,也称为面向消息的中间件,它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。下面介绍 RabbitMQ 的基本概念:
Exchange:消息交换机,是消息第一个到达的地方,消息通过他指定的路由规则分发到不同的消息队列中去,有如下几种类型:
快速入门
我们通过在 Spring Boot 应用中整合 RabbitMQ ,实现一个简单的发送、接收消息的示例:
创建项目
创建一个 Spring Boot 项目,命名为 spring-boot-rabbitmq,并增加 spring-boot-starter-amqp 依赖,pom.xml 文件内容如下:
<?xmlversion="1.0"encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.lixue.bus</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/><!--lookupparentfromrepository-->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
创建配置
在 src/main/resources 目录中创建 application.yml 配置文件,并增加 RabbitMQ 相关配置,内容如下:
#配置应用名称
spring:
application:
name:rabbitmq-bus
#配置RabbitMQ信息
rabbitmq:
#配置连接信息
addresses:192.168.2.215:5672
username:lixue
password:liyong
virtual-host:/
#开启发送确认模式
publisher-confirms:true
消息生产者
创建消息生产者 Sender 类,通过注入 AmqpTemplate 接口的实例(org.springframework.amqp.rabbit.core.RabbitTemplate)来实现消息的发送,AmqpTemplate 接口定义了一套针对 AMQP 协议的基础操作,Spring Boot 会根据配置来注入其具体实现。
package org.lixue.bus;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender{
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context="Hello"+new Date();
//将消息发送到路由Key=send的队列
amqpTemplate.convertAndSend("send",context);
}
}
消息消费者
创建消息消费者 Receiver 类,通过注入 AmqpTemplate 接口的实例(org.springframework.amqp.rabbit.core.RabbitTemplate)来实现消息的接收处理,也可以使用 @RabbitListener 和 @RabbitHandler 注解 来指定消息接收的队列和消息处理方法,使用注解处理消息如下:
package org.lixue.bus;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="send")
public class Receiver{
@RabbitHandler
public void process(String message){
System.out.println("receive:"+message);
}
}
如果不使用注解,可以通过 AmqpTemplate 接口的实例来获取队列的消息,代码如下:
package org.lixue.bus;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Receiver{
@Autowired
private AmqpTemplate amqpTemplate;
public void receive(){
String val=(String)amqpTemplate.receiveAndConvert("send");
if(val!=null){
System.out.println("receive:"+message);
}
}
}
创建配置类
创建 RabbitMQ 的配置类 RabbitMQConfig,用来配置队列、交换器、路由等高级信息,代码如下:
package org.lixue.bus;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig{
/**
*配置队列相关
*/
@Bean
public Queue newQueue(){
return new Queue("send");
}
@Bean
public Queue newQueue2(){
return new Queue("send2");
}
/**
*配置交换器相关
*/
@Bean
public Exchange newDirectExchange(){
return new DirectExchange("directExchange",true,true);
}
@Bean
public Exchange newTopicExchange(){
return new TopicExchange("topicExchange",true,true);
}
@Bean
public Exchange newFanoutExchange(){
return new FanoutExchange("fanoutExchange",true,true);
}
/**
*配置队列和交换器绑定
*/
@Bean
public Binding newDirectBinding(){
return BindingBuilder.bind(newQueue()).to(newDirectExchange()).with("send").noargs();
}
@Bean
public Binding newDirectBinding1(){
return BindingBuilder.bind(newQueue2()).to(newDirectExchange()).with("send2").noargs();
}
}
测试验证
创建消息提供者的单元测试,在单元测试中执行消息发送方法,并执行单元测试,如果使用注解的方式接收消息,则不需要做额外处理,Spring Boot 启动后会接收消息,如下:
package org.lixue.bus;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
publi cclass SenderTest{
@Autowired
private Sender sender;
@Test
public void send() throws Exception{
for(inti=0;i<1000000;i++){
sender.send(i);
}
}
}
日志输出如下:
2018-05-04 16:23:39.085 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.2.215:5672]
2018-05-04 16:23:39.294 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#232532cf:0/SimpleConnection@7c7f9a81 [delegate=amqp://lixue@192.168.2.215:5672/, localPort= 63144]
2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (directExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (topicExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.301 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (fanoutExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.476 INFO 20176 --- [ main] org.lixue.bus.SenderTest : Started SenderTest in 4.536 seconds (JVM running for 6.026)
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
本文版权归作者 李雪(博客地址:https://www.cnblogs.wiki)所有,欢迎转载和商用,请在文章页面明显位置给出原文链接并保留此段声明,否则保留追究法律责任的权利,其他事项,可留言咨询。
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。