RabbitMQ
控制台
- 用户
- 数据库
- 授权
- Overview
- Ports and contexts
- amqp协议
- clustering
- http
- Connections
- Channels
- Exchanges
- Queues
- Admin
简单队列示例
RabbitMQ消息队列
1 | //获取MQ连接的工具类 |
生产者
1 | public class Send{ |
消费者
1 | public class Recv{ |
旧:监听消费者,while循环当有消息时return 新:channel被消费者监听时,回调 handleDelivery。
简单队列不足
- 耦合性高,生产者一一对应消费者,当出现多个消费者消费队列则不行;队列名变更,同时变更(生产者,消费者)。
- 消费者与业务结合,消费者接受消息进行处理,需要时间,就会在队列积压很多消息。
解决方案
- 工作队列:一个生产者对应多个消费者。
工作队列
生产者
1 | public class Send{ |
消费者more
1 | public class Recv{ |
简单队列不足
- 消费者1 & 2 处理的消息都是一样:轮询分发(round-robin),无论谁忙谁闲,都不会都给,都是你一个我一个均摊分发。
解决方案
使用basicQos(perfetch =1 ),消费者手动反馈(我已经做完了),即关闭自动应答ack改为手动。
公平分发Fair dispatch
消费者加入回执操作
1 | //需要发送确认消息 |
消息应答与持久化
boolean autoAck = true;
(自动确认模式),一旦rebbit消息分发给消费者,消息就会从内存中删除。
坏处:如果消费者突然失败,就会丢失正在处理的消息。
boolean autoAck = false
(默认,手动确认模式),如果消费者突然失败,就会将消息交付给其他消费者。
Message acknowlegment
rabbitmq挂了,消息也会丢失 。解决方案->持久化
boolean durable = false
channel.queueDeclare(QUEUE_NAME,durable)
// 报错原因:把durable把false改true不可,是因为已经定义了QUEUE_NAME=test_work_queue,这个队列是未持久化,不可重新定义已存在的队列。重建一个队列(改名字)就可定义持久化。
工作队列缺点
生产者发送的消息不能被消费者共享。
订阅模式
交互机模型
1 | /* -->|--queue -->c1 |
- 一个生产者多个消费者
- 每个消费者有自己的队列
- 生产者把消息发到交换机
- 队列要绑定到交换机
- 生产者发送 ->消息 -> 交换机 –> 队列:实现一个消息被多个消费者消费。
例如:商品名字改了,搜索引擎改,前台变。
生产者
1 | public class Recv{ |
消费者one
1 | public class Recv{ |
消费者two
1 | public class Recv{ |
现象:消费者1 消费者2 共享生产者发送的消息
交换机缺点
一方面接收生产者消息,另一方面向队列推送消息。但是不能把消息交给指定queue
- 无交换机
channel.basicPublish("","")
改变 -> 有交换机 没有路由
channel.exchangeDeclare(EXCHANGE_NAME,"fanout")
//不处理路由链
channel.basicPublish(EXCHANGE_NAME,"")
路由模式
路由模型
1 | /* | -error->|--queue -->c1 |
生产者
1 | public class Recv{ |
消费者
1 | public class Recv{ |
路由模式缺点
生产者路由key只能一个。不能多个匹配,即不能多对多。
主题模式
主题模型
模式匹配
- #匹配一个或多个
- *匹配一个
生产者:商品->发布(goods.add) 删除 修改 查询
消费者1:(goods.add)
消费者2:(goods.#)
生产者
1 | public class Recv{ |
消费者
1 | public class Recv{ |
在路由的基础上,消费者多了匹配符,简化消费者用多个通道来对应queue。
即,主题模式更加符合模块化接收,(从一个管子变成口子)
消息确认机制(事务+confirm)
通过持久化数据,解决消费者异常导致数据丢失问题。但是不知道消息发出去是否接收到。
两种解决方案
- AMQP实现事务机制(类似mysql)
- Confirm模式
AMQP实现事务机制
- txSelect:用于用户将当前channel设置成transation模式
- txCommit:用于提交事务
- txRoolback:回滚事务
生产者
1 | private static final String QUEUE_NAME = "test_queue_tx"; |
消费者
1 | public class Recv{ |
缺点:事务的大量提交,会降低生产者的吞吐量。