高性能消息队列 NSQ---GO--demo

前言

  关于 NSQ 是什么,NSQ 是做什么的,怎么启动,网上的资料太多太多,详情请移步分布式实时消息平台 NSQ,里面讲的很详细,还附带 demo。

客户端/生产者(producer)

  NSQ 发送消息非常简单,分两步完成:

func main() {
	cfg := nsq.NewConfig()
	nsqd := "127.0.0.1:4150"
	producer, err := nsq.NewProducer(nsqd, cfg)
	if err != nil {
		log.Fatal(err)
	}
	if err := producer.Publish("test", []byte("Hello NSQ")); err != nil {
		log.Fatal("publish error:" + err.Error())
	}
}

服务端/消费者(consumer)

  消费者用于接收指定 topic 中的消息,实现需分为 3 步:

func main() {
	cfg := nsq.NewConfig()
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		panic(err)
	}
	c.AddHandler(&ConsumerT{})

	if err := c.ConnectToNSQD(address); err != nil {
		panic(err)
	}
}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

消费者生产者搭配使用

  我上面所写的 demo 虽实现了最基本的 NSQ 的功能,但是对于一个 demo 来说,整体不够直观。理想中的状态应该是:服务端一直处于执行状态,客户端发送消息时,服务端接受并处理。
  改造后服务端:

package main

import (
	"fmt"

	"github.com/nsqio/go-nsq"
)

type ConsumerT struct{}

func main() {
	InitConsumer("test", "ch1", "127.0.0.1:4150")
	select {}
}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

func InitConsumer(topic string, channel string, address string) {
	cfg := nsq.NewConfig()
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		panic(err)
	}
	c.AddHandler(&ConsumerT{})

	if err := c.ConnectToNSQD(address); err != nil {
		panic(err)
	}
}

  客户端这边呢,倒是没什么具体要更改的,可是每次都需要重复去运行才能发送消息也确实麻烦,所以做了点小更改,让客户端也一直处于运行状态,通过命令行的输入来发送消息,具体如下:

package main

import (
	"bufio"
	"fmt"
	"github.com/nsqio/go-nsq"
	"log"
	"os"
	"strings"
)

func main() {
	cfg := nsq.NewConfig()
	nsqd := "127.0.0.1:4150"
	producer, err := nsq.NewProducer(nsqd, cfg)
	if err != nil {
		log.Fatal(err)
	}

	reader := bufio.NewReader(os.Stdin)
	fmt.Println("Simple Shell")
	fmt.Println("---------------------")

	for {
		fmt.Print("-> topic: ")
		topic, _ := reader.ReadString('\n')
		topic = strings.Replace(topic, "\n", "", -1)
		fmt.Print("-> message: ")
		message, _ := reader.ReadString('\n')
		message = strings.Replace(message, "\n", "", -1)
		fmt.Println("消息发送中\n")

		if err := producer.Publish(topic, []byte(message)); err != nil {
			log.Fatal("publish error:" + err.Error())
		}

	}
}

源码请访问本人 GitHub 下载:https://github.com/InkDP/nsq-demo

  • NSQ
    2 引用 • 3 回帖
  • 消息队列
    32 引用 • 42 回帖 • 2 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    122 引用 • 428 回帖 • 1 关注
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    403 引用 • 1298 回帖 • 687 关注
3 回帖
请输入回帖内容...
  • csfwff

    doge 虽然看不懂,但是觉得很厉害的样子

    1 回复
  • InkDP

    虽然你看不懂,但是谢谢你帮我暖贴

  • 2501224066

    TIM 图片 20190819154837e73bf6b7.gif