spark streaming 检查点机制(checkpoint)

发布于:2021-07-31 15:25:06

一个Streaming应用程序要求7天24小时不间断运行,因此必须适应各种导致应用程序失败的场景。Spark Streaming的检查点具有容错机制,有足够的信息能够支持故障恢复。支持两种数据类型的检查点:元数据检查点和数据检查点。


(1)元数据检查点,在类似HDFS的容错存储上,保存Streaming计算信息。这种检查点用来恢复运行Streaming应用程序失败的Driver进程。


(2)数据检查点,在进行跨越多个批次合并数据的有状态操作时尤其重要。在这种转换操作情况下,依赖前一批次的RDD生成新的RDD,随着时间不断增加,RDD依赖链的长度也在增加,为了避免这种无限增加恢复时间的情况,通过周期检查将转换RDD的中间状态进行可靠存储,借以切断无限增加的依赖。使用有状态的转换,如果updateStateByKey或者reduceByKeyAndWindow在应用程序中使用,那么需要提供检查点路径,对RDD进行周期性检查。


元数据检查点主要用来恢复失败的Driver进程,而数据检查点主要用来恢复有状态的转换操作。无论是Driver失败,还是Worker失败,这种检查点机制都能快速恢复。许多Spark Streaming都是使用检查点方式。但是简单的Streaming应用程序,不包含状态转换操作不能运行检查点;从Driver程序故障中恢复可能会造成一些收到没有处理的数据丢失。


为了让一个Spark Streaming程序能够被恢复,需要启用检查点,必须设置一个容错的、可靠的文件系统(如HDFS、S3等)路径保存检查点信息,同时设置时间间隔。


streamingContext.checkpoint(checkpointDirectory)//checkpointDirectory是一个文件系统路径(最好是一个可靠的比如hdfs://....)


dstream.checkpoint(checkpointInterval)//设置时间间隔


当程序第一次启动时,创建一个新的StreamingContext,接着创建所有的数据流,然后再调用start()方法。


//定义一个创建并设置StreamingContext的函数


def functionToCreateContext(): StreamingContext = {


val ssc = new StreamingContext(...) ? ? ? ? ? ? ? //创建StreamingContext实例


val DsSream = ssc.socketTextStream(...) ? ? ?//创建DStream


...


ssc.checkpoint(checkpointDirectory) ? ? ? ? ? //设置检查点机制


ssc


}


//从检查点数据重建或者新建一个StreamingContext


val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreate-Context_)


//在context需要做额外的设置完成,不考虑是否被启动或重新启动


context. ...


//启动context


context.start()


context.awaitTermination()


通过使用getOrCreate创建StreamingContext。


当程序因为异常*羰保绻觳榈懵肪洞嬖冢騝ontext将从检查点数据中重建。如果检查点目录不存在(首次运行),将会调用functionToCreateContext函数新建context函数新建context,并设置DStream。


但是,Streaming需要保存中间数据到容错存储系统,这个策略会引入存储开销,进而可能会导致相应的批处理时间变长,因此,检查点的时间间隔需要精心设置。采取小批次时,每批次检查点可以显著减少操作的吞吐量;相反,检查点太少可能会导致每批次任务大小的增加。对于RDD检查点的有状态转换操作,其检查点间隔默认设置成DStream的滑动间隔的5~10倍。


故障恢复可以使用Spark的Standalone模式自动完成,该模式允许任何Spark应用程序的Driver在集群内启动,并在失败时*簟6杂赮ARN或Mesos这样的部署环境,则必须通过其他的机制*鬌river。



相关推荐

最新更新

猜你喜欢