Re: Multiple Kafka topics processing in Spark 2.2


Re: Multiple Kafka topics processing in Spark 2.2

Cody Koeninger
If you want an "easy" but not particularly performant way to do it, each org.apache.kafka.clients.consumer.ConsumerRecord has a topic.  

The topic is going to be the same for the entire partition as long as you haven't shuffled, hence the examples on how to deal with it at a partition level.
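
For illustration, a minimal sketch against the spark-streaming-kafka-0-10 integration (ssc, kafkaParams and topicsSet are assumed to already exist; for this API kafkaParams has to be a Map[String, Object]):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Assumed to exist: ssc (StreamingContext), kafkaParams (Map[String, Object]), topicsSet (Set[String])
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))

// The "easy" way: every ConsumerRecord carries its topic, so just pair it with the value
val topicAndValue = stream.map(record => (record.topic, record.value))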

On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong <[hidden email]> wrote:
Hi, Alonso.
  Thanks! I've read about this but did not quite understand it. Picking out the topic name of a Kafka message seems like a simple task, but the example code looks complicated, with redundant info. Why do we need offsetRanges here, and is there an easier way to achieve this?

Cheers,
Dan


2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <[hidden email]>:
Hi, reading the official docs, I think you can do it this way:

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Hold a reference to the current offset ranges, so they can be used downstream
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  // Capture (topic, partition, fromOffset, untilOffset) for each Kafka partition
  // before any transformation that might shuffle the data
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
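
And a sketch of the same idea that tags each record with its topic at the partition level: partition i of the unshuffled Kafka RDD corresponds to offsetRanges(i), so the topic only has to be looked up once per partition rather than once per record:

import org.apache.spark.TaskContext

directKafkaStream.foreachRDD { rdd =>
  // Only the RDD produced directly by the Kafka stream implements HasOffsetRanges
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Each RDD partition comes from exactly one Kafka topic-partition,
    // so the topic can be read once via the partition id
    val o = ranges(TaskContext.get.partitionId)
    iter.foreach { case (_, value) =>
      println(s"topic=${o.topic} partition=${o.partition} value=$value")
    }
  }
}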

2017-09-06 14:38 GMT+02:00 Dan Dong <[hidden email]>:
Hi, All,
  I have a question about how to process multiple Kafka topics in a Spark 2.* program: how do I get the topic name from a message received from Kafka? E.g.:

......
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
......

Kafka sends messages on multiple topics (through the console producer, for example). But when Spark receives a message, how will it know which topic that message came from? Thanks a lot for any help!

Cheers,
Dan






Re: Multiple Kafka topics processing in Spark 2.2

kant kodali
@Dan shouldn't you be using Datasets/DataFrames? I heard it is recommended to use Datasets and DataFrames rather than DStreams, since DStreams are in maintenance mode.
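
For reference, a minimal Structured Streaming sketch (the spark-sql-kafka-0-10 package is assumed to be on the classpath; the broker and topic names below are placeholders). The Kafka source exposes the originating topic as a column of the DataFrame, so handling multiple topics is just a select:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("MultiTopicKafka").getOrCreate()

// Placeholder brokers and topics; the source schema includes
// key, value, topic, partition, offset and timestamp columns
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA,topicB")
  .load()

val perTopic = df.selectExpr("topic", "CAST(value AS STRING) AS value")

val query = perTopic.writeStream
  .format("console")
  .start()

query.awaitTermination()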
