通用异步化方案 kafka - MrLining/project GitHub Wiki

##逻辑视图

##基础逻辑视图

#如何使用 ##Step1.消息类型配置

###配置文件:Framework\Libs\Util\Constant\Mq;

class Mq extends \Framework\Config\MqRouter{
    private static $message_map = array(
        'partner_join_success' => array(    //消息标识类型
            'topic'         => self::MQ_TOPIC_PARTNER_CORE,     //消息所属主题
            'consumers'     => array(   //消费者程序
                'Mlservice\\Package\\Consumer\\Partner_join_success\\SyncPartnerExtInfo',
            )
        ),
        'goods_status' => array(
            'topic'     => self::MQ_TOPIC_GOODS_STATUS,
            'consumers' => array(
                'Mlservice\\Package\\Consumer\\Goods_status\\SyncGoodsStatus',
            ),
        ),
    );

    /**
     * 根据消息类型,获取消息配置信息
     * @param $message_type
     * @return null
     */
    public static function getMessageInfo($message_type) {
        return isset(self::$message_map[$message_type]) ? self::$message_map[$message_type] : null;
    }
}

###相关名词解释

  1. 消息类型(MessageKey),标识一类消息
  2. 消息主题(Topic),每一种消息主题下会包含多种消息类型,每种消息主题可初步按照
  3. 消费者(Consumer),消费者进行消息订阅,当消息产生时负责处理某种消息类型下的消息,采用发送 & 订阅模式,顾一种消息消息类型,可以对多种消费者

###消费主题list

标识常量 value 描述
Mq::MQ_TOPIC_PARTNER_CORE partner_core_topic 商家核心业务
Mq::MQ_TOPIC_PARTNER_OTHER partner_other_topic 商家非核心业务
Mq::MQ_TOPIC_GOODS_CORE goods_core_topic 商品核心业务
Mq::MQ_TOPIC_GOODS_OTHER goods_other_topic 商品非核心业务
Mq::MQ_TOPIC_GOODS_INVENTORY goods_inventory_topic 商品库存业务
Mq::MQ_TOPIC_GOODS_STATUS goods_status_topic 商品状态业务
Mq::MQ_TOPIC_GOODS_CACHE goods_cache_topic 商品缓存业务

##Step2. 发送消息

###示例:

class Partner_join extends \Framework\Libs\Http\SubmitModule {
    public function main() {
        //arg[1] : 消息类型
        //arg[2] : 发送数据
        //arg[3] : 分区标识,optional
        Framework\Libs\Mq\ProducerProxy::sendMsg('partner_join_success', array(
            'a' => 1,
            'b' => 2,
        ), 12345);
    }
}

##Step3. 订阅消息的消费程序开发 (Consumer 开发)

#示例:


class SyncPartnerExtInfo extends \Framework\Libs\Mq\ConsumerBase{
    const _DATABASE_ = 'brd_partner';   //操作db
    
    /**
     * 主方法,执行逻辑
     * @param $msg_key 消息类型标识
     * @param $data 消息数据
     * @return boolean|array  返回 true or false | array(boolean, string)
     */
    public function main($msg_key, $data) {
        //返回 boolean,标识正确or错误
        return true;
        
        //返回 array
        return array(true, '执行xxx失败');
    }
}
  1. DB类型Consumer,继承\Framework\Libs\Mq\ConsumerBaseDb;其他类型Consumer,继承\Framework\Libs\Mq\ConsumerBase
  2. 文件目录:
    1. GOODS_TOPIC主题:goods:package/consumer/xxx
    2. PARTNER_TOPIC主题:partner:pacakge/consumer/xxx