linux 消息发布工具-kafka
下载地址:https://mirrors.cnnic.cn/apache/kafka
一、系统构成
在整个消息发布流程中,kafka作为一个中间件对系统的运行提供了解耦、削峰、异步处理的能力。
由生产者发起信息同步条件,中间件作为信息转储的角色对news进行发布。消费者会通过offset来保证接收最新的消息
主要由生产者、消费者、中间件构成。中间件由
|
producer |
生产者 |
|
|
broker |
kafka实现实例 |
其可实现主要功能部分 |
|
topic |
每个kafka实例内部的消息类型 |
每个实例内部可存在多个topic |
|
partition |
消息类型分组,每组的数据不同。 |
每个topic内存在多个partition |
|
message |
每条发送的消息主体 |
|
|
consumer |
消费者 |
|
|
consumer_group |
消费者组 |
组形式的消费者,一个可以获取多个信息 |
|
zookeeper |
保存集群内部信息,保证系统可用性 |
配置文件,系统启动 |
二、kafka应用环境部署
2.1、环境搭建
- 下载应用包

- 下载后的文件放在linux系统的

- 创建一个日志文件,用来保存日志信息

- 修改config文件,添加日志路径,侦听端口信息

对下列信息改动
设置broker的数量
提供生产者和消费者的服务地址
zk服务器地址
- 启动zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties &
- 启动kafka服务
./bin/kafka-server-start.sh config/server.properties &
- 创建一个topic
./bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic1
–zookeeper: kafka连接zookeeper的url,和server.properties文件中的配置项 zookeeper.connect=localhost:2181 一致
- 查看topic
./bin/kafka-topics.sh –list –zookeeper localhost:2181
- 查看topic的详细信息
./bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic topic1

- 生产消息
/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic topic1
这里的 –broker-list localhost:9092
- 消费消息
./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic1 –from-beginning
hello,world
this is my kafka

2.2、编写生产者-消费者示例代码
源码编译gcc语句
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
生产者流程
-
创建conf配置对象
rd_kafka_conf_new
配置broke集群
rd_kafka_conf_set:localhost:9092
配置消息发送回调函数
rd_kafka_conf_set_dr_msg_cb
创建一个producer实例
rd_kafka_new+使用上述conf
创建一个topic
rd_kafka_topic_new
发送消息
rd_kafka_produce
消息发送成功回调
rd_kafka_poll
释放
|
等待队列处理完毕 |
rd_kafka_flush |
|
释放topic配置 |
rd_kafka_topic_destroy |
|
释放product配置 |
rd_kafka_destroy |
消费者流程
初始化
|
创建kafka配置 |
rd_kafka_conf_new |
|
|
设置信号处理 |
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size); |
conf:配置结构 vame:配置项名称 value:配置项值 errstr:错误提示 errstr_size:错误提示长度 返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中 |
|
创建topic配置 |
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) |
参数:无 返回值:rd_kafka_topic_conf_t * 创建一个主题配置结构,并进行默认初始化设置,返回其引用指针。 |
|
创建kafka实例 |
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf, char *errstr, size_t errstr_size) |
参数: Type:RD_KAFKA_PRODUCER是创建生产者类型,RD_KAFKA_CONSUMER是创建消费者类型 Conf:配置结构 Errstr:错误提示 errstr_size:错误提示长度 返回值: 成功:返回rd_kafka_t *kafka操作句柄 失败:返回NULL,并记录错误信息到errstr 程序中先配置conf和topic_conf,然后调用此接口生成操作句柄。对消费者来讲,订阅主题,轮询接收消息。对生产者来讲,根据主题生成主题操作句柄,并通过主题操作句柄发送消息。 |
|
添加服务器 |
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) |
参数: Rk:kafka操作句柄 Brokerlist:broker字符串 如:”172.20.51.38:9092” 不写端口,则采用默认端口9092 多个broker brokerlist = “broker1:10000,broker2” 返回值:成功添加的broker个数 添加一个broker也可以通过 设置rd_kafka_conf_t结构中的 “bootstrap.servers” 配置项 rd_kafka_conf_set(conf, “bootstrap.servers”, brokers, errstr, sizeof(errstr)) |
|
消息重定向 |
rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) |
参数: Rk:kafka操作句柄 返回值:rd_kafka_resp_err_t 枚举 将消息重定向到了消费者队列,可以使用rd_kafka_consumer_poll()进行取消息。 |
|
创建一个Topic+Partition的存储空间 |
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) |
rd_kafka_topic_partition_list_new()创建,创建时指定长度,通过rd_kafka_topic_partition_list_add()添加 主题-分区对,用于订阅消息。 |
|
开启consumer订阅,匹配的topic将被添加到订阅列表中 |
rd_kafka_topic_partition_list_add rd_kafka_subscribe |
|
|
读取队列消息 |
rd_kafka_consumer_poll |
参数一:kafka示例句柄 参数二:等待时间 |
|
获取topic name |
rd_kafka_topic_name |
参数一:topic handle 句柄 |
释放
|
待队列消息处理完 |
rd_kafka_consumer_close |
kafka句柄 |
|
释放队列资源 |
rd_kafka_topic_partition_list_destroy |
释放队列,类型为rd_kafka_topic_partition_list_t |
|
释放kafka实例 |
rd_kafka_destroy |
kafka句柄 |
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/9e3d63c3e9.html
