[spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters


Srinivas V
Hello,
In Structured Streaming, is it possible for one Spark application with a single query to consume topics from multiple Kafka clusters?

I am trying to consume two topics, each from a different Kafka cluster, but one of them is reported as an unknown topic and the job keeps running in the Spark UI without completing.

Is this not allowed in Spark 2.4.5?

Regards
Srini




Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

German SM
Hello,

I've never tried that. Doesn't this work?

// One reader per Kafka cluster; each gets its own bootstrap servers.
val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic2")
  .load()


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Srinivas V
Thanks for the quick reply. That may work, but I have about five topics to listen to right now. I am trying to keep all the topics in an array in a properties file and read them all at once. That way it is dynamic: there is one code block like the one below, and topics can be added or removed in the config file without changing code. If someone confirms that this does not work, I will do something like what you provided.
val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port,cluster2_host:cluster2_port")
  .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
  .load()
Reply | Threaded
Open this post in threaded view
|

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

maasg
Hi Srinivas, 

Reading from different brokers is possible, but you need to connect to each Kafka cluster separately.
Mixing connections to two different Kafka clusters in one subscriber is not supported (I'm sure it would give all kinds of weird errors).
The "kafka.bootstrap.servers" option is there to list the potentially many brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a subscription for each Kafka cluster you are talking to.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3,topic4,topic5")
  .load()

After acquiring the DataFrames, you can union them and process all the data in a single query.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData` 
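
Since your question is about Structured Streaming, the same pattern works with readStream. Here is a minimal sketch of a single streaming query over both clusters (the hosts, ports, topic names and the console sink are placeholders):

// One streaming reader per Kafka cluster, then union into one query.
val stream_cluster1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val stream_cluster2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3,topic4,topic5")
  .load()

// A single streaming query over the unioned data.
val query = stream_cluster1.union(stream_cluster2)
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()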

kr, Gerard.



Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Srinivas V
OK, thanks for confirming. I will do it this way.

Regards
Srini
