"前提 通过 API 获取的前提是你的 rabbitmq_management处于开启状态,也就是能通过 http://host:15672 访问 web 管理端。RabbitMQ 的安装与开启rabbitmq_management可以参考RabbitMQ 安装、基础。 访问地址 上网找了一下发现没有几个说清楚了怎么直接 .."

RabbitMQ 通过 http API 获取队列数

前提

通过 API 获取的前提是你的 rabbitmq_management处于开启状态,也就是能通过 http://host:15672 访问 web 管理端。RabbitMQ 的安装与开启rabbitmq_management可以参考RabbitMQ 安装、基础

访问地址

上网找了一下发现没有几个说清楚了怎么直接用 HTTP 请求去获取队列数。其实完全不用去网上找的,因为 RabbitMQ 自己就提供了 HTTP API 手册,比如我本地的 API 手册地址为:http://localhost:15672/api

获取队列详情 API 为

http://host:15672/api/queues/Vhost_name/queue_name

将上面的host换成 RabbitMQ 部署地址,Vhost_name换成队列所在的虚拟主机名,queue_name换成队列名。也可以将Vhost_namequeue_name去掉通过http://host:15672/api/queues直接获取所有队列信息。

访问结果是 json 串
12555121d30e577861521691.webp

可以看到队列相关的所有信息都有记录。

注意

虚拟主机名Virtual host在设置的时候不要带/,不然会访问不到

{"error":"Object Not Found","reason":"\"Not Found\"\n"}

之前就是被这个坑了好久,明明按照 API 写的格式来的,就是访问不到。

代码

package com.ameya.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import sun.misc.BASE64Encoder;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * @author: Ant
 * @Date: 2018/11/23 13:28
 * @Description:
 */
@Component
public class MQUtils {
    private static final Logger logger = LoggerFactory.getLogger(MQUtils.class);

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.apiport}")
    private String port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualHost}")
    private String virtualHost;

    /**
     * 队列任务总数
     *
     * @param queueName
     * @return
     */
    public int getMessageCount(String queueName) throws IOException {
        String apiMessage = getApiMessage(queueName);
        if (Objects.equals(apiMessage, "")) {
            logger.error("请求 RabbitMQ API 时出错!!");
            return 0;
        }
        JSONObject jsonObject = JSON.parseObject(apiMessage);
        return Integer.parseInt(jsonObject.get("messages").toString());
    }

    /**
     * 队列 ready 任务数
     *
     * @param queueName
     * @return
     */
    public int getMessageReadyCount(String queueName) throws IOException {
        String apiMessage = getApiMessage(queueName);
        if (Objects.equals(apiMessage, "")) {
            logger.error("请求 RabbitMQ API 时出错!!");
            return 0;
        }
        JSONObject jsonObject = JSON.parseObject(apiMessage);
        return Integer.parseInt(jsonObject.get("messages_ready").toString());
    }

    /**
     * 队列 unack 数 MQ
     *
     * @param queueName
     * @return
     */
    public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
        String apiMessage = getApiMessage(queueName);
        if (Objects.equals(apiMessage, "")) {
            logger.error("请求 RabbitMQ API 时出错!!");
            return 0;
        }
        JSONObject jsonObject = JSON.parseObject(apiMessage);
        return Integer.parseInt(jsonObject.get("messages_unacknowledged").toString());
    }

    /**
     * 获取队列消息总数、ready 消息数、unack 消息数
     *
     * @param queueName
     * @return Map<String,Integer>
     */
    public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
        String apiMessage = getApiMessage(queueName);
        JSONObject jsonObject = JSON.parseObject(apiMessage);
        Map<String, Integer> map = new HashMap<>();
        map.put("messages", Integer.parseInt(jsonObject.get("messages").toString()));
        map.put("messages_ready", Integer.parseInt(jsonObject.get("messages_ready").toString()));
        map.put("messages_unacknowledged", Integer.parseInt(jsonObject.get("messages_unacknowledged").toString()));
        return map;
    }

    public String getApiMessage(String queueName) throws IOException {
        // 发送一个 GET 请求
        HttpURLConnection httpConn = null;
        BufferedReader in = null;

        String urlString = "http://" + host + ":" + port + "/api/queues/" + virtualHost + "/" + queueName;
        URL url = new URL(urlString);
        httpConn = (HttpURLConnection) url.openConnection();
        // 设置用户名密码
        String auth = username + ":" + password;
        BASE64Encoder enc = new BASE64Encoder();
        String encoding = enc.encode(auth.getBytes());
        httpConn.setDoOutput(true);
        httpConn.setRequestProperty("Authorization", "Basic" + encoding);
        // 建立实际的连接
        httpConn.connect();
        // 读取响应
        if (httpConn.getResponseCode() == HttpURLConnection.HTTP_OK) {
            StringBuilder content = new StringBuilder();
            String tempStr = "";
            in = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
            while ((tempStr = in.readLine()) != null) {
                content.append(tempStr);
            }
            in.close();
            httpConn.disconnect();
            return content.toString();
        } else {
            httpConn.disconnect();
            return "";
        }
    }
}


  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2169 引用 • 7426 回帖 • 1019 关注
  • 消息队列
    16 引用 • 19 回帖
感谢    关注    收藏    赞同    反对    举报    分享
1 回帖    
请输入回帖内容...
  • nobt   1 感谢      

    不错,这个可以当做项目经验里面自我钻研的一个小例子

       感谢    赞同    反对    举报    折叠    分享    评论    回复