Flink操练(三十二)之自定义键控状态(二)ListState
0 简介
ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下: ListState.add(value: T) ListState.addAll(values: java.util.List[T]) ListState.get()返回Iterable[T] ListState.update(values: java.util.List[T])
ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。 1.实例
1.1 实例一
首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法: private var itemState : ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { //命名状态变量的名字和类型 val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]) itemState = getRuntimeContext.getListState(itemStateDescription) }
ListStateDescriptor提供了几种不同的定义方式:
两个参数分别是ListStateDescriptor的名字和typeClass
1.2 实例二 package qiuhua; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector; import java.util.Collections; import java.util.List; /** * @program: bigdata_learn * @description: 通过ListState求key 出现了 3 次,则需要计算平均值 * @author: Mr.逗 * @create: 2021-09-08 16:18 **/ public class CountAverageWithListState extends RichFlatMapFunction, Tuple2> { /** * ValueState : 里面只能存一条元素 * ListState : 里面可以存很多数据 */ private ListState> elementsByKey; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //注册状态 ListStateDescriptor> descriptor = new ListStateDescriptor<> ("list_state"//状态名字 , Types.TUPLE(Types.LONG, Types.LONG)//状态存储的数据类型 ); elementsByKey=getRuntimeContext().getListState(descriptor); } @Override public void flatMap(Tuple2 value, Collector> out) throws Exception { Iterable> currentState = elementsByKey.get();//拿到当前key的状态值 //如果状态值没有初始化,则初始化 if(currentState==null) { elementsByKey.addAll(Collections.emptyList()); } //更新状态 elementsByKey.update((List>) value); //判断,如果当前key出现了3次,则需要计算平均值,并且输出 List> allElements = Lists.newArrayList(currentState); if (allElements.size()==3) { long count=0; long sum=0; for(Tuple2 ele:allElements) { count++; sum+=ele.f1; } double avg=(double)sum/count; out.collect(Tuple2.of(value.f0,avg)); //清除状态 elementsByKey.clear(); } } }
总结
Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件: 直接基于keyedStream或者由keyedStream转换来的windowedStream 必须继承RichFunction
实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能: val fromTransactionDataStream = watermarkTransaction .keyBy(_.code) .window(TumblingEventTimeWindows.of(Time.seconds(10))) val transaction = fromTransactionDataStream .apply(new StockTransactionApply) .keyBy(_._3) .flatMap(new TransactionStateFlatMapFunction)