Issue Storing offset in Kafka for Spark Streaming Application

Issue Storing offset in Kafka for Spark Streaming Application

Arpan Rajani
Hi all,

In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to store the offsets in Kafka itself in order to make the streaming application restartable. (I already implemented this with checkpoints, but since we will need to change the code in production, checkpointing won't work for us.)

Checking the Spark Streaming documentation for the "store offsets in Kafka itself" approach:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself, which describes:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
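
For reference, the classes in this snippet come from the spark-streaming-kafka-0-10 module; the imports in our job are roughly the following:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe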

Based on this, I modified the code as follows:

val kafkaMap: Map[String, Object] = KafkaConfigs

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaMap))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Filter out records with empty values and map each record to a tuple of the form
  // (topicName, stringValue_read_from_kafka_topic)
  stream.map(x => ("topicName", x.value)).filter(x => !x._2.trim.isEmpty).foreachRDD(processRDD _)

  // Sometime later, after outputs have completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


def processRDD(rdd: RDD[(String, String)]): Unit = {
  // Process further and write to HDFS
}

Now, when I try to start the streaming application, it does not start. Looking at the logs, here is what we see:

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)

Can anyone suggest what we are missing here, or help us understand it?


Regards,
Arpan

Re: Issue Storing offset in Kafka for Spark Streaming Application

maasg
Hi Arpan,

The error suggests that the streaming context has been started with streamingContext.start() and that some other DStream operations have been attempted after that statement. In your snippet, the stream.map(...).filter(...).foreachRDD(processRDD _) call sits inside the foreachRDD closure, so it tries to register new DStream operations while the context is already running.
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = _

// create streaming context, streams, ...
// as the first operation after the stream has been created, do:

stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// Then do the other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)

// Materialize the resulting stream
resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic,... whatever,...

  // at the end of the process, commit the offsets
  // (note that I use the original stream instance, not `resultStream`)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
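
To make the pattern concrete, below is a minimal, self-contained sketch of a job wired this way. It is only illustrative: the app name, batch interval, broker list, group id, topic name and the writeToSink function are placeholders, not taken from your job.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OffsetCommitSketch {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offset-commit-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder Kafka settings; auto-commit is disabled so that
    // commitAsync below is the only thing writing offsets back to Kafka.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-streaming-app",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

    // Capture the offset ranges of each batch before anything else happens.
    var offsetRanges = Array.empty[OffsetRange]
    stream.foreachRDD { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    }

    // All transformations are defined here, before ssc.start().
    val resultStream = stream
      .map(record => ("topicName", record.value))
      .filter { case (_, value) => value.trim.nonEmpty }

    resultStream.foreachRDD { rdd =>
      writeToSink(rdd) // placeholder for the HBase/HDFS output
      // Commit on the original Kafka stream, not on resultStream.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Placeholder sink: replace with the real output logic.
  def writeToSink(rdd: org.apache.spark.rdd.RDD[(String, String)]): Unit = ()
}

The key points are that every DStream operation is defined before ssc.start(), and that commitAsync is always called on the original Kafka stream.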

I hope this helps,

kr, Gerard.

Re: Issue Storing offset in Kafka for Spark Streaming Application

Arpan Rajani
Hi Gerard,

Excellent, indeed your inputs helped. Thank you for the quick reply. 

I modified the code based on your inputs.

Now the application starts and reads from the topic. We then stream about 50,000 messages onto the Kafka topic.

After a while we terminate the application with a YARN kill and check how many messages were written to HBase (say 9,000). Then we restart the application and wait for the remaining messages to be picked up from the topic, but the application reads nothing, which means the streaming application fails to get the correct offset from which it should resume. (This was not the case with the checkpointing mechanism, where I could see all 50,000 messages after a restart.)

What do you think is missing here?

Following is the improved code, based on your previous inputs:

// Create the Spark Streaming context

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(........)

// Modified based on inputs

var offsetRanges = Array.empty[OffsetRange] // holds the offset ranges of the current batch

stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

val resultStream = stream.map(x => ("topicName", x.value)).filter(x => !x._2.trim.isEmpty)

resultStream.foreachRDD { rdd =>
  processRDD(rdd) // this stores messages in HBase

  // Commit offsets using the original stream.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()

ssc.awaitTermination()

.........
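
For reference, the Kafka parameters we pass to createDirectStream are along these lines (the broker list, group id and offset-reset policy are placeholders rather than our real values):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaMap: Map[String, Object] = Map(
  "bootstrap.servers"  -> "broker1:9092,broker2:9092", // placeholder broker list
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "our-streaming-app",         // kept identical across restarts
  "auto.offset.reset"  -> "latest",                    // applies only when no committed offset exists
  "enable.auto.commit" -> (false: java.lang.Boolean)   // offsets are committed only via commitAsync
)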

Could you please help me figure out what is missing here?

Many thanks,
Arpan