JMS 与 Active MQ 入门、Spring 整合 Active MQ

本贴最后更新于 1512 天前,其中的信息可能已经时移世异

JMS 与 Active MQ 入门

JMS 是什么

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,实际上是一套 api,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,ActiveMQ 而是这个规范的一个具体实现。JMS 是一种与厂商无关的 API,用来访问收发系统消息,它类似于 JDBC.

JMS 对象模型

  • 连接工厂:连接工厂负责创建一个 JMS 连接。
  • JMS 连接:JMS 连接(Connection)表示 JMS 客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS 会话:JMS 会话(Session)表示 JMS 客户与 JMS 服务器之间的会话状态。JMS 会话建立在 JMS 连接上,表示客户与服务器之间的一个会话线程。
  • JMS 目的/ Broker:客户用来指定它生产的消息的目标和它消费的消息的来源的对象,一个消息中间件的实例。
  • JMS 生产者和消费者:生产者(Message Producer)和消费者(Message Consumer)对象由 Session 对象创建,用于发送和接收消息。

JMS 中的消息

JMS 消息由以下三部分组成:

  • 消息头:每个消息头字段都有相应的 getter 和 setter 方法。
  • 消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性。
  • 消息体:JMS 定义的消息类型有 TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。ActiveMQ 也有对应的实现。

JMS 消息模型

上次的《基于异步消息模式的通信》中已对此有所介绍,这里再详细说一下:

Point-to-Point(P2P) / 点对点

    消息通过称为队列的一个虚拟通道来进行交换。队列是生产者发送消息的目的地和接受者消费消息的消息源。
    每条消息通仅会传送给一个接受者。可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费
    消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者当消息已被消费时,就会从队列头部将它们删除。
    每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
    发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
    接收者在成功接收消息之后需向队列应答成功
    如果希望发送的每个消息都应该被成功处理的话,使用这个 P2P 模式。

Topic/ 主题(发布订阅(Pub/Sub) )

    消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费
    如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 topic 模型

消息的消费方式

  1. 同步消费:通过调用 消费者的 receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。
  2. 异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

Active MQ 入门

Active MQ 下载与启动

    运行后在浏览器中访问 http://127.0.0.1:8161/admin,即可看到 ActiveMQ 的管理控制台
ActiveMQ 中,61616 为服务端口,8161 为管理控制台端口。

使用原生 Active MQ

新建一个 maven 项目,导入 Active MQ 的 pom 依赖。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.8.0</version>
</dependency>

消息生产端

public class JmsProducer {

    /*默认连接用户名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认连接密码*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认连接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final int SENDNUM = 3;

    public static void main(String[] args) {
        /* 连接工厂*/
        ConnectionFactory connectionFactory;
        /* 连接*/
        Connection connection = null;
        /* 会话*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的生产者*/
        MessageProducer messageProducer;

        /* 实例化连接工厂*/
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,
                BROKEURL);
        try {
            /* 通过连接工厂获取连接*/
            connection = connectionFactory.createConnection();
            /* 启动连接*/
            connection.start();
            /* 创建session
            * 第一个参数表示是否使用事务,第二次参数表示是否自动确认*/
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            /* 创建一个名为HelloWorld消息队列*/
            //destination = session.createTopic("HelloActiveMq");
            destination = session.createQueue("HelloActiveMqQueue");
            /* 创建消息生产者*/
            messageProducer = session.createProducer(destination);
            /* 循环发送消息*/
            for(int i=0;i<SENDNUM;i++){
                String msg = "发送消息"+i+" "+System.currentTimeMillis();
                TextMessage textMessage = session.createTextMessage(msg);
                System.out.println("标准用法:"+msg);
                messageProducer.send(textMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

消息消费端

public class JmsConsumer {
    /*默认连接用户名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认连接密码*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认连接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        /* 连接工厂*/
        ConnectionFactory connectionFactory;
        /* 连接*/
        Connection connection = null;
        /* 会话*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的消费者*/
        MessageConsumer messageConsumer;

        /* 实例化连接工厂*/
        connectionFactory
                = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            /* 通过连接工厂获取连接*/
            connection = connectionFactory.createConnection();
            /* 启动连接*/
            connection.start();
            /* 创建session*/
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /* 创建一个名为HelloWorld消息队列*/
            //destination = session.createTopic("HelloActiveMq");
            destination = session.createQueue("HelloActiveMqQueue");
            /* 创建消息消费者*/
            messageConsumer = session.createConsumer(destination);
            Message message;
            while((message = messageConsumer.receive())!=null){
                System.out.println(((TextMessage)message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消息异步消费端

public class JmsConsumerAsyn {
    /*默认连接用户名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认连接密码*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认连接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        /* 连接工厂*/
        ConnectionFactory connectionFactory;
        /* 连接*/
        Connection connection = null;
        /* 会话*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的消费者*/
        MessageConsumer messageConsumer;

        /* 实例化连接工厂*/
        connectionFactory
                = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            /* 通过连接工厂获取连接*/
            connection = connectionFactory.createConnection();
            /* 启动连接*/
            connection.start();
            /* 创建session*/
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /* 创建一个名为HelloWorld消息队列*/
            //destination = session.createTopic("HelloActiveMq");
            destination = session.createQueue("HelloActiveMqQueue");

            /* 创建消息消费者*/
            messageConsumer = session.createConsumer(destination);
            /* 设置消费者监听器,监听消息*/
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println(((TextMessage)message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Active MQ 在 Spring 中的使用

1、添加依赖

    首先我们先要搭建一个 Spring 的 maven 项目。
    然后我们在 pom 文件中添加 active mq 的依赖与:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.11.RELEASE</version>
</dependency>

2、配置文件 applicationContext.xml

命名空间的添加

xmlns:amq="http://activemq.apache.org/schema/core"
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd

消费者的命名空间要再额外添加如下:

xmlns:jms="http://www.springframework.org/schema/jms"
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd

ActiveMQ 连接工厂

<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName="" password="" />

Spring Caching 连接工厂

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>

3、消息生产者配置以及代码的编写

Spring JmsTemplate 的消息生产者

<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 队列模式-->//true为发布订阅模式
    <property name="pubSubDomain" value="false"></property>
</bean>

然后便可以使用 JmsTemplate

@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;

public void send(String queueName,final String message){
	jmsTemplate.send(queueName, new MessageCreator() {
		public Message createMessage(Session session) throws JMSException {
			Message msg = session.createTextMessage(message);
			//TODO  应答
			return msg;
			}
		});

3、消息消费者配置以及代码的编写

定义 Queue 监听器

<jms:listener-container destination-type="queue" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
    <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>

queueReceiver1 的编写

@Component
public class QueueReceiver1 implements MessageListener {
	public void onMessage(Message message) {
		try {
			String textMsg = ((TextMessage)message).getText();
			System.out.println("QueueReceiver1 accept msg : "+textMsg);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
  • 消息队列
    40 引用 • 52 回帖 • 2 关注
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 628 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...