欢迎光临
Linux 安装 kafka
   

Linux 安装 kafka

文章目录

  • 前言
  • 一、Kafka简介
            • kafka核心概念
            • 二、安装Kafka
              • 1.准备工作
                      • 1.1 Java
                      • 1.2 安装包下载
                      • 2.安装KafKa
                            • 1. 解压安装包
                            • 2. 配置kafka
                            • 3 进入配置文件目录
                            • 4 修改配置文件server.properties,添加下面内容
                            • 5 配置zookeeper服务 zookeeper.properties
                            • 6 创建启动和关闭的 kafka 执行脚本
                              • 6.1 创建启动脚本
                              • 6.2 创建关闭脚本 kafkaStop.sh
                              • 7 启动脚本,关闭脚本赋予权限
                              • 8 创建生产者 topic 和 消费者 topic
                              • 9 Spring boot集成Kafka

                                前言

                                例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。


                                提示:以下是本篇文章正文内容,下面案例可供参考

                                一、Kafka简介

                                Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

                                kafka核心概念

                                在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:

                                Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。

                                Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。

                                Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。

                                Producer: 数据的发布者,将消息发送到一个或多个 Topic。

                                Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。

                                Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。

                                Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。

                                二、安装Kafka

                                1.准备工作

                                1.1 Java

                                Kafka是依赖Java环境运行,所以需要在Linux系统内安装Java环境。

                                1.2 安装包下载

                                官方下载地址:http://kafka.apache.org/downloads.html

                                Linux 安装 kafka,在这里插入图片描述,第1张

                                我这里下载的是:kafka_2.12-3.6.0.tgz

                                # 在线下载安装包
                                wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz
                                

                                2.安装KafKa

                                将安装包传送到服务器并解压,这里我放到opt下面

                                1. 解压安装包
                                cd  opt
                                tar -zxvf kafka_2.12-3.6.0.tgz
                                
                                2. 配置kafka

                                在kafka解压目录同一路径下创建

                                mkdir -p /opt/software/kafka
                                mkdir -p /opt/software/kafka/zookeeper  #zookeeper数据目录
                                mkdir -p /opt/software/kafka/log        #kafka日志
                                mkdir -p /opt/software/kafka/zookeeper/log  #zookeeper日志
                                
                                3 进入配置文件目录
                                cd /opt/kafka_2.12-3.6.0/config/
                                
                                4 修改配置文件server.properties,添加下面内容
                                broker.id=0
                                port=9092 #端口号
                                host.name=localhost #服务器IP地址,修改为自己的服务器IP
                                log.dirs=/opt/software/kafka/log  #日志存放路径,上面创建的目录
                                zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
                                
                                5 配置zookeeper服务 zookeeper.properties
                                dataDir=/opt/software/kafka/zookeeper   #zookeeper数据目录
                                dataLogDir=/opt/software/kafka/zookeeper/log #zookeeper日志目录
                                clientPort=2181
                                maxClientCnxns=100
                                tickTimes=2000
                                initLimit=10
                                syncLimit=5
                                
                                6 创建启动和关闭的 kafka 执行脚本
                                6.1 创建启动脚本
                                cd /opt/kafka_2.12-3.6.0/
                                vi kafkaStart.sh
                                

                                配置启动脚本 kafkaStart.sh

                                #启动zookeeper
                                /opt/kafka_2.12-3.6.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties &
                                 
                                sleep 3 #等3秒后执行
                                 
                                #启动kafka
                                /opt/kafka_2.12-3.6.0/bin/kafka-server-start.sh /opt/kafka_2.12-3.6.0/config/server.properties &
                                
                                6.2 创建关闭脚本 kafkaStop.sh
                                cd /opt/kafka_2.12-3.6.0/
                                vi kafkaStop.sh
                                

                                配置关闭脚本 kafkaStop.sh

                                #关闭zookeeper
                                /opt/kafka_2.12-3.6.0/bin/zookeeper-server-stop.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties &
                                 
                                sleep 3 #等3秒后执行
                                 
                                #关闭kafka
                                /opt/kafka_2.12-3.6.0/bin/kafka-server-stop.sh /opt/kafka_2.12-3.6.0/config/server.properties &
                                
                                7 启动脚本,关闭脚本赋予权限
                                chmod 777 kafkaStart.sh
                                chmod 777 kafkaStop.sh
                                

                                启动和关闭kafka

                                cd /opt/kafka_2.12-3.6.0/ 
                                sh kafkaStart.sh #启动
                                sh kafkaStop.sh  #关闭
                                
                                8 创建生产者 topic 和 消费者 topic

                                cd /opt/kafka_2.12-3.6.0/bin/   #进入kafka目录
                                ./kafka-console-producer.sh --broker-list localhost:9092 --topic test  #创建生产者 test你要建立的topic名
                                ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test #创建消费者
                                

                                ​ 查看 kafka 是否启动

                                [root@localhost kafka_2.12-3.6.0]# jps
                                21324 QuorumPeerMain 
                                15211 Jps
                                21215 Kafka
                                

                                里面有QuorumPeerMain和kafkas说明启动成功了

                                查看当前的一些topic

                                cd /opt/kafka_2.12-3.6.0/bin/
                                 
                                ./kafka-topics.sh --zookeeper localhost:2181 --list
                                ./kafka-topics.sh --list --bootstrap-server localhost:9092
                                
                                9 Spring boot集成Kafka

                                1、pom依赖

                                
                                    org.springframework.kafka
                                    spring-kafka
                                
                                
                                   org.apache.kafka
                                   kafka-clients
                                   2.4.1
                                
                                

                                2.消费者

                                @Autowired
                                private KafkaTemplate kafkaTemplate;
                                @RequestMapping("/userGets")
                                public Object gets() {
                                    // send 第一个参数为topic的名称,第二个参数为我们要发送的信息
                                    kafkaTemplate.send("topic.quick.default","1231235");
                                    return "发送成功";
                                }
                                @KafkaListener(topics = {"topic1"})
                                public void onMessage(ConsumerRecord record) {
                                    System.out.println(record.value());
                                }
                                @KafkaListener(topics = {"topic2"})
                                public void getMessage(ConsumerRecord record) {
                                    String key = record.key();
                                    String value = record.value();
                                }
                                
                                1. 测试
                                 //生产者
                                 public static void main(String[] args) {
                                        Properties properties = new Properties();
                                        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                                        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                                        KafkaProducer producer = new KafkaProducer<>(properties);
                                        String topic = "test-topic";
                                        for (int i = 0; i < 10; i++) {
                                            String message = "Message " + i;
                                            producer.send(new ProducerRecord<>(topic, message));
                                            System.out.println("Sent: " + message);
                                        }
                                        producer.close();
                                    }
                                    //消费者
                                    public static void main(String[] args) {
                                        Properties properties = new Properties();
                                        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
                                        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                                        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                                        KafkaConsumer consumer = new KafkaConsumer<>(properties);
                                        //消息者订阅主题
                                        consumer.subscribe(Collections.singletonList("test-topic"));
                                        //循环
                                        while (true) {
                                            //每次拉取 1千条消息
                                            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                                            for (ConsumerRecord record : records) {
                                                System.out.println("=============> 消费kafka消息:"+ record.value());
                                            }
                                        }
                                    }
                                

                                4.配置文件

                                server:
                                    port: 8080
                                spring:
                                    kafka:
                                        bootstrap-servers: 172.16.253.21: 9093
                                        producer: # 生产者
                                            retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
                                            batch-size: 16384
                                            buffer-memory: 33554432
                                            acks: 1
                                            # 指定消息key和消息体的编解码方式
                                            key-serializer: org.apache.kafka.common.serialization.StringSerializer
                                            value-serializer: org.apache.kafka.common.serialization.StringSerializer
                                        consumer:
                                            group-id: default-group
                                            enable-auto-commit: false
                                            auto-offset-reset: earliest
                                            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                                            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                                            max-poll-records: 500
                                        listener:
                                        # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
                                        # RECORD
                                        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
                                        # BATCH
                                        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
                                        # TIME
                                        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
                                        # COUNT
                                        # TIME | COUNT 有一个条件满足时提交
                                        # COUNT_TIME
                                        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
                                        # MANUAL
                                        # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
                                        # MANUAL_IMMEDIATE
                                            ack-mode: MANUAL_IMMEDIATE
                                

                                可视化工具地址

                                https://www.kafkatool.com/download.html

                                 
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Linux 安装 kafka》
文章链接:https://goodmancom.com/wl/175756.html