二、Rpc(grpc)服务和消息中间件(nsq)的集成 - FeifeiyuM/go-microservices-boilerplate GitHub Wiki

RPC(gRPC)服务和消息中间件(NSQ)的集成

随着服务的发展,会越来越复杂,除了 rest(http) 服务 之外, 越来越多的服务会同时接入 rpc 服务和消息中间件。本文将基于 gRPC 和 nsq 分别介绍如何接入 rpc 服务和 nsq 服务。

一、GRPC 服务简单接入

Golang 版本的 gRPC Quick Start 可以参见官方教程。本文就不介绍 proto 文件语法 Protocol Buffer,Protocol Buffer 编译器的安装和 Protocol Buffer 文件的编译。

在这里我们实现在 《基于 echo 搭建的简单 http 服务和工程组织结构》 一文中我们实现的用户注册功能。

1.1 proto 文件编辑 代码1

syntax = "proto3";

option go_package = ";pb";

package pb;

// 注册请求参数对象
message RegisterReq {
    // 姓名
    string name = 1;
    // 性别
    int32 gender = 2;
    // 地址
    string address = 3;
}

// 请求结果
message RegisterResp {
    string msg = 1;
}

// 服务方法定义
service Account {
    // rpc 注册方法定义
    rpc Register (RegisterReq) returns (RegisterResp) {}
}

代码1 生成通过命令 protoc --go_out=./pb --go_opt=paths=source_relative --go-grpc_out=./pb --go-grpc_opt=paths=source_relative ./account.proto 生成 pb 文件,并将其放置在当前目录的 pb 路径下。

1.2 简单 gRPC 服务代码实现

如下: 代码2


// 读取配置文件
type Config struct {
	RpcAddr string
}
func InitConfig() *Config {
	config := &Config{
		RpcAddr: ":8512",
	}
	return config
}

// 启动服务
type server struct {
	cfg *Config
	grpc *grpc.Server
}
func NewServer(cfg *Config) *server {
	return &server{
		cfg: cfg,
	}
}

func (s *server) Close() {
	if s.grpc != nil {
		s.grpc.GracefulStop()
	}
}

// grpc handler
type rpcHandler struct {
	pb.UnimplementedAccountServer
}
func NewRpcHandler() *rpcHandler {
	return &rpcHandler{}
}
// 账号注册
func (s *rpcHandler) Register(ctx context.Context, req *pb.RegisterReq) (*pb.RegisterResp, error) {
	if req == nil {
		return nil, nil
	}
	fmt.Printf("name: %s, gender: %d, address: %s", req.Name, req.Gender, req.Address)
	return &pb.RegisterResp{
		Msg:                  "ok",
	}, nil
}

func (s *server) InitGRpc() {
	s.grpc = grpc.NewServer()
	pb.RegisterAccountServer(s.grpc, NewRpcHandler())
}

func (s *server) Run() {
	// grpc 服务启动
	s.InitGRpc()
	go func() {
		lis, err := net.Listen("tcp", s.cfg.RpcAddr)
		if err != nil {
			panic(err)
		}
		if err = s.grpc.Serve(lis); err != nil {
			print(err)
		}
	}()
}

func StartServer() {
	// 初始配置文件
	cfg := InitConfig()
	// 启动服务
	srv := NewServer(cfg)
	srv.Run()

	// 开启系统信号接收通道
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	s := <- c

	switch s {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		srv.Close()
	case syscall.SIGHUP:
	default:
	}
}

func main() {
	StartServer()
}

代码2

我们通过对比《基于 echo 搭建的简单 http 服务和工程组织结构》 一文中的 ”实现一个最小的 http 服务” 代码实现发现,简单 gRPC 服务 实现和 简单的 http 服务 实现的代码组织结构可以非常相似,因此,我们可以非常快速地套用 《基于 echo 搭建的简单 http 服务和工程组织结构》中介绍的工程框架的组织,并将 gPRC api 和 rest api 之间的差异收敛于 interface(controller) 层,而 server, dao, model 层都可以相互复用。

二、 NSQ 消息服务(消费者)接入

NSQ是一个基于 Golang 语言的分布式实时消息平台,本文中将其与基于 Golang 的微服务框架集成作为例子。 关于 NSQ 架构的介绍可以详见NSQ官方介绍

Golang 的 NSQ 客户端我们将采用其官方的 package go-nsq

模仿代码3的结构,我们将 NSQ 服务也整理成 rpc 或者 rest 接口一样模式。消息的 topic 就相当于 rest 接口的路由,如下代码4

从 nsq-go 官方教程的介绍来说,nsq consumer 并不是像 echo 或者 grpc 一样, 一个 echo.Echo 或者 grpc.Server 对象可以注册多个 handler 函数。一个 nsq.Consumer 对象只能接受一个 handler 函数。因此我们为了实现 echo 或者 grpc 一样的代码组织逻辑,我们需要做一定封装。

2.1 NSQ 对象封装

我们可以将定义一个 NSQ 封装对象,对象中可以包含多个 nsq.Consumer 对象,并在该对象的基础上封装 topic,handler 的注册逻辑,以实现类似 echo 一样的路由配置, 如下: 代码3

// 定义 新的 NsqConsumer 对象
type NsqConsumer struct {
	// nsq lookupds 地址
	lookupds            []string  
	// nsq consumer config 
	lookupdPollInterval time.Duration 
	// nsq consumer config
	maxInFlight         int 
	// 注册的 consumers 对象
	consumers []*nsq.Consumer  
}
// 消息 handler 函数定义
// 参考了 grpc, echo 的 handler 函数。引入两个参数一个是 context 上下文,一个是 nsq 的消息体
type MqHandlerFunc func(ctx context.Context, msg *nsq.Message) error
// 初始化 Nsq consumer 对象
func NewNsqConsumer(lookupds []string, pollInterval time.Duration, maxInFlight int) *NsqConsumer {
	return &NsqConsumer{
		lookupds: lookupds,
		lookupdPollInterval: pollInterval,
		maxInFlight: maxInFlight,
	}
}
// 将 我们定义的 MqHandlerFunc 转换成 nsq.Consumer 内接受的 nsq.HandlerFunc
func (n *NsqConsumer) toNsqHandler(handlerFunc MqHandlerFunc) nsq.HandlerFunc {
	return func(msg *nsq.Message) error {
		ctx := context.TODO()
		return handlerFunc(ctx, msg)
	}
}
// 注册 topic 和 handler Func, 类似 echo 的路由配置
func (n *NsqConsumer) RegisterHandler(topic, channel string, handler MqHandlerFunc) error {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = n.lookupdPollInterval
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		return err
	}
	c.ChangeMaxInFlight(n.maxInFlight)
	c.AddHandler(n.toNsqHandler(handler))
	n.consumers = append(n.consumers, c)
	return nil
}
// 模拟 grpc 和 echo 的开启服务的逻辑
// 因为 consumer 启动的时候本身就才采用了 goroutine, 所以在 nsq Start 的时候就不用像 echo 和 grpc 一样用 goroutine
func (n *NsqConsumer) Start() error {
	for _, h := range n.consumers {
		if err := h.ConnectToNSQLookupds(n.lookupds); err != nil {
			return err
		}
	}
	return nil
}
// gracefully close consumer
func (n *NsqConsumer) Close() {
	for _, h := range n.consumers {
		h.Stop()
	}
}

代码3

2.2 nsq 服务 code 简单实现

有了上一节 NsqConsumer 对象的铺垫,我们接下来实现类似 简单 gRPC 服务 CODE 实现 这样的组织结构就相对清晰了, 简单实现如下 code4 所示

// 读取配置文件
type Config struct {
	HttpAddr string
	RpcAddr string
	NsqLookupd []string
}
func InitConfig() *Config {
	config := &Config{
		NsqLookupd: []string{"10.0.244.37:4161"},
	}
	return config
}
// 定义 mq handler 对象
type mqHandler struct {}

func NewMqHandler() *mqHandler {
	return &mqHandler{}
}
// 定义 recv hello 方法
func (m *mqHandler) recvHello(_ context.Context, msg *nsq.Message) error {
	if msg == nil {
		return errors.New("msg is null")
	}
	type param struct {
		Name string `json:"name"`
	}
	req := &param{}
	if err := json.Unmarshal(msg.Body, req); err != nil {
		return err
	}
	fmt.Printf("%s send a hello", req.Name)
	return nil
}

// 注册 topic 对应的 consumer
func (m *mqHandler) Register(c *NsqConsumer) {
	if err := c.RegisterHandler("test-recv_hello", "test", m.recvHello); err != nil {
		panic(err)
	}
}

// 初始化 代码3 中定义的 NsqConsumer
func (s *server) InitNsq() {
	s.nsq = NewNsqConsumer([]string(s.cfg.NsqLookupd), 10*time.Second, 2)
	mqHandler := NewMqHandler()
	mqHandler.Register(s.nsq)
}

// 启动服务
type server struct {
	cfg *Config
	nsq *NsqConsumer
}
func NewServer(cfg *Config) *server {
	return &server{
		cfg: cfg,
	}
}

func (s *server) Run() {
	// 启动 nsq consumber
	s.InitNsq()
	if err := s.nsq.Start(); err != nil {
		panic(err)
	}
}
// 关闭服务
func (s *server) Close() {
	if s.nsq != nil {
		s.Close()
	}
}

func StartServer() {
	// 初始配置文件
	cfg := InitConfig()
	// 启动服务
	srv := NewServer(cfg)
	srv.Run()

	// 开启系统信号接收通道
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	s := <- c

	switch s {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		srv.Close()
	case syscall.SIGHUP:
	default:
	}
}

func main() {
	StartServer()
}

代码4

三、示例代码:

按 《基于 echo 搭建的简单 http 服务和工程组织结构》 定义的工程框架,将 mq 和 grpc 集成进示例工程。 整合后工程地址