29 在kubernetes集群中快速搭建 kafka - xiaoxin01/Blog GitHub Wiki

本文介绍如何在 k8s 集群中搭建 kafka

通过 Helm 可以快速在 k8s 集群搭建 kafka

环境准备

开始之前,先移步到 kafka 的helm仓库看一下说明:

https://github.com/helm/charts/tree/master/incubator/kafka

也可以看下可以设定的参数

https://github.com/helm/charts/blob/master/incubator/kafka/values.yaml

对于需要调整的参数,建立 custom-values.yaml:

kafkaHeapOptions: "-Xmx256M -Xms256M"

创建 pv

默认会建立 3 个 broker,这里 pv 创建3个, pv.yaml:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  nfs:
    path: /data/kafka
    server: nfs.server1
  persistentVolumeReclaimPolicy: Retain

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka2
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  nfs:
    path: /data/kafka
    server: nfs.server2
  persistentVolumeReclaimPolicy: Retain


---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka3
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  nfs:
    path: /data/kafka
    server: nfs.server3
  persistentVolumeReclaimPolicy: Retain

通过 Helm 搭建

下面的命令将 kafka 搭建在 kafka 的 namespace 下:

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
kubectl create ns kafka
helm install --name my-kafka --namespace -f custom-values.yaml kafka incubator/kafka

创建一个 pod 来测试 kafka : pod-testclient.yaml

apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: kafka
spec:
  containers:
  - name: kafka
    image: solsson/kafka:0.11.0.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"

完成之后,可以通过下面的命令来测试:

    # 列出所有的 topic
    kubectl -n kafka exec testclient -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --list
    # 创建一个新的 topic
    kubectl -n kafka exec testclient -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
    # 监听消息
    kubectl -n kafka exec -ti testclient -- /usr/bin/kafka-console-consumer --bootstrap-server my-kafka:9092 --topic test1 --from-beginning
    # 发送消息
    kubectl -n kafka exec -ti testclient -- /usr/bin/kafka-console-producer --broker-list my-kafka-0.cluster.local:19092 --topic test1

在集群的其他 namespace 访问,可以通过 Service 的 DNS 来访问:

my-kafka.kafka.svc.cluster.local:9092

如何从 Ingress 访问 kafka?

目前通过命令创建的 ingress 不支持 tcp 服务,所以无法直接创建 ingress 来暴露 kafka。

但是 Ingress Controller 有监听 configmap,通过如下的命令可以查看:

kubectl -n ingress-nginx describe daemonset.apps/nginx-ingress-controller
Name:           nginx-ingress-controller
Selector:       app=ingress-nginx
Pod Template:
  Containers:
   nginx-ingress-controller:
    Image:       rancher/nginx-ingress-controller:0.16.2-rancher1
    Ports:       80/TCP, 443/TCP
    Host Ports:  80/TCP, 443/TCP
    Args:
      /nginx-ingress-controller
      --default-backend-service=$(POD_NAMESPACE)/default-http-backend
      --configmap=$(POD_NAMESPACE)/nginx-configuration
      --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
      --udp-services-configmap=$(POD_NAMESPACE)/udp-services
      --annotations-prefix=nginx.ingress.kubernetes.io

可以看到监听的 configmap 名称格式为:$(POD_NAMESPACE)/tcp-services,所以想要暴露 kafka ,则可以创建 ConfigMap 如下:

apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx
data:
  9092: "kafka/my-kafka:9092"

完成之后 Ingress Controller 会自动更新所有节点上的 Ingress,然后就可以用 Ingress 来访问 kafka 服务了:

kubectl  exec -ti testclient -- /usr/bin/kafka-console-consumer --bootstrap-server [nodeip]:9092 --topic test1 --from-beginning

遇到的一些问题

有尝试过按照 README.md 页面描述的方法启用集群外部访问 kafka,遇到不少问题。

pod 无法成功创建

通过下面的命令看到错误信息:

kubectl -n kafka logs my-kafka-0 -c init-ext

User "system:serviceaccount:kafka:default" cannot get pods in the namespace

需要添加权限 role.yaml:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  namespace: kafka
  name: add-label-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "watch", "list", "patch"]

---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: add-label
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: default
  namespace: kafka
roleRef:
  kind: Role
  name: add-label-role
  apiGroup: rbac.authorization.k8s.io

参考

⚠️ **GitHub.com Fallback** ⚠️