springboot 2.1.6 + rabbitmq 整合之道

本贴最后更新于 1620 天前,其中的信息可能已经物是人非

1. 导入 rabbitmq

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 在 application.yml 中添加 rabbitmq 配置

spring:
  rabbitmq:
    host: localhost 		# rabbitmq的IP地址或对应的域名
    username: admin 	# rabbitmq登录账号
    password: 123456 	# rabbitmq 登录密码
    virtual-host: /
    publisher-returns: true
    connection-timeout: 15000
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 5 #监听最小为5
        max-concurrency: 10 #监听最大为10

3.注入队列

@Configuration
public class RabbitConfig {
    @Bean
    public Queue Queue() {
        return new Queue("zzm-rabbitmq");
    }
}

注意:zzm-rabbitmq 为队列名称

4.创建消息生产者

@Component
public class RabbitProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 10000L)
    public void send() {
        rabbitTemplate.convertAndSend("zzm-rabbitmq", "Hello rabbitmq !~");
    }
}

注意:通过 zzm-rabbitmq 发送一个字符串也可以发送一个 object,在生成环境中,绝大多数是一个 object

5.创建消费者

@Component
public class RabbitConsumer {

    @RabbitHandler
    @RabbitListener(queues = "zzm-rabbitmq")
    public void process(@Payload String foo) {
        System.out.println(new Date() + ": " + foo);
    }

}

@RabbitListener(queues = "zzm-rabbitmq")这里里面要注意一下,这里要写你刚才在 Queue 里面的那个队列名称

6.启动主类

1. 在启动类上面加入@EnableScheduling,表示启动定时任务

2. 修改Producer生产者类中的 @Scheduled(fixedDelay = 10000L)里面的数值

3. 控制台打印:Mon Oct 21 16:55:43 GMT+08:00 2019: Hello rabbitmq !~ 表示成功

4. 到此,一个简单的springboot + rabbitmq整合应用就弄好了;

5. 由于rabbitmq引用了ExChange概念,ExChange有四种类型:Direct、 Topic、 Headers 、Fanout;其中Headers使用是最少的了,Direct是最简单的;

6. 下面我们来介绍一下topic和Fanout。

6.1 简单说一下 ExChange 中的四种类型

direct:exchange在和queue进行binding时会设置routingkey,将消息发送到exchange时会设置对应的routingkey,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

fanout:直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。(广播)

topic:此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'。

其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词。

header:其路由的规则是根据header来判断,其中的header就是binding时的arguments参数:

7. Topic Exchange

7.1 配置 Topic 规则

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

    @Bean
    public Queue queueMessage1() {
        return new Queue(MQConst.TOPIC_QUEUENAME1);
    }

    @Bean
    public Queue queueMessage2() {
        return new Queue(MQConst.TOPIC_QUEUENAME2);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConst.TOPIC_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) {
        // 将队列1绑定到名为topicKey.A的routingKey
        return BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1);
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) {
        // 将队列2绑定到所有topicKey.开头的routingKey
        return BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS);
    }
}

7.2 配置消费者

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitHandler
    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)
    public void process1(Object message) {
        System.out.println("queue:topic.message1,message:" + message);
    }

    @RabbitHandler
    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME2)
    public void process2(Object message) {
        System.out.println("queue:topic.message2,message:" + message);
    }
}

7.3 配置生产者

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 10000L)
    public void send() {
        rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "我是TOPIC_KEYS");
        rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "我是TOPIC_KEY1");
    }
}

7.4 启动主类,控制台输出:

queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-FFUIrKtXJAJI_ecGEo0NSA, consumerQueue=topic_queuename2])
queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-1VqDmcyXJi1_ZhD3GJcefA, consumerQueue=topic_queuename2])
queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-dDQ5i2_SYPcAYdCTHmjDxg, consumerQueue=topic_queuename1])
queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-fgs6LkKQo9x2dXw9El6OEQ, consumerQueue=topic_queuename1])
queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-rX6y-N4JGfg_FB1xzHsBiw, consumerQueue=topic_queuename2])
queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-cdPvOG38zLth8qBNxAaDug, consumerQueue=topic_queuename2])

8. Fanout Exchange

8.1 配置 Fanout 规则

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {
    @Bean
    public Queue MessageA() {
        return new Queue(MQConst.FANOUT_QUEUENAME1);
    }

    @Bean
    public Queue MessageB() {
        return new Queue(MQConst.FANOUT_QUEUENAME2);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(MQConst.FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }
}

8.2 配置消费者

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

    @RabbitHandler
    @RabbitListener(queues = MQConst.FANOUT_QUEUENAME1)
    public void process1(String message){
        System.out.println("queue:fanout.message1,message:" + message);
    }

    @RabbitHandler
    @RabbitListener(queues = MQConst.FANOUT_QUEUENAME2)
    public void process2(String message){
        System.out.println("queue:fanout.message2,message:" + message);
    }
}

8.3 配置生产消息

import com.example.amqbrabbitmq.common.MQConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
public class FanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 10000L)
    public void send() {
        rabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE,"", "我是FANOUT_QUEUENAME2");
    }
}

8.4 启动主类 控制台输出:

queue:fanout.message2,message:我是FANOUT_QUEUENAME2
queue:fanout.message1,message:我是FANOUT_QUEUENAME2
queue:fanout.message2,message:我是FANOUT_QUEUENAME2

总结:

  一般direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。header类型用的比较少,但还是知道一点好。

源码链接:

https://gitee.com/zzmedu/amqb-rabbitmq.git

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 395 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    938 引用 • 1456 回帖 • 163 关注
  • 消息队列
    40 引用 • 52 回帖 • 2 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...