kafka常用命令大全
•
大数据
目录
启动kafka服务
停止kafka服务
创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)
高版本
列出指定主题(topic)的详细信息
查看所有的主题
高版本
查看所有主题的详细信息
删除一个主题
高版本
向kafka指定topic写入数据
高版本
命令行消费某个topic消息
查看某个topic对应的消息数量
kafka重置分组已经消费的偏移量offest
topic增加分区
指定topic创建消费者分组
查看消费组组所属topic的消费情况
显示所有消费者
获取正在消费的topic的group的offset
重设 consumer group的offset
指定offset与partition导出消息
修改topic的参数
测试生产者性能脚本
测试消费者性能脚本
启动kafka服务
bin/kafka-server-start.sh config/server.properties &
停止kafka服务
./kafka-server-stop.sh
创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)
./kafka-topics.sh --zookeeper(host:port) --create --topic demo-topic --replication-factor 3 --partitions 2 --config retention.ms=259200000
高版本
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test --partitions 3 --config retention.ms=259200000
列出指定主题(topic)的详细信息
./kafka-topics.sh --zookeeper(host:port) --describe --topic demo-topic
查看所有的主题
./kafka-topics.sh --list --zookeeper(host:port) kafka-host(host:port)
高版本
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
查看所有主题的详细信息
./kafka-topics.sh --zookeeper(host:port) --describe
删除一个主题
./kafka-topics.sh --zookeeper(host:port) --topic demo-topic --delete
高版本
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --delete
向kafka指定topic写入数据
./kafka-console-producer.sh --broker-list kafka-host(host:port)--topic demo-topic
高版本
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
持续写入数据
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test < data1.txt
命令行消费某个topic消息
#加了--from-beginning 从头消费所有消息 ./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic --from-beginning #不加--from-beginning 从最新的一条消息开始消费 ./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
查看某个topic对应的消息数量
# time为-1时表示最大值,time为-2时表示最小值 --partitions num 指定分区 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host(host:port) --topic demo-topic --time -1 实例: ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic app-aaaa
kafka重置分组已经消费的偏移量offest
./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port) --execute --reset-offsets --topic=demo-topic --group=testPlatform --to-earliest
topic增加分区
第一条好用
/opt/idss/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic ids-sens --partitions 4
./kafka-topics.sh --alter --zookeeper(host:port) --topic demo-topic --partitions 12
指定topic创建消费者分组
./kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port) --topic demo-topic --consumer-property group.id=testPlatform
查看消费组组所属topic的消费情况
./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port) --group=demo-group --describ
显示所有消费者
./kafka-consumer-groups.sh --bootstrap-serverkafka-host(host:port) --list
获取正在消费的topic的group的offset
./kafka-consumer-groups.sh --describe --group demo-group --bootstrap-serverkafka-host(host:port)
重设 consumer group的offset
确定topic作用域: --all-topics 为consumer group下所有topic的所有分区调整位移 --topic t1 --topic t2 为指定的若干个topic的所有分区调整位移 --topic t1:0,1,2 为指定的topic分区调整位移 确定位移重设策略 --to-current 把位移调整到分区当前位移. --to-datetime 把位移调整到大于给定时间的最早位移处.datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss' 2020-07-01T12:00:00.000 --to-earliest 把位移调整到分区当前最小位移 --to-latest 把位移调整到分区当前最新位移 --to-offset 把位移调整到指定位移处 --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动 --by-duration :把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S --from-file :从CSV文件中读取调整策略 例:(--dry-run 不运行只查看结果,类似k8s里面;--execute执行) 按时间点重置(--to-datetime) ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-datetime 2022-01-18T12:00:00.000 按offset重置 (--to-offset ) 重置消费组下指定topic ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-offset 359905139 重置消费组下面所以topic ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --all-topics --group testPlatform --reset-offsets --to-latest --execute
指定offset与partition导出消息
./kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port) --topic --topic=demo-topic --offset 825000 --partition 0 >> messages.log
修改topic的参数
kafka-configs.sh --zookeeper(host:port) --entity-type topics --entity-name demo-topic --alter --add-config max.message.bytes=1048576
测试生产者性能脚本
./kafka-producer-perf-test.sh --topic demo-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host(host:port) acks=-1 linger.ms=2000 compression.type=lz4
测试消费者性能脚本
./kafka-consumer-perf-test.sh --broker-list kafka-host(host:port) --messages 10000000 --topic demo-topic
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/7218e867b9.html
