微服务集成Windows版kafka
微服务集成Windows版kafka
文章目录
- 微服务集成Windows版kafka
-
- 1-兼容
- 2-雷点
- 3-安装
- 4-配置
- 5-启动
- 6-实现
1-兼容
Kafka 和 Spring Boot
兼容版本:https://spring.io/projects/spring-kafka/

2-雷点
依赖版本需要匹配Spring Boot版本,这里使用的 3.1.5 版本
org.apache.kafka
kafka_2.13
3.6.0
mvnrepository:https://mvnrepository.com/
一个 Maven 仓库的在线查找工具,用于查找和浏览 Java 开发中使用的依赖库(dependencies)的信息
3-安装
-
Zookeeper
Apache ZooKeeper 项目的存档目录:https://archive.apache.org/dist/zookeeper/
在这个目录下,可以找到 Apache ZooKeeper 发布的历史版本以及与这些版本相关的二进制文件、源代码和其他相关文档。
Kafka 依赖于 Zookeeper,所以首先需要启动 Zookeeper 服务器,这里使用的 apache-zookeeper-3.5.5-bin 版本
-
kafka
Apache Kafka 官方网站下载:https://kafka.apache.org/downloads
这里使用的 kafka_2.12-3.5.1 版本
4-配置
-
环境配置(可选操作)
可以将 Kafka 的 bin 目录添加到系统的 PATH 环境变量中,方便可以在任何地方运行 Kafka 相关的命令。
-
apache-zookeeper-3.5.5-bin\conf\zoo.cfg
# ZooKeeper 基本时间单元,用于计算时间的基本单位(毫秒) tickTime=2000 # 存储 ZooKeeper 数据的目录 dataDir=D:/myApp/zookeeper/apache-zookeeper-3.5.5-bin/data # 用于接受客户端连接的端口号 clientPort=2181 # ZooKeeper AdminServer 的端口号(默认端口8080) admin.serverPort=8081
5-启动
-
Zookeeper:bin目录
zkServer.cmd
-
Kafka:kafka_2.12-3.5.1目录
.\bin\windows\kafka-server-start.bat .\config\server.properties
6-实现
- 生产者
package com.xueyi.sample.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置Kafka生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer producer = new KafkaProducer(properties);
// 发送消息
ProducerRecord record = new ProducerRecord("your_topic", "key", "Hello , Kafka!");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent successfully! Topic: " + metadata.topic() +
", Partition: " + metadata.partition() + ", Offset: " + metadata.offset());
} else {
exception.printStackTrace();
}
});
// 关闭生产者
producer.close();
}
}
-
消费者
package com.xueyi.sample.kafka.consumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 配置Kafka消费者 Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("group.id", "your_group_id"); properties.put("key.deserializer", StringDeserializer.class.getName()); properties.put("value.deserializer", StringDeserializer.class.getName()); // 创建Kafka消费者 Consumer consumer = new KafkaConsumer(properties); // 订阅主题 consumer.subscribe(Collections.singletonList("your_topic")); // 拉取消息 while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.println("Received message: Key = " + record.key() + ", Value = " + record.value() + ", Topic = " + record.topic() + ", Partition = " + record.partition() + ", Offset = " + record.offset()); }); } } } -
发送消息
Message sent successfully! Topic: your_topic, Partition: 0, Offset: 7

-
接收消息
Received message: Key = key, Value = Hello, Kafka!, Topic = your_topic, Partition = 0, Offset = 7

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/21c4797bf1.html
