博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink之Operator State(non-keyed state)
阅读量:3959 次
发布时间:2019-05-24

本文共 4698 字,大约阅读时间需要 15 分钟。

flink介绍,flink对所有的算子都支持有状态计算,在博主之前分享的文章中,关于keyed state已经做过很详细的介绍,欢迎各位发烧友点击交流

虽然在我们平时的生产环境中,关于keyed state应用的比较多,但是对于那些基于窗口输出到外部系统的需要,为了数据不丢失或者重复输出到外部系统,sink算子也是需要进行保存状态的,接下来本文主要介绍Operator state

1、Operator state

如果用户想要使用Operator State,只需要实现通用的CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>,值得注意的是,目前的operator-state仅仅支持list-style风格的状态,要求所存储的状态必须是一个List,且其中的元素必须可以序列化。

(1)CheckpointedFunction

CheckpointedFunction提供两种不同的状态分发方案:Even-splitUnion,该接口提供了两个方法。

Even-split:表示系统在故障恢复时,会将operator-state的元素均分给所有的operator实例,每个operator实例将获取到整个operator-state的sub-list数据。

Union:表示系统在故障恢复时,每一个operator实例可以获取到整个operator-state的全部数据。

void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • initializeState():第一次启动或者从上一次状态恢复的时候,系统会调用initializeState()

样例:进行wordCount统计,在统计到一定数据阈值的情况下,才输出到下一个算子或者外围系统。

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {
var listState:ListState[(String,Int)]=_ val bufferedElements = ListBuffer[(String, Int)]() //负责将数据输出到外围系统 override def invoke(value: (String, Int)): Unit = {
bufferedElements += value if(bufferedElements.size == threshold){
for(ele <- bufferedElements){
println(ele) } bufferedElements.clear() } } //是在savepoint|checkpoint时候将数据持久化 override def snapshotState(context: FunctionSnapshotContext): Unit = {
listState.clear() for(ele <- bufferedElements){
listState.add(ele) } } //状态恢复|初始化 创建状态 override def initializeState(context: FunctionInitializationContext): Unit = {
val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)]) //context.getOperatorStateStore.getUnionState(lsd) //Union方案 listState=context.getOperatorStateStore.getListState(lsd) //Even-split方案 if(context.isRestored){
for(element <- listState.get().asScala) {
bufferedElements += element } } }}
var env=StreamExecutionEnvironment.getExecutionEnvironment	env.socketTextStream("centos",9999)		.flatMap(_.split("\\s+"))		.map((_,1))		.keyBy(0)		.sum(1)		.addSink(new BufferingSink(5))env.execute("operator_state")

注意:在进行测试的时候,一定要将全局的并行度设置为1,方便测试。

附:测试使用到的指令:

[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081------------------ Running/Restarting Jobs -------------------17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)--------------------------------------------------------------[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.

在做状态恢复的时候,要带上checkpoint。

(2)ListCheckpointed

ListCheckpointed接口是CheckpointedFunction接口的一种变体形式,仅仅支持Even-split状态的分发策略。

List
snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List
state) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照。
  • restoreState():等价于上述CheckpointedFunction中声明的initializeState()方法,用作状态恢复。

案例:

import java.lang.{
Long => JLong} //修改类别名import scala.{
Long => SLong} //修改类别名class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong]{
@volatile var isRunning:Boolean = true var offset = 0L override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
val lock = ctx.getCheckpointLock while(isRunning){
Thread.sleep(1000) lock.synchronized({
ctx.collect(offset) offset += 1 }) } } override def cancel(): Unit = {
isRunning=false } override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
Collections.singletonList(offset) //存储的是 当前source的偏移量,如果状态不可拆分,用户可以使Collections.singletonList } override def restoreState(state: util.List[JLong]): Unit = {
for (s <- state.asScala) {
offset = s } }}
var env=StreamExecutionEnvironment.getExecutionEnvironment	env.addSource[Long](new CustomStatefulSourceFunction)		.print("offset:")env.execute("test_Offset")

好了,关于Operator本文介绍到这里,flink除了Keyed state和Operator state,其实还有Broadcast state,博主在前面的文章中已经分享过,需要的伙伴们可以点击链接:进行查看,博主用了两个世纪案例介绍了keyed和non-keyed的broadcast state,如有不正确的地方,欢迎交流指出错误,博主愿意与放大发烧友共同学习、进步。

转载地址:http://oimzi.baihongyu.com/

你可能感兴趣的文章
测试JSTL表达式
查看>>
一口一口吃掉Struts(六)——动态ActionForm
查看>>
一口一口吃掉struts(七)——ActionForward知多少
查看>>
浅析Hibernate映射(二)——关系映射(3)
查看>>
浅析Hibernate映射(四)——组件映射(component)
查看>>
Hibernate性能优化
查看>>
Spring核心ioc
查看>>
SSH框架总结(框架分析+环境搭建+实例源码下载)
查看>>
Struts2+Spring3+Mybatis3开发环境搭建
查看>>
mongoDB入门必读(概念与实战并重)
查看>>
通俗易懂解剖jbpm4
查看>>
云盘 同步盘介绍 同步工具介绍
查看>>
rsync
查看>>
win7 英文版电脑 不睡眠,不休眠
查看>>
Bash中如何判断一个命令是否存在 查看当前目录下文件的个数
查看>>
makefile
查看>>
linux 文件权限
查看>>
部分简化字感觉不如繁体字有深意
查看>>
cgo 崩溃 64位地址截断引发的挂死问题
查看>>
drbd
查看>>