Flink操练(六)之DS简介(6)温度测量数据流
本章介绍了Flink DataStream API的基本知识。我们展示了典型的Flink流处理程序的结构和组成部分,还讨论了Flink的类型系统以及支持的数据类型,还展示了数据和分区转换操作。窗口操作符,基于时间语义的转换操作,有状态的操作符,以及和外部系统的连接器将在接下来的章节进行介绍。阅读完这一章后,我们将会知道如何去实现一个具有基本功能的流处理程序。我们的示例程序采用Scala语言,因为Scala语言相对比较简洁。但Java API也是十分类似的(特殊情况,我们将会指出)。在我们的Github仓库里,我们所写的应用程序具有Scala和Java两种版本。 1 你好,Flink!
让我们写一个简单的例子来获得使用DataStream API编写流处理应用程序的粗浅印象。我们将使用这个简单的示例来展示一个Flink程序的基本结构,以及介绍一些DataStream API的重要特性。我们的示例程序摄取了一条(来自多个传感器的)温度测量数据流。
首先让我们看一下表示传感器读数的数据结构:
scala version case class SensorReading(id: String, timestamp: Long, temperature: Double)
java version public class SensorReading { public String id; public long timestamp; public double temperature; public SensorReading() { } public SensorReading(String id, long timestamp, double temperature) { this.id = id; this.timestamp = timestamp; this.temperature = temperature; } public String toString() { return "(" + this.id + ", " + this.timestamp + ", " + this.temperature + ")"; } }
示例程序5-1将温度从华氏温度读数转换成摄氏温度读数,然后针对每一个传感器,每5秒钟计算一次平均温度值。
scala version object AverageSensorReadings { def main(args: Array[String]) { // 创建运行时环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 使用事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sensorData: DataStream[SensorReading] = env.addSource(new SensorSource) val avgTemp = sensorData .map(r => { val celsius = (r.temperature - 32) * (5.0 / 9.0) SensorReading(r.id, r.timestamp, celsius) }) .keyBy(_.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager) avgTemp.print() env.execute("Compute average sensor temperature") } }
java version public class AverageSensorReadings { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream sensorData = env.addSource(new SensorSource()); DataStream avgTemp = sensorData .map(r -> { Double celsius = (r.temperature - 32) * (5.0 / 9.0); return SensorReading(r.id, r.timestamp, celsius); }) .keyBy(r -> r.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager()); avgTemp.print(); env.execute("Compute average sensor temperature"); }
你可能已经注意到Flink程序的定义和提交执行使用的就是正常的Scala或者Java的方法。大多数情况下,这些代码都写在一个静态main方法中。在我们的例子中,我们定义了AverageSensorReadings对象,然后将大多数的应用程序逻辑放在了main()中。
Flink流处理程序的结构如下: 创建Flink程序执行环境。 从数据源读取一条或者多条流数据 使用流转换算子实现业务逻辑 将计算结果输出到一个或者多个外部设备(可选) 执行程序
接下来我们详细地学习一下这些部分。 2 搭建执行环境
编写Flink程序的第一件事情就是搭建执行环境。执行环境决定了程序是运行在单机上还是集群上。在DataStream API中,程序的执行环境是由StreamExecutionEnvironment设置的。在我们的例子中,我们通过调用静态getExecutionEnvironment()方法来获取执行环境。这个方法根据调用方法的上下文,返回一个本地的或者远程的环境。如果这个方法是一个客户端提交到远程集群的代码调用的,那么这个方法将会返回一个远程的执行环境。否则,将返回本地执行环境。
也可以用下面的方法来显式的创建本地或者远程执行环境:
scala version // create a local stream execution environment val localEnv = StreamExecutionEnvironment .createLocalEnvironment() // create a remote stream execution environment val remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // hostname of JobManager 1234, // port of JobManager process "path/to/jarFile.jar" ) // JAR file to ship to the JobManager
java version StreamExecutionEnvironment localEnv = StreamExecutionEnvironment .createLocalEnvironment(); StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // hostname of JobManager 1234, // port of JobManager process "path/to/jarFile.jar" ); // JAR file to ship to the JobManager
接下来,我们使用 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 来将我们程序的时间语义设置为事件时间。执行环境提供了很多配置选项,例如:设置程序的并行度和程序是否开启容错机制。3 读取输入流
一旦执行环境设置好,就改写业务逻辑了。 StreamExecutionEnvironment 提供了创建数据源的方法,这些方法可以从数据流中将数据摄取到程序中。数据流可以来自消息队列或者文件系统,也可能是实时产生的(例如socket)。
在我们的例子里面,我们这样写:
scala version val sensorData: DataStream[SensorReading] = env .addSource(new SensorSource)
java version DataStream sensorData = env .addSource(new SensorSource());
这样就可以连接到传感器测量数据的数据源并创建一个类型为 SensorReading 的DataStream 了。Flink支持很多数据类型,我们将在接下来的章节里面讲解。在我们的例子里面,我们的数据类型是一个定义好的Scala样例类。SensorReading 样例类包含了传感器ID,数据的测量时间戳,以及测量温度值。assignTimestampsAndWatermarks(new SensorTimeAssigner) 方法指定了如何设置事件时间语义的时间戳和水位线。有关SensorTimeAssigner 我们后面再讲。4 转换算子的使用
一旦我们有一条DataStream,我们就可以在这条数据流上面使用转换算子了。转换算子有很多种。一些转换算子可以产生一条新的DataStream,当然这个DataStream的类型可能是新类型。还有一些转换算子不会改变原有DataStream的数据,但会将数据流分区或者分组。业务逻辑就是由转换算子串起来组合而成的。
在我们的例子中,我们首先使用 map() 转换算子将传感器的温度值转换成了摄氏温度单位。然后,我们使用keyBy() 转换算子将传感器读数流按照传感器ID进行分区。接下来,我们定义了一个timeWindow() 转换算子,这个算子将每个传感器ID所对应的分区的传感器读数分配到了5秒钟的滚动窗口中。
scala version val avgTemp = sensorData .map(r => { val celsius = (r.temperature - 32) * (5.0 / 9.0) SensorReading(r.id, r.timestamp, celsius) }) .keyBy(_.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager)
java version DataStream avgTemp = sensorData .map(r -> { Double celsius = (r.temperature -32) * (5.0 / 9.0); return SensorReading(r.id, r.timestamp, celsius); }) .keyBy(r -> r.id) .timeWindow(Time.seconds(5)) .apply(new TemperatureAverager());
窗口转换算子将在"窗口操作符"一章中讲解。最后,我们使用了一个UDF函数来计算每个窗口的温度的平均值。我们稍后将会讨论UDF函数的实现。 5 输出结果
流处理程序经常将它们的计算结果发送到一些外部系统中去,例如:Apache Kafka,文件系统,或者数据库中。Flink提供了一个维护得很好的sink算子的集合,这些sink算子可以用来将数据写入到不同的系统中去。我们也可以实现自己的sink算子。也有一些Flink程序并不会向第三方外部系统发送数据,而是将数据存储到Flink系统内部,然后可以使用Flink的可查询状态的特性来查询数据。
在我们的例子中,计算结果是一个 DataStream[SensorReading] 数据记录。每一条数据记录包含了一个传感器在5秒钟的周期里面的平均温度。计算结果组成的数据流将会调用print() 将计算结果写到标准输出。avgTemp.print()
要注意一点,流的Sink算子的选择将会影响应用程序端到端( end-to-end )的一致性,具体就是应用程序的计算提供的到底是at-least-once 还是exactly-once 的一致性语义。应用程序端到端的一致性依赖于所选择的流的Sink算子和Flink的检查点算法的集成使用。6 执行
当应用程序完全写好时,我们可以调用 StreamExecutionEnvironment.execute() 来执行应用程序。在我们的例子中就是我们的最后一行调用:env.execute("Compute average sensor temperature")
Flink程序是惰性执行的。也就是说创建数据源和转换算子的API调用并不会立刻触发任何数据处理逻辑。API调用仅仅是在执行环境中构建了一个执行计划,这个执行计划包含了执行环境创建的数据源和所有的将要用在数据源上的转换算子。只有当 execute() 被调用时,系统才会触发程序的执行。
构建好的执行计划将被翻译成一个 JobGraph 并提交到JobManager 上面去执行。根据执行环境的种类,一个JobManager 将会运行在一个本地线程中(如果是本地执行环境的化)或者JobGraph 将会被发送到一个远程的JobManager 上面去。如果JobManager 远程运行,那么JobGraph 必须和一个包含有所有类和应用程序的依赖的JAR包一起发送到远程JobManager 。
梦婚礼国内浪漫婚纱照之大理旅拍记室内拍婚纱照很单调很没意思滴,场景会看起来很假。我和老公在网上找了好久,看了三亚的海,看了厦门的鼓浪屿。最后看梦婚礼APP上新娘分享的日记上,确定去大理拍,因为有山有水有海有古镇,
充满一次用两天?!海信手机金刚4热销中从非智能到智能,从单核变多核,从169屏幕进化到全面屏这几年手机发展的速度基本上是全面且迅猛的。不过在升级进化的过程中,我们似乎忘记了一点续航。导航手游社交网站这些高耗电的程序覆盖
国货最潮年!新年送礼妈妈也要潮起来我们都常说女儿乖,女儿巧,女儿是妈妈的贴心小棉袄。乖巧懂事的女儿,不仅从小的时候就懂得心疼妈妈,长大以后也是最疼妈妈的人,毕竟只有女人才更懂女人,男人大多时候还是大猪蹄子的属性。而
中国电信副总经理高同庆莅临海信手机展区体验点赞双屏手机A69月14日,在2018天翼智能生态博览会上,中国电信副总经理高同庆莅临海信手机展区,体验海信双屏手机A6,称赞了海信手机的研发实力和创新精神,同时,表扬海信的展区富有活力和创造力。
拿NEDC论续航蔚来等电动车如何反应?这一冬季是蔚来ES8从生产以来最为火爆的一季,但在备受关注的同时,也迎来了许多对蔚来不太友好的言论。近期,就有媒体借着NEDC的幌子,指出纯电动汽车最高续航里程与事实不符。作为纯电
通付盾上线一站式大数据治理平台,助力企业腾飞发展随着大数据时代的来临,企业管理的效率得到了大幅度提升,但问题也是层出不穷。为解决数据价值低应用难度大等问题,通付盾于近日正式推出了自主研发的新一代一站式图形化大数据治理平台大禹,为
想要奈何boss要娶我一样的霸道总裁?先从电眼女神开始!网剧在如今电视剧市场是一片火热景象,特别是宫斗剧基本上都是霸占C位,但就在开年不久,一部黑马网剧杀出宫斗剧重围,成为当下话题性超高的剧集,这部剧就是披着玛丽苏外衣的高甜言情剧奈何b
贞观拍卖汝窑精品低于市场价十几倍,实属罕见据香港星岛日报报道,香港苏富比4日举行中国瓷器及工艺品拍卖,有900年历史的北宋汝窑天青釉葵花洗经34口叫价,以天价2。0786亿港元成交,较拍卖前估值底价高逾3倍,刷新宋瓷世界拍
中南海红色内招酒国招壹號酒镶嵌在国酒之林的璀璨明珠访贵州国招酒业中南海酒厂董事长朱柏民先生记者王尚和九十年代初,在京工作的杭州籍朱柏民先生,响应党中央艰苦朴素勤俭节约发展西部经济中南海先行的号召,在国务院机关事务管理局机关服务局和
专项突破海信手机金刚4满足长续航手机市场需求当如今的手机普遍性能满足日常使用都无压力之后,针对独特需求推出产品的重要性就开始逐渐凸显出来了,长续航强拍照游戏性能强,各个方面都成为了厂商们的发力之处,于是我们可以看到细分市场在
业界热议区块链赋能新零售,链客商城火了!新零售时代下,如果说互联网时代的赋能方式仅仅只是流量输送的话,那么区块链技术的赋能方式更加全面更加多样化。我们可以将这种多样化的赋能方式看作是深度赋能的方式,当深度赋能时代来临,社