Flink操练(四十)之水位线
关键代码

// Assign event-time timestamps (field f1) and generate watermarks with a
// bounded out-of-orderness of 5 seconds.
DataStream<Tuple2<String, Long>> watermarks = map.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                })
);

// Key by f0; for every element report the current watermark and register an
// event-time timer 5000 ms after the element's own timestamp.
DataStream<String> process = watermarks.keyBy(v -> v.f0)
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
                    throws Exception {
                out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
                // Compute once so the registered timer and the printed time always agree.
                long timerTs = value.f1 + 5000L;
                ctx.timerService().registerEventTimeTimer(timerTs);
                out.collect("注册了一个时间戳是:" + new Timestamp(timerTs) + " 的定时器");
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                    throws Exception {
                super.onTimer(timestamp, ctx, out);
                out.collect("定时器触发了!");
            }
        });

完整代码

package day04;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;

/**
 * Watermark test: reads text lines from a socket, assigns event-time timestamps
 * and watermarks, and registers an event-time timer for every element.
 *
 * <p>Each input line is split on a single space; the first token becomes the key
 * and the second token is parsed as a {@code long} and multiplied by 1000
 * (presumably seconds converted to milliseconds — confirm against the data
 * producer). A line without a second token or with a non-numeric second token
 * makes the map function throw.
 *
 * @program: bigData_learn
 * @description: watermark test
 * @author: Mr.逗
 * @create: 2021-09-24 09:17
 **/
public class WaterTest1 {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so that a single subtask sees every record and the
        // printed watermark values are easy to follow.
        env.setParallelism(1);

        DataStreamSource<String> source = env.socketTextStream("172.17.0.50", 9999);

        // Parse "<key> <number>" into (key, number * 1000). The line is split
        // once instead of once per field.
        DataStream<Tuple2<String, Long>> map = source
                .map(v -> {
                    String[] fields = v.split(" ");
                    return Tuple2.of(fields[0], Long.parseLong(fields[1]) * 1000L);
                })
                // Lambdas lose generic type information, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Use f1 as the event-time timestamp and emit watermarks with a
        // bounded out-of-orderness of 5 seconds.
        DataStream<Tuple2<String, Long>> watermarks = map.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        })
        );

        DataStream<String> process = watermarks.keyBy(v -> v.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
                    /**
                     * Emits the current watermark, then registers an event-time
                     * timer 5000 ms after the element's timestamp and reports it.
                     */
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
                            throws Exception {
                        out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
                        // Compute once so the registered timer and the printed time always agree.
                        long timerTs = value.f1 + 5000L;
                        ctx.timerService().registerEventTimeTimer(timerTs);
                        out.collect("注册了一个时间戳是:" + new Timestamp(timerTs) + " 的定时器");
                    }

                    /** Emits a fixed message whenever a registered timer fires. */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                            throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        out.collect("定时器触发了!");
                    }
                });

        process.print();

        String name = WaterTest1.class.getName();
        try {
            env.execute(name);
        } catch (Exception e) {
            // Demo program: report the job failure on stderr and let main return.
            e.printStackTrace();
        }
    }
}