Creating the Kafka Publish/Subscribe Example
Using Kafka from Go involves two key packages:
- sarama
- sarama-cluster
Q: Why use sarama-cluster when sarama is already available?
A: This example relies on the Kafka consumer group feature. sarama itself does not support consumer groups, while sarama-cluster, as an extension of sarama, adds that support.
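For reference, the two packages are typically imported as follows (the import paths are an assumption based on the commonly used Shopify/sarama and bsm/sarama-cluster repositories; the consumer code below uses the `cluster` alias):

import (
    "github.com/Shopify/sarama"             // core Kafka client: producers, low-level consumers
    cluster "github.com/bsm/sarama-cluster" // consumer-group support built on top of sarama
)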
This project uses a docker compose setup with one publisher (produce) node and two consumer (consume) nodes belonging to the same consumer group:
- produce Dockerfile: src/produce/dockerfile
- consume Dockerfile: src/consume/dockerfile
- docker-compose file: src/docker-compose.yml
Publishing
Only the key publishing code snippets are shown here:
// action creates the Kafka producer.
func action(c *cli.Context) error {
    ...
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V2_1_0_0

    log.Info().Msg("start make topic")
    err := createTopic(config, brokerUrls[0], topic)
    if err != nil {
        log.Error().Msgf("%v", err)
        return err
    }

    log.Info().Msg("start make producer")
    // create an asynchronous producer
    producer, err := sarama.NewAsyncProducer(brokerUrls, config)
    if err != nil {
        log.Error().Msgf("%v", err)
        return err
    }
    defer producer.AsyncClose()

    errChan := make(chan error, 1)
    go func(p sarama.AsyncProducer) {
        errChan <- httpServer(p, topic, listenAddr)
    }(producer)
    ...
}
// createTopic creates the topic on the given broker.
func createTopic(config *sarama.Config, brokerURL, topicName string) error {
    broker := sarama.NewBroker(brokerURL)
    if err := broker.Open(config); err != nil {
        log.Error().Msgf("broker open fail, %v", err)
        return err
    }
    defer broker.Close()

    yes, err := broker.Connected()
    if err != nil {
        log.Error().Msgf("broker connect fail, %v", err)
        return err
    }
    log.Debug().Msgf("broker connect status: %v", yes)

    topicDetail := &sarama.TopicDetail{
        NumPartitions:     2,
        ReplicationFactor: 1,
        ConfigEntries:     make(map[string]*string),
    }
    topicDetails := make(map[string]*sarama.TopicDetail)
    topicDetails[topicName] = topicDetail

    request := sarama.CreateTopicsRequest{
        Timeout:      time.Second * 15,
        TopicDetails: topicDetails,
    }
    response, err := broker.CreateTopics(&request)
    if err != nil {
        log.Error().Msgf("create topics fail, %v", err)
        return err
    }

    log.Debug().Msgf("response length: %d", len(response.TopicErrors))
    for key, val := range response.TopicErrors {
        log.Debug().Msgf("Key is %s", key)
        log.Debug().Msgf("Val is %#v", val.Err.Error())
        log.Debug().Msgf("ValMsg is %#v", val.ErrMsg)
    }
    log.Info().Msgf("create topics response: %v", response)
    return nil
}
// httpServer exposes an HTTP endpoint that publishes incoming data to Kafka.
func httpServer(producer sarama.AsyncProducer, topicName, listenAddr string) error {
    gin.SetMode(gin.ReleaseMode)
    router := gin.New()
    router.POST("/api/v1/data", func(ctx *gin.Context) {
        parent := context.Background()
        defer parent.Done()
        ...
        // send message to kafka
        msg := &sarama.ProducerMessage{
            Topic:     topicName,
            Key:       sarama.StringEncoder(MakeSha1(form.Text)),
            Value:     sarama.ByteEncoder(formInBytes),
            Timestamp: time.Now(),
        }
        producer.Input() <- msg
        ...
    })
    return router.Run(listenAddr)
}
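Since the producer config enables both Producer.Return.Successes and Producer.Return.Errors, the async producer's Successes() and Errors() channels have to be drained somewhere, otherwise publishing will eventually block; the elided parts of the program presumably do this. A minimal sketch of such a drain loop (not the project's exact code):

go func(p sarama.AsyncProducer) {
    for {
        select {
        case msg, ok := <-p.Successes():
            if !ok {
                return
            }
            log.Debug().Msgf("delivered to %s[%d]@%d", msg.Topic, msg.Partition, msg.Offset)
        case err, ok := <-p.Errors():
            if !ok {
                return
            }
            log.Error().Msgf("delivery failed: %v", err)
        }
    }
}(producer)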
Note that the NumPartitions value used when creating the topic should be at least the number of consumers in the consumer group: within a group, each partition is assigned to exactly one consumer, so any consumers beyond the partition count would sit idle.
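Once the produce node is running (listening on 0.0.0.0:9000 per the compose file below), a message can be published by POSTing to the endpoint above. A hypothetical Go client sketch; the form encoding and the payload field name "text" are assumptions, since the handler's request binding is elided:

package main

import (
    "fmt"
    "net/http"
    "net/url"
)

func main() {
    // Hypothetical client; assumes the handler binds a form field named "text".
    resp, err := http.PostForm("http://localhost:9000/api/v1/data",
        url.Values{"text": {"hello kafka"}})
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}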
Subscribing
The subscription code snippet is as follows:
// clusterConsumer creates a consumer-group member and subscribes to messages.
func clusterConsumer(wg *sync.WaitGroup, brokers, topics []string, groupID string) {
    defer wg.Done()
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Version = sarama.V2_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    // initialize the consumer
    consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
    if err != nil {
        log.Debug().Msgf("%s: cluster.NewConsumer err, message=%s \n", groupID, err)
        return
    }
    defer consumer.Close()

    // trap interrupt/terminate signals so the loop can exit cleanly
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Debug().Msgf("%s:Error: %s\n", groupID, err.Error())
        }
    }()

    // consume rebalance notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Debug().Msgf("%s:Rebalanced: %v \n", groupID, ntf)
        }
    }()

    // consume messages and watch for signals
    var successes int
Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                ...
                consumer.MarkOffset(msg, "") // mark the message as processed
                successes++
            }
        case <-signals:
            break Loop
        }
    }
    log.Debug().Msgf("%s consume %d messages", groupID, successes)
}
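The consume node's main function is not shown in this wiki; a hypothetical caller, wired to the same environment variables that docker-compose sets below (KAFKA_BROKERS, KAFKA_TOPIC, KAFKA_CONSUMER_GROUP_ID), might look like this:

package main

import (
    "os"
    "strings"
    "sync"
)

// Hypothetical entry point: reads the broker list, topic, and group ID from
// the environment and runs a single group consumer until interrupted.
func main() {
    brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
    topics := []string{os.Getenv("KAFKA_TOPIC")}
    groupID := os.Getenv("KAFKA_CONSUMER_GROUP_ID")

    var wg sync.WaitGroup
    wg.Add(1)
    go clusterConsumer(&wg, brokers, topics, groupID)
    wg.Wait()
}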
Building the Docker images
Only the image build configuration for the produce node is shown here; the consume node's configuration is similar:
############################
# STEP 1: build the executable
############################
# pin the Go version
ARG GO_VERSION=1.11.1
# build stage
FROM golang:${GO_VERSION}-alpine3.7 AS builder
# use the Aliyun apk mirror (China)
RUN echo "http://mirrors.aliyun.com/alpine/v3.7/main/" > /etc/apk/repositories
# ca-certificates is required to call HTTPS endpoints.
# tzdata is required for time zone info.
RUN apk update && apk upgrade && apk add --no-cache ca-certificates tzdata && update-ca-certificates
# create the user appuser
RUN adduser -D -g '' appuser
# copy the source code and set the working directory
RUN mkdir -p /src/app
COPY . /src/app
WORKDIR /src/app
# environment variables for go build:
# * CGO_ENABLED=0 builds a statically linked executable
# * GOOS=linux GOARCH=amd64 targets 64-bit Linux
# * GOFLAGS=-mod=vendor forces `go build` to use the `/vendor` directory
ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GOFLAGS=-mod=vendor
# build the executable
RUN go build -a -installsuffix cgo -ldflags="-w -s" -o /src/app/produce
############################
# STEP 2: build the final image
############################
# start from the minimal scratch base image
FROM scratch AS final
# set the system locale
ENV LANG en_US.UTF-8
# copy time zone info from the builder
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
# copy CA certificates from the builder
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# copy user and group files from the builder
COPY --from=builder /etc/passwd /etc/passwd
# copy the built executable into the new image
COPY --from=builder /src/app/produce /produce
# declare the exposed port
EXPOSE 9000
# run the executable
ENTRYPOINT [ "/produce" ]
Docker Compose orchestration
One publishing node and two subscribing nodes are defined:
version: '3'
services:
  produce:
    build: ./produce
    container_name: produce
    ports:
      - "9000:9000"
    environment:
      LISTEN_ADDRESS: '0.0.0.0:9000'
      KAFKA_BROKERS: 'kfk1:19092,kfk2:29092,kfk3:39092'
      KAFKA_TOPIC: 'foo'
  consume1:
    build: ./consume
    container_name: consume1
    environment:
      KAFKA_BROKERS: 'kfk1:19092,kfk2:29092,kfk3:39092'
      KAFKA_CONSUMER_GROUP_ID: 'consumer-group'
      KAFKA_TOPIC: 'foo'
  consume2:
    build: ./consume
    container_name: consume2
    environment:
      KAFKA_BROKERS: 'kfk1:19092,kfk2:29092,kfk3:39092'
      KAFKA_CONSUMER_GROUP_ID: 'consumer-group'
      KAFKA_TOPIC: 'foo'
networks:
  default:
    external:
      name: kafka_default
This docker compose file reuses kafka_default, the default network created earlier when the Kafka cluster itself was brought up; this is what allows the publish and subscribe programs to communicate with the Kafka cluster.
Further reading: