三、服务框架中间件(拦截器)的实现 - FeifeiyuM/go-microservices-boilerplate GitHub Wiki

三、微服务框架中间件(拦截器)的实现

web 框架的中间件(middleware)大家都非常熟悉,例如 python 的 Django 框架, Node 的 KOA, Golang 的 gin, echo 都有中间件的概念,他们的实现方式可能各有差异,但是其核心原理都是通过闭包函数和者高阶构成的函数链,在 KOA 形象得将其称为洋葱模型,echo 将其称为 "function chained"。

一、中间件实现原理

中间件函数本质上就是一个函数的套娃结构,中间件函数一般都是闭包函数,其接受 handler 作为参数,返回值也是 handler 函数。 例如 代码1 所示

type handlerFunc func(param interface{}) error

// 中间件函数 1
func middlewareFunc1(handler handlerFunc) handlerFunc {
    return func(param interface{}) error {
        // do something
        err := handler(param)
        // do something
        return err
    }
} 
// 中间件函数 2
func middlewareFunc2(handler handlerFunc) handlerFunc {
    return func(param interface{}) error {
         // do something
        err := handler(param)
        // do something
        return err
    }
}
// 中间件函数 3
func middlewareFunc3(handler handlerFunc) handlerFunc {
    return func(param interface{}) error {
         // do something
        err := handler(param)
        // do something
        return err
    }
}

code1

我们通过观察 code1 中的三个 middlewareFunc1, middlewareFunc2, middlewareFunc3 观察所示,是否发现中间件函数外层是一个高阶函数,其接受 handler 函数作为参数,有返回了一个新的 handler 函数。中间件内层就是实现了返回了新的 handler 函数,其本身又是一个闭包函数。 通过分析器特定,我们可以发现 上述三个函数是否可以串联起来相互调用。例如 code2 所示。

package main

import "fmt"
// 定义 handler 函数格式
type handlerFunc func(param interface{}) error
// 中间件函数1
func middlewareFunc1(handler handlerFunc) handlerFunc {
	return func(param interface{}) error {
		fmt.Println("middlewareFunc1 start...")
        // 执行handler
		err := handler(param)
		fmt.Println("middlewareFunc1 end...")
		return err
	}
}
// 中间件函数2
func middlewareFunc2(handler handlerFunc) handlerFunc {
	return func(param interface{}) error {
		fmt.Println("middlewareFunc2 start...")
        // 执行handler
		err := handler(param)
		fmt.Println("middlewareFunc2 end...")
		return err
	}
}
// 中间件函数3
func middlewareFunc3(handler handlerFunc) handlerFunc {
	return func(param interface{}) error {
		fmt.Println("middlewareFunc3 start...")
        // 执行handler
		err := handler(param)
		fmt.Println("middlewareFunc3 end...")
		return err
	}
}
// 真正的 handler 函数
func handler(param interface{}) error {
	fmt.Printf("this is the true handler: param %v \n", param)
	return nil
}

func main() {
	// 将真正的 handler 函数用中间件进行封装,以实现相关功能
	newHandler := middlewareFunc1(middlewareFunc2(middlewareFunc3(handler)))
	// 调用包装后的 handler 函数
	_ = newHandler("hello")
}

code2 分析下 code2 中的代码,在 main 函数中有一段比较特殊(丑陋)的代码,就是 middlewareFunc 函数们相互嵌套,middlewareFunc1 接收 middlewareFunc2 的返回值作为,参数,middlewareFunc2 又接收 middlewareFunc3 的返回值作为参数,middlewareFunc3 接收真正的 handler 函数作为参数。 当然,如果我们有更多的 middlewareFuncs 还能继续嵌套,是不是又俄罗斯套娃的既视感。

code2 中输出的结果又是怎么样的呢? 如 result1 所示, 先输出的是最外层的 middlewareFunc1 中的 start, 最后输出的也是 middlewareFunc1 的 end。往里面一层就是 middlewareFunc2 的 start 和 end。 最中间的输出是真正 handler 函数的。这样是不是又有剥洋葱的感觉,一层一层往里面剥,最中心的才是我们真正的核心逻辑。

middlewareFunc1 start...
middlewareFunc2 start...
middlewareFunc3 start...
this is the true handler: param hello 
middlewareFunc3 end...
middlewareFunc2 end...
middlewareFunc1 end...

result1
但是这样的写法相对丑陋,如果有更多的中间件函数时,这样的嵌套将会很深,参考 koa 或 echo 的实现样式,我们可以使用 use 方法来加载中间件,这个将会在 nsq 中间件开发一节中介绍。

中间件原理到这里我们已经基本描述完成,我们可以对比 echo 或者其他的 web 框架,其大致的原理是否如此。

二、echo 中间件的实现

echo 的中间件 是比较丰富的,例如有Basic Auth, CORS, Logger 等。这些是都开箱即用的。

当然,echo 也允许我们自定义实现 middleware, 其官方文档coookbook 中也有介绍, 其 handler 函数为 echo.HandlerFunc, 定义为 func(c echo.Context) error,同我们上面的code1, code2 中定义的类似。参数 echo.Context 就是 echo 对 http 请求对象上下文的封装。

接下来我们简单介绍实现个获取 request_id 的中间件函数, 如 code3

func EchoRequestID(next echo.HandlerFunc) echo.HandlerFunc {
	return func(c echo.Context) error {
		r := c.Request()
        // 先尝试从 request id 中获取
		rID := r.Header.Get("X-Request-Id")
		if rID == "" {
            // 如果没有就通过新建一个 uuid 来作为 request_id
			uid, _ := uuid.NewUUID()
			rID = uid.String()
		}
        // 将 request_id 塞入 上下文 的 context 对象
        // 为什么加入到 context 对象中,在介绍日志的一节中会解释
		r = r.WithContext(context.WithValue(r.Context(), "_requestID", rID))
		c.SetRequest(r)
        // 执行 handler 
		return next(c)
	}
}

func hello(c echo.Context) error {
	return c.String(http.StatusOK, "Hello, World!")
}

func main() {
	e := echo.New()
	// 注册中间件
	e.Use(middleware.Logger())
	// Routes
	e.GET("/", hello)
	// Start server
	e.Logger.Fatal(e.Start(":1323"))
}

code3

三、gRPC 实现中间件

Golang 的gRPC 的中间件服务的中间件,我们可以采用成熟的框架,go-grpc-middleware,其将中间件函数称为 拦截器(Interceptors), 采用这种叫法的web框架也不少,例如 JAVA 的 spring boot。这样称呼中间件函数也挺合理的,中间件函数的执行,就相当于对一个请求的进入和返回流程的拦截,以处理一些公共事务。

code3 一样,我们来实现下提取 grpc 请求的 request_id 的中间件,如 code4 所示。

// grpc server
type server struct {
	pb.UnimplementedAccountServer
}
// 账号注册
func (s *rpcHandler) Register(ctx context.Context, req *pb.RegisterReq) (*pb.RegisterResp, error) {
	if req == nil {
		return nil, nil
	}
	fmt.Printf("name: %s, gender: %d, address: %s", req.Name, req.Gender, req.Address)
	return &pb.RegisterResp{
		Msg:                  "ok",
	}, nil
}
// grpc 中间件实现
func UnaryRequestIDInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			md = metadata.Pairs()
		}
		var requestID string
		if val, ok := md["x-request-id"]; ok {
			requestID = val[0]
		} else {
			uid, _ := uuid.NewUUID()
			requestID = uid.String()
		}
		ctx = context.WithValue(ctx, "_requestID", requestID)
		return handler(ctx, req)
	}
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// 初始化服务的时候,将中间件函数 串联起来
	s := grpc.NewServer(
		grpc.UnaryInterceptor(
			grpc_middleware.ChainUnaryServer(
				UnaryRequestIDInterceptor(),
			),
		),
	)
	pb.RegisterMemberServer(s, &server{})
	// 启动服务
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

code4

分析 code4, 本例中选用的 handler 定义同《RPC(gRPC)服务和消息中间件(NSQ)的集成》中的一样,我们在此就不作讨论。 在 go-grpc-middleware 框架下,grpc 中间件函数的定义为 func() grpc.UnaryServerInterceptor, 这个定义同我们上面介绍的 “一、中间件实现原理”, “二、echo 中间件的实现” 中介绍的中间函数不太一致,没有入参。不需要传入 grpc.UnaryServerInterceptor 作为参数。

其主要门道,我们可以再中间件注册函数 grpc_middleware.ChainUnaryServer() 中发现,例如 code5 所示。

// will see context changes of one and two.
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
	n := len(interceptors)

	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// 转换层标准的中间件函数结构
		chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler {
			return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
				return currentInter(currentCtx, currentReq, info, currentHandler)
			}
		}
		// 将中间件函数串连起来
		chainedHandler := handler
		for i := n - 1; i >= 0; i-- {
			chainedHandler = chainer(interceptors[i], chainedHandler)
		}

		return chainedHandler(ctx, req)
	}
}

code5

code5 中的 chainer 函数将 grpc.UnaryServerInterceptor 转换成了我们之前讨论的中间件结构,grpc.UnaryHandler 即是函数的入参格式,也是函数返回格式,因此 grpc 中间件定义函数的本质同 echo 中的定义是一样的,只是 grpc_middleware 中多做了一层封装而已,ChainUnaryServer 函数同 echo.Use 函数实现功能有一定类似。

四、NSQ 实现中间件

Golang NSQ 客户端框架中是不支持中间件,如果要实现类似 echo 和 go-grpc-middleware 类似中间件的支持。我们可以在《RPC(gRPC)服务和消息中间件(NSQ)的集成》中的 “2.1 NSQ 对象封装” 一节中定义的 NsqConsumer 对象(结构体)的基础上继续封装,例如 code6

// 定义 新的 NsqConsumer 对象
type NsqConsumer struct {
	// nsq lookupds 地址
	lookupds            []string  
	// nsq lookupd 轮询间隔
	lookupdPollInterval time.Duration  
	// 一次最多接收多少条消息
	maxInFlight         int
	// 注册的 consumers 对象
	consumers []*nsq.Consumer  
}
// 消息 handler 函数定义
// 参考了 grpc, echo 的 handler 函数。引入两个参数一个是 context 上下文,一个是 nsq 的消息体
type MqHandlerFunc func(ctx context.Context, msg *nsq.Message) error
// 初始化 Nsq consumer 对象
func NewNsqConsumer(lookupds []string, pollInterval time.Duration, maxInFlight int) *NsqConsumer {
	return &NsqConsumer{
		lookupds: lookupds,
		lookupdPollInterval: pollInterval,
		maxInFlight: maxInFlight,
	}
}

code6

4.1 NSQ 中间件封装实现

参考“一、中间件实现原理”中介绍的实现思路,按中间件函数的入参和返回都是 handler 函数的原则,我们可以给出中间件函数的定义:func(handler MqHandlerFunc) MqHandlerFunc。为了传递上下文信息,我们引入一个 context.Context 对象, context.Context 对象在 Golang 中是作为一个上下文对象,可以携带传递请求的上下文信息比如:过期时间,request_id 等。其跟随着请求执行的流程在各个函数中传递,我们可以发现 Golang 中很多函数的第一个参数类型都是 context.Context。 所以最终的 中间件函数定义为 func(ctx context.Context, handler MqHandlerFunc) MqHandlerFunc。

在 “一、中间件实现原理” 提到,通过函数嵌套的方式将中间件函数串联起来比较丑陋,扩展性也不强,如果有是个中间需要串联难道就写十遍。想比较优雅的实现中间件的串联,其实在 code5 中已经有例子了,通过一个数组将所有中间件函数存储起来,然后通过遍历的方式将函数串联起来。将中间件填入数组,我们可以参考 echo 中的实现, 定义一个 Use 方法来实现。因此,我们可以将《RPC(gRPC)服务和消息中间件(NSQ)的集成》中的 “2.1 NSQ 对象封” 中的 NsqConsumer 重新完整实现如下 code7

// MqHandlerFunc 定义消息处理函数格式
type MqHandlerFunc func(ctx context.Context, msg *nsq.Message) error
// 中间函数格式
type middleware func(handler MqHandlerFunc) MqHandlerFunc
// NsqConsumer nsq consumer
type NsqConsumer struct {
	lookupds            []string
	lookupdPollInterval time.Duration
	maxInFlight         int
	consumers           []*nsq.Consumer
	middlewares         []middleware // 定义数据存储中间件函数
}
// 初始化 Nsq consumer 对象
func NewNsqConsumer(lookupds []string, pollInterval time.Duration, maxInFlight int) *NsqConsumer {
	return &NsqConsumer{
		lookupds:            lookupds,
		lookupdPollInterval: pollInterval,
		maxInFlight:         maxInFlight,
	}
}
// 将中间存入数组
func (n *NsqConsumer) Use(mids ...middleware) {
	for _, mid := range mids {
		n.middlewares = append(n.middlewares, mid)
	}
}
// 将中间件串联起来,
func (n *NsqConsumer) wrap(h MqHandlerFunc) MqHandlerFunc {
	// 这边为什么采用数组倒序?
	// 一般情况下,我们期望最先加入中间件数组的函数时最先被调用的(位于洋葱模型的最外层)
	// wrap 函数中传入 handler 函数是最原始的 handler 函数,没有被中间件函数包裹的
	// 按照 code2 中的  newHandler = middlewareFunc1(middlewareFunc2(middlewareFunc3(handler))) 来看
	// 接受最原始的 handler 函数的中间件函数肯定是最后加入中间数组的入参,依次类推,因此中间件数组迭代的时候是倒序的
	for i := len(n.middlewares) - 1; i >= 0; i-- {
		mid := n.middlewares[i]
		h = mid(h)
	}
	return h
}
// 将 我们定义的 MqHandlerFunc 转换成 nsq.Consumer 内接受的 nsq.HandlerFunc
func (n *NsqConsumer) toNsqHandler(handlerFunc MqHandlerFunc) nsq.HandlerFunc {
	return func(msg *nsq.Message) error {
		ctx := context.TODO()
		return handlerFunc(ctx, msg)
	}
}
// 注册 topic 和 handler Func, 类似 echo 的路由配置
func (n *NsqConsumer) RegisterHandler(topic, channel string, handler MqHandlerFunc) error {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = n.lookupdPollInterval
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		return err
	}
	c.ChangeMaxInFlight(n.maxInFlight)
	c.AddHandler(n.toNsqHandler(handler))
	n.consumers = append(n.consumers, c)
	return nil
}
// 模拟 grpc 和 echo 的开启服务的逻辑
// 因为 consumer 启动的时候本身就采用了 goroutine, 所以在 nsq Start 的时候就不用像 echo 和 grpc 一样用 goroutine
func (n *NsqConsumer) Start() error {
	for _, h := range n.consumers {
		if err := h.ConnectToNSQLookupds(n.lookupds); err != nil {
			return err
		}
	}
	return nil
}
// gracefully close consumer
func (n *NsqConsumer) Close() {
	for _, h := range n.consumers {
		h.Stop()
	}
}

code7

4.2 NSQ 中间件代码实现

按照 二,三节中的惯例,我们给 nsq consumer 实现一个提取 request_id 的中间件。每个 nsq 消息都有一个消息id,我们可以把这个 id 作为当前消息消费过程的 request_id。完整代码如 code8 所示。

// 中间实现
func MqRequestId(handler MqHandlerFunc) MqHandlerFunc {
   return func(ctx context.Context, msg *nsq.Message) (err error) {
   	ctx = context.WithValue(ctx, "_requestID", fmt.Sprintf("%s", msg.ID))
   	return handler(ctx, msg)
   }
}
// handler 定义
type mqHandler struct {}
func NewMqHandler() *mqHandler {
   return &mqHandler{}
}
// handler 函数
func (m *mqHandler) recvHello(_ context.Context, msg *nsq.Message) error {
   type param struct {
   	Name string `json:"name"`
   }
   req := &param{}
   if err := json.Unmarshal(msg.Body, req); err != nil {
   	return err
   }
   fmt.Printf("%s send a hello", req.Name)
   return nil
}
func(m *mqHandler) Register(c *NsqConsumer) error {
   // 注册 将 topic 和 handler 行数绑定起来
   if err := c.RegisterHandler("test-recv_hello", "test", m.recvHello); err != nil {
   	return err
   }
   return nil
}

func main() {
   consumer := NewNsqConsumer([]string{"127.0.0.1:4161"}, 10*time.Second, 2)
   // 加入中间件函数
   consumer.Use(MqRequestId)
   // 初始化 handler
   mqHandler := NewMqHandler()
   mqHandler.Register(consumer)
   // 开启consumer
   consumer.Start()
   
   // 防止进程退出
   c := make(chan interface{}, 1)
   <- c
}

code8

五、常用中间件讨论

在服务中,常见的中间件函数有:

  • 1、请求标准日志(比如打印请求和返回结果的body) 输出函数;
  • 2、提取或生成请求 request_id 的函数;
  • 3、Golang 中还有其特有的进程崩溃的异常捕获 recovery 函数;
  • 4、请求跨域设置的函数;
  • 5、如果引入链路追踪的还有链路追踪初始化的中间件函数;
  • 6、请求 token 解析函数;

其实在服务中,如果每个请求都须处理的公共逻辑,都可以封装成中间件函数来处理。例如,在本框架中,echo, grpc, nsq 的 handler 都有 error 对象的返回。因此,我们可以封装一个错误处理的中间件函数,以处理 error 对象中携带的逻辑,比如是否发送邮件等。

对于 6、中 token 解析是否映入这个中间件函数持保留意见,并不是每个接口都需要token, 良好的封装的情况下,token 解析放在 handler 函数中,这个开发成本应该是在可接受的范围,而不是不管是否需要所有接口都去解析 token,毕竟解密相对来说是个比较消耗资源的操作,没必要一股脑每个接口都做这个事情。

六、代码示例

基于前章示例代码的基础上,我们将中间件集成入示例工程。我们引入中的中间件有:错误处理,标准日志输出,recovery, request_id, 链路追踪(tracing), gRPC 的参数校验。 整合后工程地址