NSQ golang MQ 3 - KerwinKoo/KerwinKoo.github.io GitHub Wiki

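Reading the NSQ Source Code

NSQ has three main modules: nsqd, nsqlookupd, and nsqadmin. Choosing which one to start with matters.

I prefer to start reading source code with a program that has few features and is as self-contained as possible. Of the three modules above, nsqlookupd is probably the one that best ties the system together and acts as the bridge between the others, but it still has a lot of functionality of its own, so it is not a good place to start before that functionality is understood. I therefore chose nsq-to-file as the first source to read.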

nsq-to-file

Note: the local source path is github.com/nsqio/nsq/apps/nsq_to_file

As an NSQ client, nsq-to-file is built on top of go-nsq.

Code

Two files, nsq_to_file.go and strftime.go, both in package main.

nsq_to_file.go

Start by finding the main function:

func main() {
	cfg := nsq.NewConfig()

	// TODO: remove, deprecated
	flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
	flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

	flag.Parse()

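The first function call, flag.Parse(), is the Go standard library's command-line argument parsing. Tracing the flag definitions from here shows which command-line arguments are accepted.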
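As a rough illustration of how flag.Var and flag.Parse fit together, here is a minimal sketch. The type stringList and the function parseArgs are hypothetical names made up for this example, and a private FlagSet is used instead of the global one so the sketch can be run with any argument list. It only shows the standard library mechanics, not how nsq_to_file itself is organized.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// stringList is a hypothetical repeatable flag. Any type with String and
// Set methods satisfies flag.Value and can be registered with Var.
type stringList []string

func (s *stringList) String() string { return strings.Join(*s, ",") }

// Set is called once per occurrence of the flag on the command line.
func (s *stringList) Set(v string) error {
	*s = append(*s, v)
	return nil
}

// parseArgs registers two flags on a private FlagSet and parses args.
func parseArgs(args []string) (channel string, topics []string, err error) {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	ch := fs.String("channel", "nsq_to_file", "nsq channel")
	var ts stringList
	fs.Var(&ts, "topic", "topic name (may be given multiple times)")
	if err = fs.Parse(args); err != nil {
		return "", nil, err
	}
	return *ch, ts, nil
}

func main() {
	channel, topics, err := parseArgs([]string{"--topic=a", "--topic=b"})
	fmt.Println(channel, topics, err)
}
```

Because --channel is not passed in main, the default from the flag definition is used, which is the same mechanism that gives nsq_to_file its default channel name.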

nsq_to_file flag reference

nsq_to_file's own option flags:

var (
	showVersion = flag.Bool("version", false, "print version string")

	channel     = flag.String("channel", "nsq_to_file", "nsq channel")
	maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")

	outputDir      = flag.String("output-dir", "/tmp", "directory to write output files to")
	datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
	filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
	hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
	gzipLevel      = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
	gzipEnabled    = flag.Bool("gzip", false, "gzip output files.")
	skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")
	topicPollRate  = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
	topicPattern   = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")

	rotateSize     = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
	rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

	nsqdTCPAddrs     = app.StringArray{}
	lookupdHTTPAddrs = app.StringArray{}
	topics           = app.StringArray{}

	// TODO: remove, deprecated
	gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)

consumer-opt

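The two flags registered in main itself are reader-opt and consumer-opt. They do the same thing; reader-opt is deprecated, so use consumer-opt.

This flag sets go-nsq Config parameters, since nsq_to_file is built on top of go-nsq. As its help text notes, it may be given multiple times.

Below is the definition of the Config struct (file: github.com/nsqio/go-nsq/config.go):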

type Config struct {
	initialized bool

	// used to Initialize, Validate
	configHandlers []configHandler

	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`

	// Deadlines for network reads and writes
	ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

	// LocalAddr is the local address to use when dialing an nsqd.
	// If empty, a local address is automatically chosen.
	LocalAddr net.Addr `opt:"local_addr"`

	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
	// restart at the same time
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
	LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

	// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
	BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
	// Maximum amount of time to backoff when processing fails 0 == no backoff
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

	// Duration to wait for a message from a producer when in a state where RDY
	// counts are re-distributed (ie. max_in_flight < num_producers)
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

	// Duration between redistributing max-in-flight to connections
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

	// Identifiers sent to nsqd representing this client
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
	ClientID  string `opt:"client_id"` // (defaults: short hostname)
	Hostname  string `opt:"hostname"`
	UserAgent string `opt:"user_agent"`

	// Duration of time between heartbeats. This must be less than ReadTimeout
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

	// To set TLS config, use the following options:
	//
	// tls_v1 - Bool enable TLS negotiation
	// tls_root_ca_file - String path to file containing root CA
	// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
	// tls_cert - String path to file containing public key for certificate
	// tls_key - String path to file containing private key for certificate
	// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
	//
	TlsV1     bool        `opt:"tls_v1"`
	TlsConfig *tls.Config `opt:"tls_config"`

	// Compression Settings
	Deflate      bool `opt:"deflate"`
	DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6"`
	Snappy       bool `opt:"snappy"`

	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

	// secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
}

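As this shows, Go struct tags already define each parameter's key (the value after opt) and its default value. Leaving aside what each option means (there are too many), let us first look at how they are set.

The original documentation explains: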
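To make the struct tag mechanism concrete, here is a minimal sketch that reads the opt and default tags with the reflect package. demoConfig is a hypothetical, cut-down stand-in for the real Config struct, and optDefaults is a helper invented for this example; go-nsq's own handling of these tags is more involved and is not reproduced here.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// demoConfig is a hypothetical stand-in for nsq.Config with three fields
// copied from the definition above.
type demoConfig struct {
	ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	MaxInFlight int           `opt:"max_in_flight" min:"0" default:"1"`
	AuthSecret  string        `opt:"auth_secret"`
}

// optDefaults maps each opt key to the text of its default tag.
// Fields without a default tag map to the empty string.
func optDefaults(v interface{}) map[string]string {
	out := make(map[string]string)
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if key := f.Tag.Get("opt"); key != "" {
			out[key] = f.Tag.Get("default")
		}
	}
	return out
}

func main() {
	for key, def := range optDefaults(demoConfig{}) {
		fmt.Printf("%s default=%q\n", key, def)
	}
}
```

The tag values are plain strings, so a library that uses them still has to parse "60s" into a time.Duration and check it against min and max itself.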

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).

Use Set(option string, value interface{}) as an alternate way to set parameters.

The code for ConfigFlag.Set, which handles the command-line form:

// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
	parts := strings.SplitN(opt, ",", 2)
	key := parts[0]

	switch len(parts) {
	case 1:
		// default options specified without a value to boolean true
		err = c.Config.Set(key, true)
	case 2:
		err = c.Config.Set(key, parts[1])
	}
	return
}

So the command line must use the form "key,value", with "," as the separator.

For example, a start command that sets the read timeout:

./nsq_to_file --lookupd-http-address=127.0.0.1:4161 --consumer-opt="read_timeout,100s"
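The split step can be tried in isolation. The sketch below copies only the strings.SplitN logic from ConfigFlag.Set into a hypothetical helper named splitOpt; it does not call go-nsq and does not validate the key or value the way Config.Set would.

```go
package main

import (
	"fmt"
	"strings"
)

// splitOpt mirrors the split in ConfigFlag.Set: the text before the first
// comma is the key, the rest (if any) is the value. A bare key with no
// comma is treated as boolean true.
func splitOpt(opt string) (key string, value interface{}) {
	parts := strings.SplitN(opt, ",", 2)
	if len(parts) == 1 {
		return parts[0], true
	}
	return parts[0], parts[1]
}

func main() {
	for _, opt := range []string{"read_timeout,100s", "snappy", "user_agent,a,b"} {
		k, v := splitOpt(opt)
		fmt.Printf("%q -> key=%q value=%v\n", opt, k, v)
	}
}
```

Since SplitN is called with a limit of 2, only the first comma separates key from value, so a value may itself contain commas.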

Continuing through main

channel

	if *showVersion {	// print version information and exit
		fmt.Printf("nsq_to_file v%s\n", version.Binary)
		return
	}

	if *channel == "" {	// a channel name is mandatory
		log.Fatal("--channel is required")
	}

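showVersion needs no comment; the one that matters is the channel option.

From the earlier preparation we know that NSQ message delivery uses a distributed topology with no single point of failure, in which topics and channels are the key building blocks. Every message published to a topic is copied to each of that topic's channels, while among multiple consumers on the same channel only one receives any given message.

A test:

Start one more consumer terminal. Unlike the earlier ones, this terminal subscribes to the channel "test-channel":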

nsq_to_file --lookupd-http-address=127.0.0.1:4161 --channel="test-channel" --output-dir=tmp

Publish a message to nsqd's HTTP port:

curl -d "hello world test5" "http://127.0.0.1:4151/put?topic=test"

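The output files contain two lines of hello world test5. This is because both the nsq_to_file channel (the default channel) and test-channel receive the message. Although three consumers are waiting on the nsq_to_file channel, only one of them gets the message and the others keep waiting. The consumer on test-channel also writes the message it receives to a file under /tmp, so the message is recorded twice.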
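The fan-out behaviour observed in this test can be modelled with a toy simulation. Everything in the sketch below (delivery, channelSim, publish, the consumer names c1 to c4) is hypothetical, and the round-robin choice of consumer is an assumption made only to keep the model deterministic; it is not a description of how nsqd actually picks a consumer.

```go
package main

import "fmt"

// delivery records that one consumer on one channel received a message.
type delivery struct {
	Channel, Consumer, Msg string
}

// channelSim is a toy channel: a name, its consumers, and a cursor.
type channelSim struct {
	name      string
	consumers []string
	next      int // index of the consumer that gets the next message
}

// publish copies msg to every channel. Each channel hands its copy to
// exactly one consumer (round-robin here, purely as an assumption).
func publish(channels []*channelSim, msg string) []delivery {
	var out []delivery
	for _, ch := range channels {
		if len(ch.consumers) == 0 {
			continue
		}
		c := ch.consumers[ch.next%len(ch.consumers)]
		ch.next++
		out = append(out, delivery{ch.name, c, msg})
	}
	return out
}

func main() {
	channels := []*channelSim{
		{name: "nsq_to_file", consumers: []string{"c1", "c2", "c3"}},
		{name: "test-channel", consumers: []string{"c4"}},
	}
	for _, d := range publish(channels, "hello world test5") {
		fmt.Printf("%s -> %s: %s\n", d.Channel, d.Consumer, d.Msg)
	}
}
```

One published message yields two deliveries, one per channel, which matches the two lines seen in the output files.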

With the role of channels understood, and before reading on to the code that uses channel, finish the remaining option checks of the same kind in main:

	var topicsFromNSQLookupd bool	// whether the topic list comes from nsqlookupd

	if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
		log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
	}
	if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 {
		log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
	}
	// exactly one of --nsqd-tcp-address and --lookupd-http-address must be given: not both, and not neither

	if *gzipLevel < 1 || *gzipLevel > 9 {
		log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
	}

	// TODO: remove, deprecated
	if hasArg("gzip-compression") { 
		log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
		switch *gzipCompression {
		case 1:
			*gzipLevel = gzip.BestSpeed
		case 2:
			*gzipLevel = gzip.BestCompression
		case 3:
			*gzipLevel = gzip.DefaultCompression
		default:
			log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
		}
	}	// map the deprecated --gzip-compression value onto gzipLevel

	cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
	cfg.MaxInFlight = *maxInFlight	// max number of messages allowed in flight
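The deprecated --gzip-compression handling above is a plain value mapping, which can be sketched as a standalone function. legacyGzipLevel is a hypothetical name for this example; the constants are the real ones from the standard compress/gzip package, and the error text is copied from the log.Fatalf call above.

```go
package main

import (
	"compress/gzip"
	"fmt"
)

// legacyGzipLevel converts a deprecated --gzip-compression value (1, 2, 3)
// into the compress/gzip level that the switch in main assigns.
func legacyGzipLevel(v int) (int, error) {
	switch v {
	case 1:
		return gzip.BestSpeed, nil
	case 2:
		return gzip.BestCompression, nil
	case 3:
		return gzip.DefaultCompression, nil
	default:
		return 0, fmt.Errorf("invalid --gzip-compression value (%d), should be 1,2,3", v)
	}
}

func main() {
	for v := 0; v <= 3; v++ {
		level, err := legacyGzipLevel(v)
		fmt.Println(v, level, err)
	}
}
```

Note that gzip.DefaultCompression is -1, which lies outside the 1-9 range enforced for --gzip-level. In the code shown, that range check runs before the legacy mapping, so the mapped value is not rejected by it.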