Deploying a Kafka Cluster on Kubernetes

Deploying Kafka with docker-compose

  • Image references

    Kafka official site

    Kafka image

    ZooKeeper image

  • Kafka 4.0 will remove ZooKeeper entirely and support only KRaft

    We therefore use KRaft mode, which is also the default mode of kafka:3.4.

  • Because this is a non-root container, mounted files and directories must be owned with the appropriate permissions for UID 1001

    sudo chown -R 1001:1001 ./kafka_data

  • Create the Kafka container: docker compose -f docker-compose.yml up -d
version: "3"
services:
  kafka:
    image: bitnami/kafka:3.4.1
    ports:
      - "9092:9092"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  • For a clustered docker-compose setup, see the linked reference

Deploying a Kafka Cluster on Kubernetes

  1. The YAML example uses KRaft mode
  2. Create a headless Service kafka-headless for communication between the Kafka Pods
  3. Create a Service kafka for external access to Kafka
  4. The startup script is stored in a ConfigMap; it merely prepends an assignment of the environment variable KAFKA_CFG_NODE_ID to the default startup script:
    1. Every replica in the cluster needs KAFKA_CFG_NODE_ID set, and it must be an integer
    2. In the StatefulSet, the replica name metadata.name is assigned to the environment variable MY_POD_NAME (see the referenced docs). (You could also use the HOSTNAME environment variable directly, since the replica name is the default hostname.)
    3. The trailing ordinal of MY_POD_NAME is extracted and assigned to KAFKA_CFG_NODE_ID; for example, MY_POD_NAME=kafka-0 yields KAFKA_CFG_NODE_ID=0
    4. KAFKA_CFG_CONTROLLER_QUORUM_VOTERS could also be generated automatically in this script (see the reference); the YAML example simply hard-codes all 3 voters.
  5. Non-root containers need a securityContext
  6. volumeClaimTemplates dynamically provisions storage so that each replica mounts its own volume. The example uses Huawei Cloud's existing storageClassName: csi-disk; if no StorageClass is declared, create one in advance per the documentation.
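The ordinal extraction described in step 4 relies on Bash's `${var##pattern}` expansion, which strips the longest prefix matching the pattern. A minimal standalone sketch (the pod name is hard-coded here for illustration; in the cluster it comes from metadata.name):

```shell
#!/bin/bash
# Simulate the value the StatefulSet injects from metadata.name
MY_POD_NAME=kafka-2

# Strip the longest prefix matching "*-", leaving only the trailing ordinal
export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-}

echo "$KAFKA_CFG_NODE_ID"   # prints: 2
```

Because `##` matches the longest prefix, this also works for names that themselves contain hyphens, e.g. my-kafka-10 yields 10.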
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  labels:
    app: kafka
spec:
  type: ClusterIP
  clusterIP: None
  ports:
  - name: kafka-client
    port: 9092
    targetPort: kafka-client
  - name: controller
    port: 9093
    targetPort: controller
  selector:
    app: kafka
---
# Service for external access to Kafka
apiVersion: v1
kind: Service
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  type: ClusterIP
  ports:
  - name: kafka-client
    port: 9092
    targetPort: kafka-client
  selector:
    app: kafka
---
# Each Pod in the StatefulSet derives its ordinal as KAFKA_CFG_NODE_ID (must be an integer), then runs the default startup script
apiVersion: v1
kind: ConfigMap
metadata:
  name: ldc-kafka-scripts
data:
  setup.sh: |-
    #!/bin/bash
    export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-}
    exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  podManagementPolicy: Parallel
  replicas: 3 # creates 3 Kafka replicas
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity: # workload anti-affinity
          preferredDuringSchedulingIgnoredDuringExecution: # satisfy if possible
          - weight: 1
            podAffinityTerm:
              labelSelector: # anti-affinity against the workload's own label
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - kafka
              topologyKey: "kubernetes.io/hostname" # applies at the node level
      containers:
      - name: kafka
        image: bitnami/kafka:3.4.1
        imagePullPolicy: "IfNotPresent"
        command:
        - /opt/leaderchain/setup.sh
        env:
        - name: BITNAMI_DEBUG
          value: "true" # true enables verbose logging
        # KRaft settings
        - name: MY_POD_NAME # used to derive KAFKA_CFG_NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_CFG_PROCESS_ROLES
          value: "controller,broker"
        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka-headless:9093,1@kafka-1.kafka-headless:9093,2@kafka-2.kafka-headless:9093"
        - name: KAFKA_KRAFT_CLUSTER_ID
          value: "Jc7hwCMorEyPprSI1Iw4sW"
        # Listeners
        - name: KAFKA_CFG_LISTENERS
          value: "PLAINTEXT://:9092,CONTROLLER://:9093"
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: "PLAINTEXT://:9092"
        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: "CONTROLLER"
        - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
          value: "PLAINTEXT"
        ports:
        - containerPort: 9092
          name: kafka-client
        - containerPort: 9093
          name: controller
          protocol: TCP
        volumeMounts:
        - mountPath: /bitnami/kafka
          name: data
        - mountPath: /opt/leaderchain/setup.sh
          name: scripts
          subPath: setup.sh
          readOnly: true
      securityContext:
        fsGroup: 1001
        runAsUser: 1001
      volumes:
      - configMap:
          defaultMode: 493
          name: ldc-kafka-scripts
        name: scripts
  volumeClaimTemplates:
  - apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data
      annotations:
        everest.io/disk-volume-type: SAS
      labels:
        failure-domain.beta.kubernetes.io/region: cn-south-1
        failure-domain.beta.kubernetes.io/zone: cn-south-2b
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: csi-disk
      resources:
        requests:
          storage: 10Gi

Topics

  1. Show help (inside the container, Kafka's scripts live in /opt/bitnami/kafka/bin)

    sh kafka-topics.sh --help

  2. List all topics

    sh kafka-topics.sh --bootstrap-server localhost:9092 --list

  3. Create a Topic

    --partitions (number of partitions)

    --topic (topic name)

    --replication-factor (number of replicas; cannot exceed the number of brokers)

    sh kafka-topics.sh --create --topic myTopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092

  4. Describe a Topic in detail

    sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic

  5. Delete a Topic (all message data in the Topic is permanently deleted and cannot be recovered)

    sh kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic myTopic

  6. Increase the number of partitions for a topic (partitions can only be increased; to reduce them you must delete the Topic and recreate it)

    sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --partitions 3

  7. Change the data retention period (Kafka keeps data for 7 days by default; retention.ms=-1 means never expire)

    sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --config retention.ms=259200000
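As a quick sanity check on the value used above (plain shell arithmetic, not a Kafka command): retention.ms is expressed in milliseconds, so 259200000 ms works out to exactly 3 days:

```shell
# 1000 ms/s * 60 s/min * 60 min/h * 24 h/day = 86400000 ms per day
echo $((259200000 / 86400000))   # prints: 3
```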

  8. Change several configs at once (pass --config once per key)

    sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --config retention.ms=259200000 --config max.message.bytes=128000

  9. Change the replication factor of a Topic
    1. Write the assignment template get-reassign-tpl.json

      echo '{"topics":[{"topic":"myTopic"}],"version":1}' > get-reassign-tpl.json

    2. Run the assignment planner to generate a candidate plan in JSON

      sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file get-reassign-tpl.json --broker-list "0,1,2" --generate

    3. Copy the JSON from the previous step's output, edit the replicas field to list the desired broker.ids, and save the result as reassign.json

      echo '{"version":1,"partitions":[{"topic":"myTopic","partition":0,"replicas":[1,2]}]}' > reassign.json

    4. Execute the reassignment using the reassign.json generated above

      sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute

    5. Check the reassignment progress

      sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --verify

    6. Once the reassignment completes, describe the Topic again

      sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic

Testing: Producing and Consuming Messages

  1. Open a Producer window and produce a few messages

    sh kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
    >hello
    >world

  2. Open a Consumer window:

    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --consumer-property group.id=myGroup --from-beginning

    --from-beginning: if the consumer has no committed offset yet, consume from the first message in the Topic

    --consumer-property group.id=myGroup: the consumer's group.id; if unset, one is generated automatically

    If several consumer windows share the same group.id, only one of them will receive any given message

  3. List all consumer groups across all topics

    sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

  4. Describe a consumer group (to check for message backlog)

    sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup

    LOG-END-OFFSET: the offset of the next message to be appended to the log

    CURRENT-OFFSET: the group's current consumed (committed) offset

    LAG: the backlog, i.e. the difference between the messages retained on the broker and those already consumed; also called consumer lag
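The three columns are related by simple arithmetic; a sketch with made-up offsets for a single partition:

```shell
# Hypothetical values for one partition of the topic
LOG_END_OFFSET=1000   # next offset that will be written to the log
CURRENT_OFFSET=880    # offset the group has consumed up to

# LAG = LOG-END-OFFSET - CURRENT-OFFSET
echo $((LOG_END_OFFSET - CURRENT_OFFSET))   # prints: 120
```

A steadily growing LAG means the consumer group is not keeping up with producers.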

  5. For more operations, see the official documentation

Open-Source Web UI for Apache Kafka

Kafka in KRaft mode has no ZooKeeper, so the GUI client Offset Explorer cannot connect. A good open-source Web UI exists instead; see its official documentation for the Docker and Helm installation methods.

Installing provectuslabs/kafka-ui with kubectl apply:

  1. kubectl apply -f k8s.kafka-ui.yaml
  2. Example k8s.kafka-ui.yaml:
apiVersion: v1
kind: Service
metadata:
  name: kafka-ui
  labels:
    app: kafka-ui
spec:
  type: NodePort
  ports:
  - name: web
    port: 8080
    targetPort: web
    nodePort: 0
  selector:
    app: kafka-ui
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-ui
  labels:
    app: kafka-ui
spec:
  selector:
    matchLabels:
      app: kafka-ui
  replicas: 1
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      containers:
      - name: kafka-ui
        image: provectuslabs/kafka-ui:latest
        # imagePullPolicy: "IfNotPresent"
        env:
        - name: KAFKA_CLUSTERS_0_NAME
          value: "kafka-c0"
        - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
          value: "kafka-0.kafka-headless:9092"              
        - name: DYNAMIC_CONFIG_ENABLED
          value: "true"
        - name: AUTH_TYPE # https://docs.kafka-ui.provectus.io/configuration/authentication/basic-authentication
          value: "LOGIN_FORM"
        - name: SPRING_SECURITY_USER_NAME
          value: "name_admin"    
        - name: SPRING_SECURITY_USER_PASSWORD
          value: "password_123456"                                   
        ports:
        - name: web
          containerPort: 8080
