Subscribe Multiple Topics Structured Streaming


sivaprakash
Hi

I have integrated Spark Structured Streaming with Kafka, and I'm listening to 2 topics:

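import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def main(args: Array[String]): Unit = {

    // Schema for the message fields (defined here, not yet applied to the stream)
    val schema = StructType(
      List(
        StructField("gatewayId", StringType, true),
        StructField("userId", StringType, true)
      )
    )

    val spark = SparkSession
      .builder
      .master("local[4]")
      .appName("DeviceAutomation")
      .getOrCreate()

    // One Kafka source subscribed to both topics
    val dfStatus = spark.readStream
      .format("kafka")
      .option("subscribe", "utility-status, utility-critical")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .load()
}
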
     
Since I have a few more topics to subscribe to, each needing different
operations, I would like to move each topic into a separate case class for
better clarity. Is that possible?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/


Re: Subscribe Multiple Topics Structured Streaming

naresh Goud
You can use the statement below to subscribe to multiple topics:

val dfStatus = spark.readStream
      .format("kafka")
      .option("subscribe", "utility-status, utility-critical")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .load()
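
Since one Kafka source subscribed to several topics delivers every record in a single DataFrame, one possible way to get the per-topic clarity asked about is to split on the source's `topic` column and decode each slice into its own case class. The following is only a minimal sketch, assuming the record values are JSON strings with the `gatewayId`/`userId` fields from the original post; `StatusEvent`, `TopicStreams`, and `statusEvents` are hypothetical names, not part of Spark.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical case class mirroring the gatewayId/userId schema from the post
case class StatusEvent(gatewayId: String, userId: String)

object TopicStreams {
  // Encoder for the case class; its schema is reused for JSON parsing
  private val statusEncoder = Encoders.product[StatusEvent]

  // Keep only the records of one topic and decode the value (assumed to be JSON)
  def statusEvents(kafkaDf: DataFrame, topic: String): Dataset[StatusEvent] =
    kafkaDf
      .filter(col("topic") === topic)
      .select(from_json(col("value").cast("string"), statusEncoder.schema).as("event"))
      .select("event.*")
      .as[StatusEvent](statusEncoder)
}
```

With the `dfStatus` stream above, `TopicStreams.statusEvents(dfStatus, "utility-status")` would give a typed Dataset for that topic only; a second case class and method could handle `utility-critical` in the same way.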





On Mon, Sep 17, 2018 at 3:28 AM sivaprakash <[hidden email]> wrote:
[...]

Re: Subscribe Multiple Topics Structured Streaming

sivaprakash
I would like to know how to create stream and sink operations outside the "main" method, for example in another class that I can invoke from main, so that each topic I subscribe to can have its own implementation in its own class file. Is that good practice, or should the whole implementation always go inside the "main" method?

On Mon, Sep 17, 2018 at 11:35 PM naresh Goud <[hidden email]> wrote:
[...]


--
- Prakash.
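
The last question in this thread, about keeping stream and sink operations out of "main", can be illustrated with a small sketch. Nothing in Structured Streaming requires the queries to be defined inside `main`: `start()` returns a `StreamingQuery` without blocking, so `main` only has to launch each query and then wait. The code below is one possible layout under stated assumptions, not an established pattern: `TopicJob`, `UtilityStatusJob`, `UtilityCriticalJob`, and the console sink are hypothetical placeholders, and the Kafka source package (spark-sql-kafka-0-10) is assumed to be on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical contract: one implementation per topic, each in its own file
trait TopicJob {
  def topic: String

  // Topic-specific transformation of the raw Kafka records
  def transform(raw: DataFrame): DataFrame

  // Builds the source, applies the transformation, and starts the sink
  def start(spark: SparkSession, servers: String): StreamingQuery =
    transform(
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
    ).writeStream
      .format("console") // placeholder sink for the sketch
      .start()
}

object UtilityStatusJob extends TopicJob {
  val topic = "utility-status"
  def transform(raw: DataFrame): DataFrame =
    raw.selectExpr("CAST(value AS STRING) AS payload")
}

object UtilityCriticalJob extends TopicJob {
  val topic = "utility-critical"
  def transform(raw: DataFrame): DataFrame =
    raw.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS payload")
}

object DeviceAutomation {
  val jobs: Seq[TopicJob] = Seq(UtilityStatusJob, UtilityCriticalJob)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("DeviceAutomation")
      .getOrCreate()

    // Launch one query per topic; start() does not block
    jobs.foreach(_.start(spark, "localhost:9092"))

    // Block until any of the running queries stops or fails
    spark.streams.awaitAnyTermination()
  }
}
```

Here each topic gets its own source and query, so adding a topic means adding one object and one entry in `jobs`. The trade-off against a single multi-topic subscription is more independent queries to run and monitor.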