欢迎光临
Kafka快速入门及使用
   

Kafka快速入门及使用

入门

官网

简介

  • Kafka是一个分布式的流媒体平台
  • 应用:
    • 消息系统
    • 日志收集
    • 用户行为追踪
    • 流式处理

      特点

      • 高吞吐量
      • 消息持久化
      • 高可靠性
      • 高扩展性

        常用术语

        • Broker:集群中的服务器
        • Zookeeper:服务管理
        • Topic:主题,Kafka发送消息为发布订阅模式,用来存放消息的空间为Topic
        • Partition:分区,对Topic进行分区,可以多线程读写Topic
        • Offset:消息在分区内存放的索引序列
        • Leader Replica:主副本,对数据做备份;提高容错率,响应读取
        • Follower Replica:随从副本,从主副本做备份;主副本出问题时,从 从副本中随机选取一个作为主副本

          安装

          下载地址:

          Kafka不分平台,虽然压缩包格式为tgz,但是解压后,一样可以在Windows运行。

          配置

          压缩包解压到不含中文的目录下后,在config包中进行配置;

          Kafka快速入门及使用,image,第1张

          配置Zookeeper.properties

          对Zookeeper相关的集群做配置;配置Zookeeper数组存放位置;如下所示:

          Kafka快速入门及使用,image,第2张

          配置server.properties

          主要配置Kafka日志文件存放位置;在配置文件中的第62行如下图所示:

          Kafka快速入门及使用,image,第3张

          运行kafka

          进入config同层级目录的bin目录中,因为是Windows系统,所以进入windows包中,执行命令。

          先启动Zookeeper

          在初级目录(解压后可以看到bin以及config的目录)下;执行如下命令启动Zookeeper:

          bin\windows\zookeeper-server-start.bat config\zookeeper.properties
          

          即使用config目录下的zookeeper配置文件,启动zookeeper;启动结果如下:

          Kafka快速入门及使用,image,第4张

          再启动Kafka

          进入kafka_2.13-3.2.3后;执行如下命令启动kafka;

          bin\windows\kafka-server-start.bat config\server.properties
          

          启动后结果如下:

          Kafka快速入门及使用,image,第5张

          启动成功

          启动成功后可以在之前,在配置文件中,配置的Zookeeper数据目录和Kafka日志保存目录中查看自动创建的文件;如下图所示:

          Kafka快速入门及使用,image,第6张

          使用Kafka

          进入包含命令的Windows命令包目录(kafka_2.13-3.2.3\bin\windows)下

          创建主题

          因为Kafka作为一个消息队列;采用的是发布订阅模式;需要将消息发布到某个主题下,首先需要创建主题;

          主题:

          • 代表一个位置
          • 代表一种消息的类别

            使用如下命令创建主题:

            kafka-topics.bat --create --创建主题的服务器 地址:端口 --创建副本 副本数 --分区 分区数 --topic 主题名
            

            执行成功案例如下图所示:

            Kafka快速入门及使用,image,第7张

            运行如下命令查看主题:

            kafka-topics.bat --list --指定主题所在服务器 地址:端口
            

            执行示例如下:

            Kafka快速入门及使用,image,第8张

            发送消息

            主题创建成功后,需要再往主题上发送消息;发送消息是以生产者模式身份发送;执行如下命令:

            kafka-console-producer.bat --服务器列表 服务器地址:端口 --topic 主题名
            

            执行上述命令后,即可输入需要发布的消息;执行示例如下:

            Kafka快速入门及使用,image,第9张

            接收消息

            在新的命令行窗口,重新进入目录(kafka_2.13-3.2.3\bin\windows)下;以消费者身份接收消息;执行命令如下:

            kafka-console-consumer.bat --指定服务器 读取消息的服务器地址:端口 --topic 读取消息的主题名 --从头开始读取消息
            

            执行示例如下所示:

            Kafka快速入门及使用,image,第10张

            Spring整合Kafka

            引入依赖

            在项目pom文件中引入以下依赖:

            
            
                org.springframework.kafka
                spring-kafka
            
            

            配置Kafka

            在application.properties文件中,进行如下配置:

            # 配置服务器列表
            spring.kafka.bootstrap-servers=localhost:9092
            # 消费者分组id   在Kafka的消费者配置文件中有
            # 可在配置文件中 更改分组id 更改后需要重新启动kafka
            spring.kafka.consumer.group-id=test-consumer-group
            # 是否自动提交 消费者的偏移量
            spring.kafka.consumer.enable-auto-commit=true
            # 自动提交频率 此处配置3000ms
            spring.kafka.consumer.auto-commit-interval=3000
            

            访问Kafka

            主要是通过生产者发送消息;消费者监听消息,测试在Spring中使用Kafka代码如下:

            /**
             * @author 花木凋零成兰
             * @date 2024/3/23 23:41
             */
            @SpringBootTest
            @ContextConfiguration(classes = Application.class)		// 使用Application类的配置
            public class KafkaTests {
                @Autowired
                private KafkaProducer kafkaProducer;
                @Test
                public void testKafka() {
                    // 发送消息
                    kafkaProducer.send("test", "你好");
                    kafkaProducer.send("test", "在干嘛");
                    try {
                        Thread.sleep(1000 * 20);    // 阻塞主线程 用户观察消费者是否接收到消息
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            /**
             * 生产者
             */
            @Component
            class KafkaProducer {
                @Autowired
                private KafkaTemplate kafkaTemplate;
                /**
                 * 发送消息方法
                 * @param topic 主题
                 * @param content 发送消息的内容
                 */
                public void send(String topic, String content) {
                    kafkaTemplate.send(topic, content);
                }
            }
            /**
             * 消费者
             */
            @Component
            class KafkaConsumer {
                /**
                 * 监听主题发送的消息
                 * @param record    消息自动封装为ConsumerRecord
                 */
                @KafkaListener(topics = {"test"})   // 需要监听的主题
                public void handleMessage(ConsumerRecord record) {
                    System.out.println(record.value()); // 读取消息
                }
            }
            

            运行测试后,成功结果如下所示,可观察到消费者读取到生产者发送的消息

            Kafka快速入门及使用,image,第11张

             
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Kafka快速入门及使用》
文章链接:https://goodmancom.com/wl/175957.html