欢迎光临
Flink配置
   

Flink配置

一、Flink介绍

Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

二、部署环境

操作系统环境:

flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。

JDK:

要求Java 7或者更高

三、下载软件

• jdk1.8.0_144

• flink-1.4.2-bin-hadoop26-scala_2.11.tgz

四、部署安装

1、JDK安装步骤此处省略,安装后验证下JDK环境

Flink配置,在这里插入图片描述,第1张

2、安装部署flink

本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。

找到下载的flink压缩包,进行解压

Flink配置,在这里插入图片描述,第2张

首先是local模式,最为简单。

Flink配置,在这里插入图片描述,第3张

我们可以通过查看日志确认是否启动成功

Flink配置,在这里插入图片描述,第4张

JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。

可以打开页面查看到相关信息,说明local模式部署是没问题的。

下面来看一下standlone部署方式。

安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,免密操作方法。

修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINK_TM_HEAP参数类覆盖值钱的设置。

我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:

Flink配置,在这里插入图片描述,第5张

然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群

Flink配置,在这里插入图片描述,第6张

查看日志是否成功。

以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。

flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:

object SocketWindowWordCount {
	def main(args: Array[String]) : Unit = {
	        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port '")
                return
                }
        }
	        // get the execution environment
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	        // get input data by connecting to the socket
            val text = env.socketTextStream("localhost", port, '\n')
	        // parse the data, group it, window it, and aggregate the counts
            val windowCounts = text
               .flatMap { w => w.split("\\s") }
               .map { w => WordWithCount(w, 1) }
               .keyBy("word")
               .timeWindow(Time.seconds(5), Time.seconds(1))
               .sum("count")
	        // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1)
	        env.execute("Socket Window WordCount")
     }
	// Data type for words with count
    case class WordWithCount(word: String, count: Long)
}

这个demo是监控端口,然后对端口输入单子进行wordcount的程序。

运行demo,首先打开一个窗口进行端口数据输入:

Flink配置,在这里插入图片描述,第7张

然后运行demo监控端口单词输入统计:

Flink配置,在这里插入图片描述,第8张

运行后可以看到结果统计:

Flink配置,在这里插入图片描述,第9张

 
 初中随笔  free欧美高清猪马牛  朱顶红的养殖方法和注意事项养殖日记  国庆节用什么花庆祝  孝道文化  杏花的诗句 
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Flink配置》
文章链接:https://goodmancom.com/wl/175719.html