RabbitMQ- 从基础到实战(4)— 消息的交换(中)

本贴最后更新于 2306 天前,其中的信息可能已经物是人非

1.简介

本章节和官方教程相似度较高,英文好的可以移步官方教程

在上一章的例子中,我们创建了一个消费者,生产日志消息,广播给两个消费者,对消息进行不同的处理。这一节,我们将对它进行扩展,实现一些更加高级的功能,例如:使消费者 A 只接受 error 级别的日志保存到硬盘,消费者 B 接收所有级别的消息进行打印。

本文中涉及到的所有概念(包括前面几章),都将摒弃个人经验,以官方文档为基础进行讲解,在书写本文的同时,也是我对 RabbitMQ 的重新学习。

2.绑定

回顾一下上一章的队列绑定代码

// 把刚刚获取的队列绑定到 logs 这个交换中心上,
channel.queueBind(queueName, "logs", "");

这段代码在消费者中,为什么生产者没有?因为在 RabbitMQ 中消息是发送到交换中心(exchange)的,这在上一张已经重点强调过。

上述代码可以理解成,queueName 这个队列对 logs 这个 exchange 中的消息感兴趣,routingKey 是""

在发送消息的 basicPublish 方法中,也有一个参数叫做 routingKey,没错,他们是有关联的,下面会介绍

在不同的 exchange 类型中,routingKey 扮演的角色也相应的不同,比如上一章我们使用的 fanout(扇出,多贴切的名字,想象一下 WOW 中盗贼的刀扇)将忽略 routingKey,所有绑定在 fanout 类型的 exchange 上的队列,都将接收到该 exchange 上的所有消息。

3.Direct Exchange

fanout 类型的 exchange 没有给我们太多的灵活性,direct 类型的 echange 非常简单,会匹配消息发布时的 routingKey 和 queue 的 routingKey,完全相等则把消息放入该队列。

如上图,Q1 绑定了 orange,Q2 绑定了 black 和 green,就可以实现不同级别的日志用不同的消费者进行处理

我们看到 Q2 绑定了两个 routingKey,难道第二次绑定不会把第一次绑定覆盖掉吗?

实践出真正,我们来试一下

  1. 声明一个名为 logs 的 exchange,类型换为 direct,让它通过 routingKey 的完全匹配去分发消息
  2. 然后把消息发送到名为 logs 的 exchange 上,routingKey 是外面传进来的

改造一下发送方法,轮流发送 info 和 error 信息

给 Consummer 队列绑定两个 routingKey

激动人心的时刻到来了,跑一把

Duang,报错了

报错信息:

inequivalent arg 'type' for exchange 'logs' in vhost '/': received 'direct' but current is 'fanout', class-id=40, method-id=10)

大意就是 logs exchange 已经被声明称 fanout 了,不能再声明成 direct 类型,RabbitMQ 的队列声明方法和 exchange 声明方法都是幂等的,如果没有,就创建,如果有,参数相同,就不管,如果有了还用不同的参数重新声明,就报错

进入 RabbitMQ 控制台把 logs 删除,重新执行

逆袭成功,消费一下看看

成功了,一个队列可以绑定多个 routingKey,这里注意先启动消费者,因为前面的代码里我们用的是临时队列,断开连接后,队列就删除了,如果先启动生产者,exchange 接到消息后发现没有队列对它感兴趣,就任性的把消息给丢掉了。

一个队列可以绑定多个 routingKey,反之,一个 routingKey 也可以绑定多个队列,如下图,感兴趣的朋友可以自己试一下

如果绑定在一个 direct 类型的 exchange 上的队列都使用同一个 routingKey,那它就是一个 fanout

4.实战

要实现本章的需求,即 Q1 只接收 error 级别的日志写到硬盘上,Q2 接收 error 和 info 级别的日志打印出来

用 direct 类型的 exchange 来实现这个需求非常简单,Q1 绑定 error,Q2 绑定 error 和 info 即可,缺点是 Q2 需要绑定 N 个 routingKey,N=日志级别数量,我们可以用一些编程的技巧来规避它

Sender 的代码上面已经改好了,把 exchange 换为 direct,注意删除原 exchange,不再赘述

Q2 绑定所有日志级别,我们用一个 Enum 来规避手动绑定

定义一个 Enum

1 public enum LogType{ 2 error,info; 3 }

用 foreach 语法糖进行循环绑定

1 //绑定所有类型
2 for(LogType logType: LogType.values()){ 3 channel.queueBind(queueName, "logs", logType.name()); 4 }

foreach 是单线程的,这里也可以装个逼用一下 JAVA8 的 lambda,由于 lambda 是并行处理,所以外围的 try catch 无效,需要在内层重新抓取异常,而且不能抛出,反而显得代码很不美好,装逼失败

1 IntStream.range(0, LogType.values().length).forEach(n->{ 2 try { 3 channel.queueBind(queueName, "logs", LogType.values()[n].name()); 4 } catch (IOException e) { 5 e.printStackTrace(); 6 } 7 });

把另外一个 Consumer 改成只绑定 error 队列

1 channel.queueBind(queueName, "logs", LogType.error.name());

然后,改造一下发送消息的地方,一开始我们用了一个 while 还有一组 if else,看起来比较挫,别人看你代码的时候,就不会觉得你很厉害,这样和你不写博客一样,对你的工作是没有好处的,我们把它改的高端一点

1 while(true){ 2 boolean info = ++i%2==0; 3 String type = info?LogType.info.name():LogType.error.name(); 4 sender.sendMessage(type +" message: "+i, type); 5 Thread.sleep(1000); 6 }

对比一下

是不是觉得自己厉害了很多?这就是编码的艺术(得意脸)

好了,这一章没有太多内容,跑一下看看结果

左边的 Consumer1,消费了 info 和 error 级别的日志,右边的 Consumer2,只消费了 error 级别的日志

5.结束语

这一章主要是介绍了 RabbitMQ 中 direct 类型的 exchange,下一章将跟着官方教程的进度继续介绍 topic 类型的 exchange,以及下下章介绍用 RabbitMQ 实现 RPC 调用。之后则会介绍 RabbitMQ 与 Spring 的集成等与真实开发环境更相关的技术。

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 395 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...