state
Flink 在进行流处理的过程中是如何保证容错以及exactly once的呢? 本文主要介绍Flink 的state ,状态后端以及checkpoint的过程来介绍flink 是如何保证数据流在处理的过程中进行状态存储和检查点的运行的。
flink的state 主要包含keyState 和operateState. 每个下边又都包含managestate和RawState. Rawstate 是通过自己管理的byte[] 数组,manageState 是Flink管理的。 KeyState 下边的manageState 包含ValueState,MapState,ListState,ReducingState,AggregatingState.
ValueState<T>: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
ListState<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获得整个列表。还可以通过 update(List<T>) 覆盖当前的列表。
ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
State使用MapState例子如下:
public class MapStateExample {
//统计每个用户每种行为的个数
public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {
//定义一个MapState句柄
private transient MapState<String, Integer> behaviorCntState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
"userBehavior", // 状态描述符的名称
TypeInformation.of(new TypeHint<String>() {}), // MapState状态的key的数据类型
TypeInformation.of(new TypeHint<Integer>() {}) // MapState状态的value的数据类型
);
// ListStateDescriptor<String> userBehaviorListStateDesc = new ListStateDescriptor<String>("userbahavior",TypeInformation.of(new TypeHint<String>() {
// }));
//userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态
}
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
Integer behaviorCnt = 1;
// 如果当前状态包括该行为,则+1
if (behaviorCntState.contains(value.f1)) {
behaviorCnt = behaviorCntState.get(value.f1) + 1;
}
// 更新状态
behaviorCntState.put(value.f1, behaviorCnt);
out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模拟数据源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.keyBy(key->key.f0)
.flatMap(new UserBehaviorCnt())
.print();
env.execute("MapStateExample");
}
}
其它的state的使用方法跟这个比较类似。
state TTL
对于任何类型Keyed State都可以设定状态的生命周期(TTL),即状态的存活时间,以确保能够在规定时间内及时地清理状态数据。如果配置了状态的TTL,那么当状态过期时,存储的状态会被清除。状态生命周期功能可以通过StateTtlConfig配置,然后将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive方法中即可。以下是三种不同的清理策略
StateTtlConfig ttlConfig = StateTtlConfig
// 指定TTL时长为10S
.newBuilder(Time.seconds(10))
//.cleanupFullSnapshot()
// 只对创建和写入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 不返回过期的数据
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// 初始化状态
StateTtlConfig ttlConfig2 = StateTtlConfig.newBuilder(Time.seconds(10))
.cleanupIncrementally(10, false)
.build();
StateTtlConfig ttlConfig3 = StateTtlConfig
.newBuilder(Time.seconds(3))
.cleanupInRocksdbCompactFilter(1000)
.build();
userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
- 在state操作的时候的进行的ttl的设置,只有对state进行创建和读写的时候才会检查TTL 如果后续对该key一直没有操作,那会一直保留。
- 它的实现方法是存储后端在所有状态条目上维护一个惰性全局迭代器。某些事件(例如状态访问)会触发增量清理。每次触发增量清理时,迭代器都会向前迭代删除已遍历的过期数据。如果启用,则每次进行状态访问都会触发清理步骤。对于每个清理步骤,都会检查一定数量的数据是否过期。
有两个参数:第一个参数是检查每个清理步骤的状态条目数。第二个参数是一个标志,用于数据处理后触发清理步骤,此外对于每次状态访问同样有效。
关于这种方法有两点需要注意:第一个是增量清理所花费的时间增加了数据处理延迟。第二个应该可以忽略不计,但仍然值得一提:如果没有状态访问或没有数据处理记录,则不会删除过期状态。
- RocksDB定期运行异步压缩以合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的到期时间戳,并丢弃所有过期值。
operateState
opersta state 作用于某个算子任务。该状态只能被同一任务执行的算子访问
在Flink中可以实现ListCheckpointed接口或者CheckpointedFunction 接口来实现一个Operator State
ListCheckpointed
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
使用Operator ListState时,在进行扩缩容时,重分布的策略(状态恢复的模式)如下图
ListCheckpointed 的操作示例如下
public class ListCheckpointedExample {
private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {
private Long userBuyBehaviorCnt = 0L;
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {
if(value.f1.equals("buy")){
userBuyBehaviorCnt ++;
out.collect(Tuple2.of("buy",userBuyBehaviorCnt));
}
}
@Override
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
//返回单个元素的List集合,该集合元素是用户购买行为的数量
return Collections.singletonList(userBuyBehaviorCnt);
}
@Override
public void restoreState(List<Long> state) throws Exception {
// 在进行扩缩容之后,进行状态恢复,需要把其他subtask的状态加在一起
for (Long cnt : state) {
userBuyBehaviorCnt += 1;
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模拟数据源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.flatMap(new UserBehaviorCnt())
.print();
env.execute("ListCheckpointedExample");
}
}
checkpoint 扩容缩容的过程如下:
checkpoint Function 主要包含以下两个实现
public class CheckpointFunctionExample {
private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {
// 统计每个operator实例的用户行为数量的本地变量
private Long opUserBehaviorCnt = 0L;
// 每个key的state,存储key对应的相关状态
private ValueState<Long> keyedCntState;
// 定义operator state,存储算子的状态
private ListState<Long> opCntState;
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {
if (value.f1.equals("buy")) {
// 更新算子状态本地变量值
opUserBehaviorCnt += 1;
Long keyedCount = keyedCntState.value();
// 更新keyedstate的状态 ,判断状态是否为null,否则空指针异常
keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 );
// 结果输出
out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 使用opUserBehaviorCnt本地变量更新operator state
opCntState.clear();
opCntState.add(opUserBehaviorCnt);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 通过KeyedStateStore,定义keyedState的StateDescriptor描述符
ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {
}));
// 通过OperatorStateStore,定义OperatorState的StateDescriptor描述符
ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {
}));
// 初始化keyed state状态值
keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);
// 初始化operator state状态
opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);
// 初始化本地变量operator state
for (Long state : opCntState.get()) {
opUserBehaviorCnt += state;
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模拟数据源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.keyBy(0)
.flatMap(new UserBehaviorCnt())
.print();
env.execute("CheckpointFunctionExample");
}
}
状态后端
MemoryStateBackend
MemoryStateBackend将状态数据全部存储在JVM堆内存中,包括用户在使用DataStream API中创建的Key/Value State,窗口中缓存的状态数据,以及触发器等数据。MemoryStateBackend具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。Flink将MemoryStateBackend作为默认状态后端。
MemoryStateBackend比较适合用于测试环境中,并用于本地调试和验证,不建议在生产环境中使用。但如果应用状态数据量不是很大,例如使用了大量的非状态计算算子,也可以在生产环境中使MemoryStateBackend.
FsStateBackend
FsStateBackend是基于文件系统的一种状态后端,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统。创建FsStateBackend的构造函数如下:
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
其中path如果为本地路径,其格式为“file:///data/flink/checkpoints”,如果path为HDFS路径,其格式为“hdfs://nameservice/flink/checkpoints”。FsStateBackend中第二个Boolean类型的参数指定是否以同步的方式进行状态数据记录,默认采用异步的方式将状态数据同步到文件系统中,异步方式能够尽可能避免在Checkpoint的过程中影响流式计算任务。如果用户想采用同步的方式进行状态数据的检查点数据,则将第二个参数指定为True即可。
相比于MemoryStateBackend, FsStateBackend更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或Key/value State状态数据量非常大的场景,这时系统内存不足以支撑状态数据的存储。同时FsStateBackend最大的好处是相对比较稳定,在checkpoint时,将状态持久化到像HDFS分布式文件系统中,能最大程度保证状态数据的安全性。
RocksDBStateBackend
与前面的状态后端不同,RocksDBStateBackend需要单独引入相关的依赖包。RocksDB 是一个 key/value 的内存存储系统,类似于HBase,是一种内存磁盘混合的 LSM DB。当写数据时会先写进write buffer(类似于HBase的memstore),然后在flush到磁盘文件,当读取数据时会现在block cache(类似于HBase的block cache),所以速度会很快。
RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。
需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量即可。对于超大状态的作业,例如天级窗口聚合等场景下可以使会用该状态后端。
checkpoint
Flink 的checkpoint 是容错的核心,在处理数据的过程中flink会定时将state的内容存储到外部存储系统中,当出现故障的时候就可以从外部的系统中恢复state的内容通过checkpoint机制,Flink可以实现Exactly-once语义(Flink内部的Exactly-once,关于端到端的exactly_once,Flink是通过两阶段提交协议实现的)。下面将会详细分析Flink的checkpoint机制。
检查点的生成
如上图,输入流是用户行为数据,包括购买(buy)和加入购物车(cart)两种,每种行为数据都有一个偏移量,统计每种行为的个数。
第一步:JobManager checkpoint coordinator 触发checkpoint。
第二步:假设当消费到[cart,3]这条数据时,触发了checkpoint。那么此时数据源会把消费的偏移量3写入持久化存储。
第三步:当写入结束后,source会将state handle(状态存储路径)反馈给JobManager的checkpoint coordinator。
第四步:接着算子count buy与count cart也会进行同样的步骤
第五步:等所有的算子都完成了上述步骤之后,即当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件,那么整个checkpoint也就完成了,如果中间有一个不成功,那么本次checkpoin就宣告失败。
检查点的恢复
通过上面的分析,或许你已经对Flink的checkpoint有了初步的认识。那么接下来,我们看一下是如何从检查点恢复的。
任务失败
重启作业
恢复检查点
继续处理数据
上述过程具体总结如下:
第一步:重启作业
第二步:从上一次检查点恢复状态数据
第三步:继续处理新的数据
Flink内部Exactly-Once实现
Flink提供了精确一次的处理语义,精确一次的处理语义可以理解为:数据可能会重复计算,但是结果状态只有一个。Flink通过Checkpoint机制实现了精确一次的处理语义,Flink在触发Checkpoint时会向Source端插入checkpoint barrier,checkpoint barriers是从source端插入的,并且会向下游算子进行传递。checkpoint barriers携带一个checkpoint ID,用于标识属于哪一个checkpoint,checkpoint barriers将流逻辑是哪个分为了两部分。对于双流的情况,通过barrier对齐的方式实现精确一次的处理语义。
关于什么是checkpoint barrier,可以看一下CheckpointBarrier类的源码描述,如下:
/**
* Checkpoint barriers用来在数据流中实现checkpoint对齐的.
* Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
* Source会把barrier广播发送到下游算子,当一个算子接收到了其中一个输入流的Checkpoint barrier时,
* 它就会知道已经处理完了本次checkpoint与上次checkpoint之间的数据.
*
* 一旦某个算子接收到了所有输入流的checkpoint barrier时,
* 意味着该算子的已经处理完了截止到当前checkpoint的数据,
* 可以触发checkpoint,并将barrier向下游传递
*
* 根据用户选择的处理语义,在checkpoint完成之前会缓存后一次checkpoint的数据,
* 直到本次checkpoint完成(exactly once)
*
* checkpoint barrier的id是严格单调递增的
*
*/
public class CheckpointBarrier extends RuntimeEvent {...}
可以看出checkpoint barrier主要功能是实现checkpoint对齐的,从而可以实现Exactly-Once处理语义。
下面将会对checkpoint过程进行分解,具体如下:
图1,包括两个流,每个任务都会消费一条用户行为数据(包括购买(buy)和加购(cart)),数字代表该数据的偏移量,count buy任务统计购买行为的个数,coun cart统计加购行为的个数。
图2,触发checkpoint,JobManager会向每个数据源发送一个新的checkpoint编号,以此来启动检查点生成流程。
图3,当Source任务收到消息后,会停止发出数据,然后利用状态后端触发生成本地状态检查点,并把该checkpoint barrier以及checkpoint id广播至所有传出的数据流分区。状态后端会在checkpoint完成之后通知任务,随后任务会向Job Manager发送确认消息。在将checkpoint barrier发出之后,Source任务恢复正常工作。
图4,Source任务发出的checkpoint barrier会发送到与之相连的下游算子任务,当任务收到一个新的checkpoint barrier时,会继续等待其他输入分区的checkpoint barrier到来,这个过程称之为barrier 对齐,checkpoint barrier到来之前会把到来的数据线缓存起来。
图5,任务收齐了全部输入分区的checkpoint barrier之后,会通知状态后端开始生成checkpoint,同时会把checkpoint barrier广播至下游算子。
图6,任务在发出checkpoint barrier之后,开始处理因barrier对齐产生的缓存数据,在缓存的数据处理完之后,就会继续处理输入流数据。
图7,最终checkpoint barrier会被传送到sink端,sink任务接收到checkpoint
barrier之后,会向其他算子任务一样,将自身的状态写入checkpoint,之后向Job Manager发送确认消息。Job Manager接收到所有任务返回的确认消息之后,就会将此次检查点标记为完成。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint的时间间隔,如果状态比较大,可以适当调大该值
env.enableCheckpointing(1000);
// 配置处理语义,默认是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个checkpoint之间的最小时间间隔,防止因checkpoint时间过长,导致checkpoint积压
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint执行的上限时间,如果超过该阈值,则会中断checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行执行的检查点数量,默认为1,可以指定多个,从而同时出发多个checkpoint,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设定周期性外部检查点,将状态数据持久化到外部系统中,
// 使用该方式不会在任务正常停止的过程中清理掉检查点数据
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Feb 23, 2021 at 11:32 am