欢迎光临
Flink消费kafka消息实战(1)
   

Flink消费kafka消息实战(1)

}

}

  1. SingleMessage对象的定义:

public class SingleMessage {

private long timeLong;

private String name;

private String bizID;

private String time;

private String message;

public long getTimeLong() {

return timeLong;

}

public void setTimeLong(long timeLong) {

this.timeLong = timeLong;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public String getBizID() {

return bizID;

}

public void setBizID(String bizID) {

this.bizID = bizID;

}

public String getTime() {

return time;

}

public void setTime(String time) {

this.time = time;

}

public String getMessage() {

return message;

}

public void setMessage(String message) {

this.message = message;

}

}

  1. 实时处理的操作都集中在StreamingJob类,源码的关键位置已经加了注释,就不再赘述了:

/*

  • Licensed to the Apache Software Foundation (ASF) under one

  • or more contributor license agreements. See the NOTICE file

  • distributed with this work for additional information

  • regarding copyright ownership. The ASF licenses this file

  • to you under the Apache License, Version 2.0 (the

  • “License”); you may not use this file except in compliance

  • with the License. You may obtain a copy of the License at

  • http://www.apache.org/licenses/LICENSE-2.0
    
  • Unless required by applicable law or agreed to in writing, software

  • distributed under the License is distributed on an “AS IS” BASIS,

  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  • See the License for the specific language governing permissions and

  • limitations under the License.

    */

    package com.bolingcavalry;

    import org.apache.flink.api.common.functions.FlatMapFunction;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;

    import org.apache.flink.api.java.tuple.Tuple;

    import org.apache.flink.api.java.tuple.Tuple2;

    import org.apache.flink.streaming.api.TimeCharacteristic;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;

    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;

    import org.apache.flink.streaming.api.watermark.Watermark;

    import org.apache.flink.streaming.api.windowing.time.Time;

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    import javax.annotation.Nullable;

    import java.util.Properties;

    /**

    • Skeleton for a Flink Streaming Job.

    • For a tutorial how to write a Flink streaming application, check the

    • tutorials and examples on the Flink Website.

    • To package your application into a JAR file for execution, run

    • ‘mvn clean package’ on the command line.

    • If you change the name of the main class (with the public static void main(String[] args))

    • method, change the respective entry in the POM.xml file (simply search for ‘mainClass’).

      */

      public class StreamingJob {

      public static void main(String[] args) throws Exception {

      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      env.enableCheckpointing(5000); // 要设置启动检查点

      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      Properties props = new Properties();

      props.setProperty(“bootstrap.servers”, “kafka1:9092”);

      props.setProperty(“group.id”, “flink-group”);

      //数据源配置,是一个kafka消息的消费者

      FlinkKafkaConsumer011 consumer =

      new FlinkKafkaConsumer011<>(“topic001”, new SimpleStringSchema(), props);

      //增加时间水位设置类

      consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks (){

      @Override

      public long extractTimestamp(String element, long previousElementTimestamp) {

      return JSONHelper.getTimeLongFromRawMessage(element);

      }

      @Nullable

      @Override

      public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {

      if (lastElement != null) {

      return new Watermark(JSONHelper.getTimeLongFromRawMessage(lastElement));

      }

      return null;

      }

      });

      env.addSource(consumer)

      //将原始消息转成Tuple2对象,保留用户名称和访问次数(每个消息访问次数为1)

      .flatMap((FlatMapFunction>) (s, collector) -> {

      SingleMessage singleMessage = JSONHelper.parse(s);

      if (null != singleMessage) {

      collector.collect(new Tuple2<>(singleMessage.getName(), 1L));

      }

      })

      //以用户名为key

      .keyBy(0)

      //时间窗口为2秒

      .timeWindow(Time.seconds(2))

      //将每个用户访问次数累加起来

      .apply((WindowFunction, Tuple2, Tuple, TimeWindow>) (tuple, window, input, out) -> {

      long sum = 0L;

      for (Tuple2 record: input) {

      sum += record.f1;

      }

      Tuple2 result = input.iterator().next();

      result.f1 = sum;

      out.collect(result);

      })

      //输出方式是STDOUT

      .print();

      env.execute(“Flink-Kafka demo”);

      }

      }

      1. 在pom.xml所在文件夹执行以下命令打包:

      mvn clean package -Dmaven.test.skip=true -U

      1. 打包成功后,会在target目录下生成文件flinkkafkademo-1.0-SNAPSHOT.jar,将此文件提交到Flinkserver上,如下图:

      Flink消费kafka消息实战(1),在这里插入图片描述,第1张

      先自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

      深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则近万的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

      因此收集整理了一份《Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

      Flink消费kafka消息实战(1),img,第2张

      Flink消费kafka消息实战(1),img,第3张

      Flink消费kafka消息实战(1),img,第4张

      由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频

      如果你觉得这些内容对你有帮助,可以扫码领取!

      Flink消费kafka消息实战(1),img,第5张

      分享

      1、算法大厂——字节跳动面试题

      Flink消费kafka消息实战(1),第6张

      2、2000页互联网Java面试题大全

      Flink消费kafka消息实战(1),第7张

      3、高阶必备,算法学习

      Flink消费kafka消息实战(1),第8张

      来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频**

      如果你觉得这些内容对你有帮助,可以扫码领取!

      Flink消费kafka消息实战(1),img,第5张

      分享

      1、算法大厂——字节跳动面试题

      [外链图片转存中…(img-gqUH0fBH-1711366115621)]

      2、2000页互联网Java面试题大全

      [外链图片转存中…(img-N3QcornQ-1711366115621)]

      3、高阶必备,算法学习

      [外链图片转存中…(img-A1OblZgd-1711366115622)]

      需要更多Java资料的小伙伴可以帮忙点赞+关注,点击传送门,即可免费领取!

       
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Flink消费kafka消息实战(1)》
文章链接:https://goodmancom.com/wl/175918.html