MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS
定义了统一的接口,来对消息操作进行统一;限定了必须使用Java语言;规定了两种消息模型。AMQP
通过规定协议来统一数据交互的格式;只是协议,不规定实现方式,因此是跨语言的;AMQP的消息模型更加丰富。RabbitMQ:两大功能,业务解耦和,流量削峰。
RabbitMQ是基于AMQP的一款消息管理系统: 官网: http://www.rabbitmq.com/ 官方教程:http://www.rabbitmq.com/getstarted.html 官网下载地址:http://www.rabbitmq.com/download.html 1、如何解决消息丢失?
ack(消费者确认)持久化生产者确认(publisher confirm):生产者发送消息后,等待mq的ACK,如果没有收到或者收到失败信息,则重试。如果收到成功消息则业务结束。可靠消息服务(可选):对于部分不支持生产者确认的消息队列,可以发送消息前,将消息持久化到数据库,并记录消息状态,后续消息发送、消费等过程都依赖于数据库中消息状态的判断和修改。2、如何避免消息堆积
通过同一个队列多消费者监听,实现消息的争抢,加快消息消费速度。
3、如何保证消息的有序性
大部分业务对消息的有序性要求不高,如果遇到对时序要求较高的业务,分两种情况来处理:
业务同时对并发要求不高:
保证消息发送时有序同步发送保证消息发送被同一个队列接收保证一个队列只有一个消费者,可以有从机(待机状态),实现高可用。实现主从(Zookeeper集群选主)业务同时对并发要求较高:
满足上述第一个场景的条件可以有多个队列有时序要求的一组消息,通过hash方式分派到一个固定队列4、如何避免消息重复消费?
保证接口幂等即可,那么如何保证接口幂等呢?
某些接口天生幂等,例如查询请求某些接口天生不幂等,比如新增,还有某些接口的修改功能 能根据具体的业务或状态来确定的,在消费端通过业务判断是否执行过RabbitMQ的访问端口:
15672::web访问 5672:java程序操作RabbitMQ的端口
登录用户:guest(默认) 密码:guest
Sprin有很多不同的项目,其中就有对AMQP的支持:http://projects.spring.io/spring-amqp
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
把所有交换机,队列和routingKey的名称定义
/** * 消息队列 交换机和路由 */ public class MQConstants { public static final class Exchange { /** * 商品服务交换机名称 */ public static final String ITEM_EXCHANGE_NAME = "ly.item.exchange"; } public static final class RoutingKey { /** * 商品上架的routing-key */ public static final String ITEM_UP_KEY = "item.up"; /** * 商品下架的routing-key */ public static final String ITEM_DOWN_KEY = "item.down"; } public static final class Queue{ /** * 搜索服务,商品上架的队列 */ public static final String SEARCH_ITEM_UP = "search.item.up.queue"; /** * 搜索服务,商品下架的队列 */ public static final String SEARCH_ITEM_DOWN = "search.item.down.queue"; /** * 静态页面,商品上架的队列 */ public static final String PAGE_ITEM_UP = "page.item.up.queue"; /** * 静态页面,商品下架的队列 */ public static final String PAGE_ITEM_DOWN = "page.item.down.queue"; } }消息生产者:
<!--RabbitMQ消息队列支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> spring: # RabbitMQ rabbitmq: host: 127.0.0.1 username: 116 password: 116 virtual-host: /116 # 操作的队列 template: retry: # 重试,消息发送失败会重试 enabled: true # 开启重试 initial-interval: 10000ms # 第一次十秒重试 max-interval: 80000ms # 最后一次重试是八十秒 multiplier: 2 # 重试翻倍率 publisher-confirms: true # 消息持久化发送消息需注入:
@Autowired private AmqpTemplate amqpTemplate; /* 商品上下架 索引库和静态页面详情的增删 采用异步消息方式 * 上架:在索引库中添加一条记录,新增一个静态页面,向消息队列中发送一条消息 * 下架:把索引库对应数据删除,删除一个静态页面,向消息队列发送一条消息 * 顺序为:交换机 - routingKey(路由) - 消息 * */ // true 上架,false 下架 String routingKey = saleable ? MQConstants.RoutingKey.ITEM_UP_KEY : MQConstants.RoutingKey.ITEM_DOWN_KEY; // 交换机的名称 发送的消息 发送的id amqpTemplate.convertAndSend(MQConstants.Exchange.ITEM_EXCHANGE_NAME,routingKey,id);消息消费者:
静态页面同步:
<!--RabbitMQ消息队列支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> spring: # RabbitMQ rabbitmq: host: 127.0.0.1 username: 116 password: 116 virtual-host: /116 # 操作的队列 /** * 静态页面 消息队列 监听 */ @Component public class PageListener { @Autowired private PageService pageService; /** * 新增静态页面(上架) * @param id 每个商品的id (spu)要与消息发送的一样 */ @RabbitListener(bindings = @QueueBinding( // 监听的队列 并且创建队列 ,durable开启持久化 value = @Queue(value = MQConstants.Queue.PAGE_ITEM_UP,durable = "true"), // 指定交换机 默认持久化 exchange = @Exchange(value = MQConstants.Exchange.ITEM_EXCHANGE_NAME,type = ExchangeTypes.TOPIC), // 指定交换机与当前队列之间的通信规则 key = MQConstants.RoutingKey.ITEM_UP_KEY )) public void addPage(Long id){ pageService.createStaticItemPage(id); } /** * 删除 静态页面 (下架) * @param id */ @RabbitListener(bindings = @QueueBinding( // 监听的队列,并且创建队列 开启持久化 value = @Queue(value = MQConstants.Queue.PAGE_ITEM_DOWN,durable = "true"), // 指定交换机 exchange= @Exchange(value = MQConstants.Exchange.ITEM_EXCHANGE_NAME,type = ExchangeTypes.TOPIC), // 指定交换机和队列的通信规则 key = MQConstants.RoutingKey.ITEM_DOWN_KEY )) public void delPage(Long id){ pageService.deleteStaticPage(id); } }搜索索引库同步:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ```java spring: # RabbitMQ rabbitmq: host: 127.0.0.1 username: 116 password: 116 virtual-host: /116 # 操作的队列 /** * 搜索服务 消息队列 监听 */ @Component public class SearchListener { @Autowired private SearchService searchService; /** * 添加 索引库 商品索引(上架) * @param id */ @RabbitListener(bindings = @QueueBinding( // 监听的队列 并且创建队列 ,durable开启持久化 value = @Queue(value = MQConstants.Queue.SEARCH_ITEM_UP,durable = "true"), // 指定交换机 默认持久化 exchange = @Exchange(value = MQConstants.Exchange.ITEM_EXCHANGE_NAME,type = ExchangeTypes.TOPIC), // 指定交换机与当前队列之间的通信规则 key = MQConstants.RoutingKey.ITEM_UP_KEY )) public void addIndex(Long id){ searchService.addIndex(id); } /** * 删除 索引库 商品索引(下架) * @param id */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = MQConstants.Queue.SEARCH_ITEM_DOWN,durable = "true"), exchange = @Exchange(value = MQConstants.Exchange.ITEM_EXCHANGE_NAME,type = ExchangeTypes.TOPIC), key = MQConstants.RoutingKey.ITEM_DOWN_KEY )) public void delIndex(Long id){ searchService.delIndex(id); } }