Spark 2.1.2 Spark Streaming checkpoint interval not respected

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Spark 2.1.2 Spark Streaming checkpoint interval not respected

Shing Hing Man-2
Hi,

In the following example using mapWithState, I set the checkpoint interval to 1 minute.
From the log, Spark still writes to the checkpoint directory every second.
I would appreciate it if someone could point out what I have done wrong.

/**
 * Stateful word count over a socket text stream, implemented with `mapWithState`.
 *
 * Reads newline-delimited text from `<hostname>:<port>`, splits it into words and
 * keeps a running count per word as streaming state. The batch interval is 1 second
 * and the state stream's checkpoint interval is requested as 1 minute.
 */
object MapWithStateDemo {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the hostname and `args(1)` the port of the text source;
   *             the program prints a usage message and exits with status 1 when fewer
   *             than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    // Fall back to a local master only when none was supplied externally.
    val sparkConf = new SparkConf()
      .setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    // NOTE(review): this requests a 1 minute interval for checkpointing the state
    // stream's data. The per-batch writes seen in the log are presumably the
    // StreamingContext's metadata checkpoints, which follow the batch interval
    // rather than this setting -- confirm against the Spark checkpointing docs.
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    // Place the checkpoint directory two levels above the classpath root
    // (e.g. <project>/target when classes are under target/classes).
    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    // String interpolation instead of File + String, which relied on the deprecated
    // any2stringadd implicit conversion; the resulting path is identical.
    val checkpointDir = s"$targetDir/checkpoint"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
Thanks in advance for any assistance!

Shing