NSQ

NSQ说明

NSQ是一个分布式的实时的消息队列,基于GO语言开发,它的分布式是无中心节点的结构,据说是高可用,上亿的消息毫无压力,并且部署非常简单,之前项目里也有用,没详细读过代码,下面来说下NSQ的结构和实现

NSQ的组成部分

主要由以下三个daemon进程组成。

  • lookupd

维护nsqd节点以及他的topicchannel,方便client可以获取nsqd,也就是nsq里面的Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]Producers
}
type Registration struct {
Category string
Key string
SubKey string
}
type Registrations []Registration
type PeerInfo struct {
lastUpdate int64
id string
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
type Producer struct {
peerInfo *PeerInfo
tombstoned bool
tombstonedAt time.Time
}
type Producers []*Producer

flow

这是自己阅读源码理解画的流程图~

lookupd里面的map是代表RegistrationDB的一个结构,代表所有nsqd节点上的topicchannel。在nsqd上面创建的topicchannel都会同步到lookupd的这个结构里面。大意可以通过理解下流程图

  • nsqd

nsqd

  • nsqadmin

基于web的一个可视化的监控页面,可以实时查看队列消息以及各节点信息

nsqadmin

NSQ消息流向

类似这种有GC的语言不用管理内存一般的程序不用关心内存这个问题相对来说轻松了许多。不过尽量还是要关注下这块,及时释放掉对向对内存的持有

msgflow

这是一个消息从topicconsumer的一个消息流向图。

  • topicchannel默认的消息队列(channel)有一个golang基于内存的channel另一个则是一个基于文件系统的FIFO磁盘队列。后一种为了解决前一个channel满的时候的备胎,哈哈~~,默认是基于文件的,另外还有一种topic名字前缀为#ephemeral时这个备份的内存就基于内存了(长度为0的channel),这就是nsq支持的三种channel

  • 发送给consumer的消息也分为两种,一种是in-flight(在处理中,已经发送给consumer)和deferred(延迟消息,还有一个优先级)消息。消息保存在以消息idkeymap里,在和consumer交互的时候都是有ACK(“OK”)回包,所以收到ACK包后会从in-flight字典中移除,同样延迟消息到时后同样会被移除掉

通信协议

说下nsqdlookupdconsumer之间的通信的消息格式,每个服务都监听两个端口,一个用来支持短链接http,另一个是长链接tcp实现通信需求

  • http

http协议就不讲了,基本都是http的get post

  • tcp

    • client&lookupd to nsqd

      1
      2
      3
      4
      5
      6
      7
      8
      type Command struct {
      Name []byte
      Params [][]byte
      Body []byte
      }
      //PUB topicName\n7msgtest"
      //名称+空格分格符+参数(多个以空格分开)+"\n"+Body长度+Body内容
    • nsqd to client

      1
      2
      3
      4
      5
      [x][x][x][x][x][x][x][x][x][x][x][x]...
      | (int32) || (int32) || (binary)
      | 4-byte || 4-byte || N-byte
      ------------------------------------...
      size frame ID data

      frame ID类型:

      1
      2
      3
      4
      5
      const (
      frameTypeResponse int32 = 0
      frameTypeError int32 = 1
      frameTypeMessage int32 = 2
      )
    • lookupd to nsqd

      1
      2
      3
      4
      5
      [x][x][x][x][x][x][x]...
      | (int32) || (binary)
      | 4-byte || N-byte
      ----------------------
      size data