Flink操练(四十六)之迟到数据更新窗口计算结果
关键代码 SingleOutputStreamOperator process = outputLateData.process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable> elements, Collector out) throws Exception { // 初始化一个窗口状态变量,注意:窗口状态变量的可见范围是当前窗口 ValueState firstCalculate = context.windowState().getState(new ValueStateDescriptor("first", Types.BOOLEAN)); if (firstCalculate.value() == null) { out.collect("窗口第一次触发计算了!水位线是:" + context.currentWatermark() + " 窗口中共有 " + elements.spliterator().getExactSizeIfKnown()); firstCalculate.update(true); // 第一次触发process执行以后,更新为true } else { out.collect("迟到数据到了,更新以后的计算结果是:" + elements.spliterator().getExactSizeIfKnown()); } } });完整代码package day05; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.time.Duration; /** * @program: bigData_learn * @description: 使用迟到数据更新窗口计算结果 * @author: Mr.逗 * @create: 2021-09-24 14:34 **/ public class LateDataByWindow { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource source = env.socketTextStream("172.17.0.50", 9999); SingleOutputStreamOperator> map = source.map((MapFunction>) value -> { String[] arr = value.split(","); return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L); }).returns(Types.TUPLE(Types.STRING,Types.LONG)); DataStream> watermarks = map.assignTimestampsAndWatermarks( WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((SerializableTimestampAssigner>) (ele, l) -> ele.f1)); WindowedStream, String, TimeWindow> outputLateData = watermarks.keyBy(v -> v.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5)) .sideOutputLateData(new OutputTag>("late"){}); SingleOutputStreamOperator process = outputLateData.process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable> elements, Collector out) throws Exception { // 初始化一个窗口状态变量,注意:窗口状态变量的可见范围是当前窗口 ValueState firstCalculate = context.windowState().getState(new ValueStateDescriptor("first", Types.BOOLEAN)); if (firstCalculate.value() == null) { out.collect("窗口第一次触发计算了!水位线是:" + context.currentWatermark() + " 窗口中共有 " + elements.spliterator().getExactSizeIfKnown()); firstCalculate.update(true); // 第一次触发process执行以后,更新为true } else { out.collect("迟到数据到了,更新以后的计算结果是:" + elements.spliterator().getExactSizeIfKnown()); } } }); process.print(); process.getSideOutput(new OutputTag>("late"){}).print(); String name = LateDataByWindow.class.getName(); try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } }
刘强东卸任京东集团CEO,徐雷接任,到底反映出什么问题?刘强东卸任京东集团CEO,徐雷接任,到底反映出什么问题?为什么大佬要退居幕后?这个问题我们把联想集团,阿里巴巴集团的权利交接,后期运作联系起来一起说。联想官方透露,未来,柳传志的主
互联网从业者如何转行互联网医疗?作者坛子谈医熟悉医药医疗医保商保互联网等领域,有着对当前医疗体制的深刻认知医药医疗医保的系统化理解互联网医疗的前沿行业信息商业模式的深入浅出讲解。一键三连,关注点赞收藏!诸位在互联
刘强东为什么放心了在大型互联网公司创始人中,刘强东看起来是最不会选择退居幕后那一个。如果不能控制这家企业(京东),我宁愿把它卖掉,刘强东曾直言。不过,最近几年,刘强东的管理哲学发生了明显变化,这位京
封号!罚款!冻结!跨境卖家何去何从?跨境圈封禁之事冲击波仍在继续!账号封禁风波越演越烈账号被封资金冻结自今年4月份以来,跨境电商平台亚马逊依据卖家行为准则等格式条款,对平台上的诸多卖家实施封店,波及众多中国卖家。封号
兼具高结晶高稳定的共价有机框架材料制备成功基于动态化学重构共价有机框架(ReconstructedCOF,RCCOF)的全新概念,通过可逆共价键预组装和合成后框架重构为不可逆共价键,制备了高结晶高稳定的共价有机框架(COF
python(16)函数(3)写在前面的话码字不易,点个赞关注一下作者再走吧呲牙1。传递列表将列表传递给函数后,函数就能直接访问其内容假设有一个用户列表,要问候其中的每位用户defgreetusers(name
iphone日历不显示中国的节假日?一招教你搞定使用iphone手机的人都知道,在日历中是不显示中国的节假日的,每次都需要重新打开第三方日历,有时候还是挺不方便的,要想搞定其实很简单,在这里教大家一个最简单的方式打开以下网址,法
如何查询身份证名下注册的微信支付账户?您可按以下方法查看身份证名下账户1进入查询名下账户,根据页面提示,输出姓名身份证号。点击下一步,系统人脸验证后,即可查询您身份证名下绑卡注册的账户信息。2选择需要注销的微信账户,点
2022年京东各大类目入驻开店保证金年费扣点资费标准一览表京东入驻开店资费主要有保证金年费扣点三部分组成,旗舰店专卖店专营店保证金一致,不开店可退还年费各大类目都是一样,除极个别类目都是1000元一个月,按年收取,因此下表不做统计扣点也就
台式电脑好用,还是笔记本好用?从实际用途出发主要用于出差办公的话,笔记本是更好的选择。对比台式机,笔记本最大的优点在于灵活方便,便于携带。且办公对于配置没有要求,市面上3000元左右的笔记本都可以满足基本的上网
没有电脑可以安装宽带吗,为什么?没有电脑也是可以装宽带的,因为现在的无线路由器都可以通过手机进行配置,不需要用电脑也可以。现在很多地区的光纤宽带,光猫已经支持路由模式。哪怕家里没有电脑,运营商的装维人员会给配置好