What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka


sd.hrishi
Hi 

My Spark Streaming job consumes from multiple Kafka topics. How can I process them in parallel? Should I try spark.streaming.concurrentJobs? It has some adverse effects, as mentioned by its creator; is that still the case with Spark 2.4 and the direct Kafka stream? What about FAIR scheduling mode: will it help in this scenario? I could not find any solid references on this.
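For context, these are the settings I mean (the app name and values here are only illustrative):

SparkConf conf = new SparkConf()
        .setAppName("multi-topic-stream")
        .set("spark.streaming.concurrentJobs", "4")  // undocumented; lets more than one streaming job run at a time
        .set("spark.scheduler.mode", "FAIR");        // FAIR scheduling within the application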

Regards
Hrishi 


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

maasg
Hi Hrishi,

When using the Direct Kafka stream approach, processing tasks will be distributed to the cluster. 
The level of parallelism depends on how many partitions the consumed topics have.
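For illustration, a single direct stream can subscribe to several topics at once, and each micro-batch then runs one task per topic-partition. A minimal sketch (topic names are placeholders, and ssc / kafkaParams stand for your JavaStreamingContext and Kafka consumer config map):

JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                        Arrays.asList("topic1", "topic2"), kafkaParams));

// Every batch gets one task per partition of topic1 and topic2.
stream.foreachRDD(rdd -> rdd.foreachPartition(records ->
        records.forEachRemaining(record -> System.out.println(record.value()))));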
Why do you think that the processing is not happening in parallel?

I would advise you to get the base scenario working before looking into advanced features like `concurrentJobs` or a particular scheduler.

kind regards, Gerard.

On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

sd.hrishi
Hi Gerard,

First of all, apologies for the late reply.

You are right: tasks are distributed to the cluster, and parallelism is achieved through Kafka partitions. But my use case is different: in one streaming context I am consuming events from 6 different topics, and for each topic 6 different actions are performed.

So, per batch, total Spark jobs = 6 streams x 6 actions = 36 jobs (plus some Kafka commit jobs, which happen on the driver).

These 36 jobs execute sequentially, because only one job is active at any point in time (see the screenshot below). A delay in any one job delays the whole batch.

[Image: Spark UI screenshot showing only one active job at a time]

But within a single job (corresponding to one topic and one action), execution is parallel, based on the number of partitions that topic has.
[Image: Spark UI screenshot showing a single job's tasks running in parallel across the topic's partitions]

What is my requirement:
  • I want to run these jobs in parallel in a controlled manner: jobs for different topics should run in parallel, but jobs within a topic should run sequentially. We tried spark.scheduler.mode: FAIR and submitted the jobs in different pools, but didn't get any benefit (a rough sketch of what we tried follows below).
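Roughly what we tried with pools, much simplified (the pool name and allocation file path are placeholders; conf and kafkaStream1 refer to the SparkConf and the direct stream of our app):

conf.set("spark.scheduler.mode", "FAIR");
conf.set("spark.scheduler.allocation.file", "/tmp/fairscheduler.xml"); // defines one pool per topic

kafkaStream1.foreachRDD(rdd -> {
    // The scheduler pool is a thread-local property, so it is set in the
    // driver thread that triggers this output action.
    rdd.context().setLocalProperty("spark.scheduler.pool", "topic1-pool");
    rdd.foreachPartition(p -> p.forEachRemaining(System.out::println));
});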
But when I tried spark.streaming.concurrentJobs = 4, four jobs were actively running, but from different batches (batch times 19:15:55 and 19:16:00), which could be a problem for committing offsets.

[Image: Spark UI screenshot showing four jobs running concurrently, from batches 19:15:55 and 19:16:00]


Regards
Hrishi


On Thu, Mar 5, 2020 at 12:49 AM Gerard Maas <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

maasg
Hrishi,

Could you share a simplified version of the code you are running? A job is made up of tasks.
While jobs are indeed sequential, tasks will be executed in parallel. 
In the Spark UI, you can see that in the "Event Timeline" visualization. 

If you could share an example of your code that illustrates what you want to achieve, I could have a look at it.

kr, Gerard.

On Wed, Mar 18, 2020 at 8:24 AM Hrishikesh Mishra <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

sd.hrishi
This is a simplified version of the code. Here:

Number of topics: 2
Actions per topic: 2
Total jobs = 2 x 2 = 4

Offsets are also committed to Kafka for both topics.



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        // Consecutive set() calls on the same key overwrite each other,
        // so both executor JVM options are passed in a single value.
        conf.set("spark.executor.extraJavaOptions",
                "-XX:+UseConcMarkSweepGC -Dlog4j.configuration=file:/tmp/log4j-executor.properties");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("DEBUG");

        /** Kafka Stream 1 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "topic1");

        /** Action 1 on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Action 2 on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Commit on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
        });

        /** Kafka Stream 2 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream2 = createKafkaStream(ssc, "topic2");

        /** Action 1 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Action 2 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Commit on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // These ranges end at each partition's fromOffset, i.e. they commit
            // the position at the start of the batch rather than its end.
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges)
                    .map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset()))
                    .toArray(OffsetRange[]::new);

            ((CanCommitOffsets) kafkaStream2.inputDStream()).commitAsync(beginOffsets);
        });

        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-21");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 90000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}




On Wed, Mar 18, 2020 at 4:45 PM Gerard Maas <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

Tathagata Das
Why are you not using Structured Streaming? Structured Streaming's Kafka source directly supports multiple topics.

val df = spark.readStream.format("kafka").option("subscribe", "topic1,topic2").load()
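A roughly equivalent sketch in Java (topic names, broker address, checkpoint path and the console sink are just placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StructuredMultiTopic {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredMultiTopic")
                .getOrCreate();

        // One source over both topics; Spark plans one task per topic-partition.
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<broker-ips>")
                .option("subscribe", "topic1,topic2")
                .load();

        // Each row carries a "topic" column, so per-topic logic is just a filter.
        StreamingQuery query = df.selectExpr("topic", "CAST(value AS STRING) AS value")
                .writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/checkpoints/multi-topic")
                .start();

        query.awaitTermination();
    }
}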


On Wed, Mar 18, 2020 at 7:49 AM Hrishikesh Mishra <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

sd.hrishi


On Thu, Mar 19, 2020 at 12:31 AM Tathagata Das <[hidden email]> wrote:


Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka

张 帅
In reply to this post by Tathagata Das
Actually, I am also trying the concurrentJobs feature in Spark Streaming.

We have a business scenario where we need to write data from Kafka topics to Hive tables.

One topic corresponds to one table.

TopicA -> TableA
TopicB -> TableB
...

But there are already dozens of topics.

Currently, I have multiple Spark apps running, each handling one topic -> table mapping.

I also tried using one big Kafka stream (N topics).

However, the data for each topic then needs to be filtered out and written to the corresponding table (roughly the pattern sketched below), and the performance is very low.
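In Structured Streaming terms, that "one big stream, filter per table" pattern would look roughly like this, much simplified (topic, table and checkpoint names are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class MultiTopicToHive {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("KafkaToHive")
                .enableHiveSupport()
                .getOrCreate();

        // One stream over all topics; each record carries its source in the "topic" column.
        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<broker-ips>")
                .option("subscribe", "TopicA,TopicB")
                .load();

        // One query per table: filter the shared stream on "topic" and append
        // the matching rows to the corresponding Hive table.
        stream.filter(col("topic").equalTo("TopicA"))
                .selectExpr("CAST(value AS STRING) AS value")
                .writeStream()
                .option("checkpointLocation", "/tmp/checkpoints/TableA")
                .foreachBatch((Dataset<Row> batch, Long batchId) ->
                        batch.write().mode("append").insertInto("TableA"))
                .start();

        stream.filter(col("topic").equalTo("TopicB"))
                .selectExpr("CAST(value AS STRING) AS value")
                .writeStream()
                .option("checkpointLocation", "/tmp/checkpoints/TableB")
                .foreachBatch((Dataset<Row> batch, Long batchId) ->
                        batch.write().mode("append").insertInto("TableB"))
                .start();

        spark.streams().awaitAnyTermination();
    }
}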

Can you give me some advice? 

Thanks


From: Tathagata Das <[hidden email]>
Sent: March 19, 2020, 3:01
To: Hrishikesh Mishra <[hidden email]>
Cc: Gerard Maas <[hidden email]>; spark users <[hidden email]>
Subject: Re: What is the best way to consume in parallel from multiple topics in Spark Streaming with Kafka
 