1.

How to configure checkpointing?

Answer»

Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (such as HDFS or S3) to which the checkpoint information will be saved. This is done with streamingContext.checkpoint(checkpointDirectory), and it allows you to use the aforementioned stateful transformations. Additionally, if you want the application to recover from driver failures, you should structure the streaming application so that it has the following behavior:

  • When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
  • When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

    // Function to create and set up a new StreamingContext
    def createStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(...)          // new context
      val lines = ssc.socketTextStream(...)        // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)          // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, createStreamingContext _)

If the checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., the application is running for the first time), then the function createStreamingContext will be called to create a new context and set up the DStreams.
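
For a fuller picture, the snippet above might be expanded into a complete application along the following lines. This is only a sketch under stated assumptions: the object name, the checkpoint path, the socket host and port, the 10-second batch interval, and the running word count are all hypothetical choices made for illustration, and the Spark master is assumed to be supplied by spark-submit. Note that all DStream setup happens inside createStreamingContext, so that it is only executed when no checkpoint data is found.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWordCount {
  // Hypothetical checkpoint location; any fault-tolerant file system path works.
  val checkpointDirectory = "hdfs:///tmp/streaming-checkpoint"

  // Adds the counts seen in the current batch to the running total for a key.
  def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(newValues.sum + running.getOrElse(0))

  // Called only when no checkpoint data exists (i.e., on the first run).
  def createStreamingContext(): StreamingContext = {
    // The master URL is assumed to be passed in by spark-submit.
    val conf = new SparkConf().setAppName("CheckpointedWordCount")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval
    ssc.checkpoint(checkpointDirectory)               // set checkpoint directory

    // Assumed input source: text lines from a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey[Int](updateCount _) // stateful, so it needs checkpointing
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recreate the context from checkpoint data, or build a new one.
    val context = StreamingContext.getOrCreate(checkpointDirectory, createStreamingContext _)
    context.start()
    context.awaitTermination()
  }
}
```

In this sketch, start() and awaitTermination() are called on whatever context getOrCreate returns, so the same main method serves both the first run and a restart after a driver failure.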


