消息部件的设计 - kennethjiang/Wolke GitHub Wiki

定义

  • 消息部件(Message-Driven Component):完全由消息驱动的部件.一般是以daemon运行的进程.在启动时和消息队列服务器建立连接,监听一个或多个消息队列.当从队列里收到消息时,根据消息内容进行相应处理.处理结果以消息的形式送进(另外一个)消息队列.
  • 消息的结构(scheme):消息发送方和接受方为了能够互相沟通,消息内容(Payload)应该遵循一定的结构.这个结构可以是松散定义,如json,或是明确而清晰地定义,如xsd.

设计目标

  1. Traceability。消息总线最大的不利在于追踪和查找问题的难度。因此我们的设计要能够使如下的工作变得容易:
    • 察看每一条消息的结构(scheme)
    • 查找某一条消息被哪一个或几个部件接受了;
    • 在一个工作流中,消息是如何在不同的部件间流动。比如,当用户递交了建立一个MySQL服务器的请求,哪些消息被依次触发,这些消息流过哪些部件,处理结果如何;
    • 针对特定字串的查询。例如,查找包含用户ID 1234567的所有消息;
    • 察看每一条消息的消息体。
  2. 向上兼容性。这在项目初期不是问题。但是产品上线后,当后继版本上线时,消息结构可能发生变化。如何保证和前一个版本的兼容?如果不兼容,如何保证新版本上线时不会引起服务中断。

设计原则

针对以上设计目标,我们必须遵从的设计原则是:

  1. 在系统范围内定义每个消息的结构
  2. 每个消息部件都必须在固定的位置(很可能是某个配置文件)定义以下信息:
    • 该部件接收的消息
    • 该部件发送的消息
    • 这些消息的结构。因为消息的结构是在整个系统内共享,这里可以只存放引用。
  3. 定义这些信息的位置必须在所有部件间保持一致。
  4. 必须建立一个共享模块来完成以下功能:
    • 连接消息服务器,侦听接收消息。
    • 发送消息至消息服务器
    • 序列化(Serialize)内存对象
    • 反序列化(Deserialize)消息
    • logging

设计备选1

  1. 消息采用xml格式, 用xsd定义scheme
  2. 选用一个能够把内存对象序列华/反序列化的开源工具.该工具必须具备根据xsd进行数据格式校验的功能.

设计备选2

  1. 消息采用json格式
  2. 缺点是无法对数据格式做校验.

设计备选3

  1. 用RabbitMQ实现消息队列服务

设计备选4

视需要而定,在消息队列机制上模拟同步调用.例如,监视部件可能会决定发送"请报告虚拟机13423的状态"消息,然后监听回应消息.如果有一个底层模块能够把这个过程包装成一个同步方法调用,将会极大的简化上层应用.

这个设计最大的问题在于它不符合分布式系统的原则.无共享原则决定了发送方不能确定消息将会被一个还是多个部件接收,或者根本未被接收.提供模拟同步调用的机制可能会让开发人员较少的考虑如何处理错误情况.

并且,使用模拟同步调用的部件是很难满足可重入原则的.

RabbitMQ的RPC在一定程度上实现了这个功能.

设计备选5

  1. 我在寻找一个工具能集中的控制消息在系统中的流转.这个工具要具有以下功能:
    • 以清晰的控制文件定义消息流.例如,在收到消息A,如果消息A表示成功,发送消息B,否则发送消息C
    • 能作强大的消息数据转换.例如,把接收到的消息A里/Result/Instnaces/Instance[1]/IP放到将要发出的消息B里的/Params/Node/IP
    • 有简单的工作流定义,如if-else,并发,跳转,子流程等等.
    • 提供监视和查错功能.
    • 轻量级
  2. ruote似乎是一个合适的工作流引擎.最大的问题似乎是不具有消息数据转换功能.
  3. 其它比较流行的工作流/BPEL引擎包括jBPM,Enhydra Shark, Apache ODE, YAWL.

设计备选6

消息部件的自我宣传(self-advocating)和心跳.当消息部件启动时,可以发送消息宣传自己的诞生.其它(主要是监视部件)接收此消息并更新部件列表和状态.消息部件在运行期间还会不停发送心跳,证明自己健康.当监视部件收不到心跳时,就判定该部件死亡,并启动应急流程.

消息总线示意图

message bus