RabbitMQ

RabbitMQ

控制台

  1. 用户
  2. 数据库
  3. 授权
  4. Overview
  5. Ports and contexts
    • amqp协议
    • clustering
    • http
  6. Connections
  7. Channels
  8. Exchanges
  9. Queues
  10. Admin

简单队列示例

RabbitMQ消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   //获取MQ连接的工具类
public class ConnectionUtiles{
public static Connection getConnection(){
//连接工厂
ConnectionFactory factory = new ConnectionFactory();

//设置服务地址
factory.setHost("127.0.0.1");

//端口AHQP 5672
factory.setPort(5672);

//vhost
facotry.setVirtualHost("/newMySql");

//用户名
factory.setUsername("miki");
//密码
facotry.setPassword("miki666");

//获取连接
return factory.newConnection();
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Send{
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();

//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送
String msg = "hello simpel";

channel.basePublish("",QUEUE_NAME,null,msg.getBytes());

//关闭
channel.close();
conneciton.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Recv{
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//=============老版本====================================================
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听
channel.basicConsumer(QUEUE_NAME,true,consumer);
while(true){
Doliver dalivery = consumer.nextDelivery();
String msgString = new String(dalivery.getBody());
sout(msgString);
}
//===================老版本=============================================


//===================新版本=============================================
//观察者模式 队列有数据 消费者DefaultConsumer lamdba -> channel 回调 handleDelivery
//这里重写方法
new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
}
};
//消费者监听队列 阻塞 -》 android Button 监听事件 addListener
channel.basicConsumer(QUEUE_NAME,true,consumer);

//===================新版本=============================================

}
}

旧:监听消费者,while循环当有消息时return 新:channel被消费者监听时,回调 handleDelivery。

简单队列不足

  • 耦合性高,生产者一一对应消费者,当出现多个消费者消费队列则不行;队列名变更,同时变更(生产者,消费者)。
  • 消费者与业务结合,消费者接受消息进行处理,需要时间,就会在队列积压很多消息。

解决方案

  • 工作队列:一个生产者对应多个消费者。

工作队列

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Send{
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException,TimeoutException{
/* |--c1
* p --Queue--|
* |--c2
*/
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i = 0 ; i < 50 ; i++ ){
String msg = "hello" + i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*10); // 抛出异常
}
channel.close();
connection.close();
}
}

消费者more

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recv{
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
try{
Thread.sleep(2000);
}catch(InterruptedExceptin e){
e.printStackTrace();
}finally{
scout("done");
}
}
};
boolean.autoAck =true;
channel.basicConsumer(QUEUE_NAME,autoAck,consumer);
}
}

//消费者2
同理

简单队列不足

  • 消费者1 & 2 处理的消息都是一样:轮询分发(round-robin),无论谁忙谁闲,都不会都给,都是你一个我一个均摊分发。

解决方案

使用basicQos(perfetch =1 ),消费者手动反馈(我已经做完了),即关闭自动应答ack改为手动。

公平分发Fair dispatch

消费者加入回执操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//需要发送确认消息
public class Recv{
//~~~~~~~~获取连接~~~~~~~
//~~~~~~~获取channel~~~~~~~~
//~~~~~~~~声明队列~~~~~~~
//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一次数据
channel.basicQos(1);

//Channel channel = new DefaultConsumer(channel) ~~~
finally{
scout("done");
//手动回执
channel.basicAck(envelope.getDeliveryTag(),fasle);
}
};
boolean autoAck = true;
channel.basciConsumer(QUEUE_NAME,autoAck,consumer);
}

消息应答与持久化

boolean autoAck = true; (自动确认模式),一旦rebbit消息分发给消费者,消息就会从内存中删除。

坏处:如果消费者突然失败,就会丢失正在处理的消息。

boolean autoAck = false(默认,手动确认模式),如果消费者突然失败,就会将消息交付给其他消费者。

Message acknowlegment

​ rabbitmq挂了,消息也会丢失 。解决方案->持久化

boolean durable = false

channel.queueDeclare(QUEUE_NAME,durable) // 报错原因:把durable把false改true不可,是因为已经定义了QUEUE_NAME=test_work_queue,这个队列是未持久化,不可重新定义已存在的队列。重建一个队列(改名字)就可定义持久化。

工作队列缺点

生产者发送的消息不能被消费者共享。

订阅模式

交互机模型

1
2
3
4
/*             -->|--queue -->c1
* p --X--|
* -->|--queue -->c2
*/
  1. 一个生产者多个消费者
  2. 每个消费者有自己的队列
  3. 生产者把消息发到交换机
  4. 队列要绑定到交换机
  5. 生产者发送 ->消息 -> 交换机 –> 队列:实现一个消息被多个消费者消费。

例如:商品名字改了,搜索引擎改,前台变。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Recv{
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明 交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//发送消息
String msg = "hello";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
scout("Send" +msg);

//close
channel.close();
connection.close();
}

消费者one

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recv{
private static final String QUEUE_NAME = "test_exchange_fanout_email";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"")
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
try{
Thread.sleep(2000);
}catch(InterruptedExceptin e){
e.printStackTrace();
}finally{
scout("done");
}
}
};
boolean.autoAck =true;
channel.basicConsumer(QUEUE_NAME,autoAck,consumer);
}
}

消费者two

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recv{
private static final String QUEUE_NAME = "test_exchange_fanout_sms";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"")
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
try{
Thread.sleep(2000);
}catch(InterruptedExceptin e){
e.printStackTrace();
}finally{
scout("done");
}
}
};
boolean.autoAck =true;
channel.basicConsumer(QUEUE_NAME,autoAck,consumer);
}
}

现象:消费者1 消费者2 共享生产者发送的消息

交换机缺点

一方面接收生产者消息,另一方面向队列推送消息。但是不能把消息交给指定queue

  • 无交换机

channel.basicPublish("","")

改变 -> 有交换机 没有路由

channel.exchangeDeclare(EXCHANGE_NAME,"fanout") //不处理路由链

channel.basicPublish(EXCHANGE_NAME,"")

路由模式

路由模型

1
2
3
4
5
/*            |  -error->|--queue -->c1
* p --X :key--|
* | -info->|
* |-warning->| --queue -->c2
* | -error->|

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Recv{
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明 交换机 + 路由 direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//发送消息
String msg = "hello";
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
scout("Send" +msg);

//close
channel.close();
connection.close();
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Recv{
private static final String QUEUE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_i";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//绑定多个
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
try{
Thread.sleep(2000);
}catch(InterruptedExceptin e){
e.printStackTrace();
}finally{
scout("done");
}
}
};
boolean.autoAck =true;
channel.basicConsumer(QUEUE_NAME,autoAck,consumer);
}
}

路由模式缺点

生产者路由key只能一个。不能多个匹配,即不能多对多。

主题模式

主题模型

1565530396700

模式匹配

  • #匹配一个或多个
  • *匹配一个

生产者:商品->发布(goods.add) 删除 修改 查询

消费者1:(goods.add)

消费者2:(goods.#)

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Recv{
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明 交换机 + 路由 direct
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//发送消息
String msg = "商品 ..";
String routingKey = "goods.add";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
scout("Send" +msg);

//close
channel.close();
connection.close();
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Recv{
private static final String QUEUE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_i";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

//消费者2---------------------------------------
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
//消费者2---------------------------------------
channel.basicQos(1);
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
String msg = new String(body,"utf-8");
scout(msg);
try{
Thread.sleep(2000);
}catch(InterruptedExceptin e){
e.printStackTrace();
}finally{
scout("done");
}
}
};
boolean.autoAck =true;
channel.basicConsumer(QUEUE_NAME,autoAck,consumer);
}
}

在路由的基础上,消费者多了匹配符,简化消费者用多个通道来对应queue。

即,主题模式更加符合模块化接收,(从一个管子变成口子)

消息确认机制(事务+confirm)

通过持久化数据,解决消费者异常导致数据丢失问题。但是不知道消息发出去是否接收到。

两种解决方案

  1. AMQP实现事务机制(类似mysql)
  2. Confirm模式

AMQP实现事务机制

  1. txSelect:用于用户将当前channel设置成transation模式
  2. txCommit:用于提交事务
  3. txRoolback:回滚事务

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明 交换机 + 路由 direct
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送消息
String msg = "hello tx ..";
try{
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//------异常------
// int x = 1/0;
//------异常------
channel.txCommit();
scout("Send" +msg);
}catch(Exception e){
channel.txRollback();
scout("tx rollback");
}

//close
channel.close();
connection.close();
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Recv{
private static final String QUEUE_NAME = "test_queue_tx";

public static void main(String[] args){
//拿到连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channle = connection.createChannel();
//队列 声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//定义消费者
Consumer consumer = boolean.autoAck =true;
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException{
scout(new String(body,"utf-8"));
}
});

}
}

缺点:事务的大量提交,会降低生产者的吞吐量。

Confirm模式

实现原理

0%