In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors



sd.hrishi

I have created a sample direct Kafka stream in Spark. The Kafka topic has 30 partitions, but all the consumers are running on the same executor machine.

Kafka Manager screenshot: [image: Screenshot 2020-02-28 at 7.06.49 PM 2.png]

As I understand it, in a direct Kafka stream the driver gives the offsets to the executors, and the executors poll Kafka with them.

Kafka Stream

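        import java.util.HashMap;
        import com.google.common.collect.ImmutableList;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.spark.streaming.api.java.JavaInputDStream;
        import org.apache.spark.streaming.kafka010.ConsumerStrategies;
        import org.apache.spark.streaming.kafka010.KafkaUtils;
        import org.apache.spark.streaming.kafka010.LocationStrategies;

        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hosts>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        // PreferConsistent distributes partitions evenly across the available executors
        JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, byte[]>Subscribe(ImmutableList.of(topic), kafkaParams));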
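For context on the placement side: the Spark Kafka 0-10 integration guide says LocationStrategies.PreferConsistent() distributes partitions evenly across the available executors, so the fetch tasks for the 30 partitions should be spread over several machines even though the offset commits come from the driver. The sketch below is only a toy model of that idea, assuming a simple modulo mapping from partition number to a hypothetical list of executors. It is not Spark's placement code, and in Spark the result is a scheduling preference rather than a hard assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Spark's code) of spreading Kafka partitions over executors in the
 * spirit of LocationStrategies.PreferConsistent: every partition maps to one executor,
 * and the mapping stays the same from batch to batch.
 */
final class PartitionPlacementModel {

    /** Maps partitions 0..numPartitions-1 to executors by partition number modulo executor count. */
    static Map<String, List<Integer>> assign(int numPartitions, List<String> executors) {
        if (executors.isEmpty()) {
            throw new IllegalArgumentException("need at least one executor");
        }
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        for (String executor : executors) {
            placement.put(executor, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String executor = executors.get(partition % executors.size());
            placement.get(executor).add(partition);
        }
        return placement;
    }

    public static void main(String[] args) {
        // Hypothetical executor names; the thread does not describe the cluster layout.
        List<String> executors = Arrays.asList("executor-1", "executor-2", "executor-3");
        assign(30, executors).forEach((executor, partitions) ->
                System.out.println(executor + " fetches " + partitions.size() + " partitions: " + partitions));
    }
}
```

With three executors each one ends up fetching 10 of the 30 partitions, which is the kind of spread one would expect to see in the executor logs rather than in the consumer IP that Kafka Manager reports for commits.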

Spark Version: 2.4

Spark Config

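        import org.apache.spark.SparkConf;

        SparkConf conf = new SparkConf().setAppName("StreamingTest");
        conf.set("spark.shuffle.service.enabled", "true");
        // Upper bound on records per second read from each Kafka partition
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        // Let Spark adapt the ingestion rate to the observed processing rate
        conf.set("spark.streaming.backpressure.enabled", "true");
        // 1 is the default: streaming jobs (one per output operation per batch) run one at a time
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        // Lowest rate (records per second) the PID backpressure estimator will choose
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");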
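As a quick sanity check on these numbers: spark.streaming.kafka.maxRatePerPartition limits how many records per second are read from each partition, so it also bounds how many records one batch can hold. The sketch below does that arithmetic for the 30-partition topic. The batch interval is not given in the post, so the 10-second value is a hypothetical placeholder; with backpressure enabled the actual rate can be lower than these caps.

```java
/**
 * Back-of-the-envelope caps implied by spark.streaming.kafka.maxRatePerPartition.
 * These are upper bounds only: with backpressure enabled Spark may read fewer records.
 */
final class KafkaRateCaps {

    /** Most records per second the stream may read across all of its partitions. */
    static long maxRecordsPerSecond(long maxRatePerPartition, int numPartitions) {
        return maxRatePerPartition * numPartitions;
    }

    /** Most records a single batch may contain for the given batch interval in seconds. */
    static long maxRecordsPerBatch(long maxRatePerPartition, int numPartitions, long batchIntervalSeconds) {
        return maxRecordsPerSecond(maxRatePerPartition, numPartitions) * batchIntervalSeconds;
    }

    public static void main(String[] args) {
        long maxRatePerPartition = 100; // spark.streaming.kafka.maxRatePerPartition from the config above
        int partitions = 30;            // partitions of the topic in the question
        long batchSeconds = 10;         // hypothetical: the post does not state the batch interval
        System.out.println("records/second cap: " + maxRecordsPerSecond(maxRatePerPartition, partitions)); // 3000
        System.out.println("records/batch cap: " + maxRecordsPerBatch(maxRatePerPartition, partitions, batchSeconds)); // 30000
    }
}
```

With 100 records per second per partition and 30 partitions the ceiling is 3,000 records per second (30,000 per 10-second batch). As far as I can tell from the PID rate estimator, pid.minRate is a lower bound on the stream-wide rate that backpressure will choose, so the 1,500 set above sits at half of that ceiling.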


Regards,
Hrishi

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

张 帅
Hi Hrishi.

I guess your code looks something like the following:

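import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // This closure runs on the driver, so the offset ranges are read there
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}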

Committing the offsets happens on the driver side.

On the driver side, Spark calculates which records each topic partition should contribute to the current batch; the tasks on each executor then actually consume the corresponding partitions.

You can check whether the IP shown in Kafka Manager is the address of the node where the driver runs.
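The split Zhang describes (offsets decided on the driver, records fetched on the executors) can be modelled in a few lines. In this simplified sketch the driver only compares offsets: for each partition it takes the next offset to read and the latest available offset, caps the range at a per-partition limit, and produces a [fromOffset, untilOffset) range that one executor task would then fetch. PlannedRange and plan are hypothetical names standing in for the real OffsetRange class of spark-streaming-kafka-0-10, and the fixed cap is an assumption (Spark also factors in backpressure).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of the driver planning one batch of a direct Kafka stream. */
final class BatchOffsetPlanner {

    /** Hypothetical stand-in for Spark's OffsetRange: records [fromOffset, untilOffset) of one partition. */
    static final class PlannedRange {
        final int partition;
        final long fromOffset;
        final long untilOffset; // exclusive

        PlannedRange(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        long count() {
            return untilOffset - fromOffset;
        }

        @Override
        public String toString() {
            return "p" + partition + " [" + fromOffset + ", " + untilOffset + ")";
        }
    }

    /**
     * Driver-side step in this model: no records are read here, only offsets are compared.
     * Each returned range would become one task that an executor uses to fetch records.
     */
    static List<PlannedRange> plan(Map<Integer, Long> nextOffsets, Map<Integer, Long> latestOffsets,
                                   long maxRecordsPerPartition) {
        List<PlannedRange> ranges = new ArrayList<>();
        // Iterate in partition order for a stable, readable result
        for (Map.Entry<Integer, Long> entry : new TreeMap<>(nextOffsets).entrySet()) {
            int partition = entry.getKey();
            long from = entry.getValue();
            long latest = latestOffsets.getOrDefault(partition, from);
            long until = Math.min(latest, from + maxRecordsPerPartition);
            ranges.add(new PlannedRange(partition, from, Math.max(from, until)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<Integer, Long> next = new TreeMap<>();
        Map<Integer, Long> latest = new TreeMap<>();
        next.put(0, 0L);
        latest.put(0, 5_000L); // backlog larger than the cap
        next.put(1, 900L);
        latest.put(1, 950L);   // small backlog
        // Hypothetical cap of 1,000 records per partition, e.g. 100 records/s over a 10 s batch
        plan(next, latest, 1_000L).forEach(System.out::println);
    }
}
```

Here partition 0 is capped at [0, 1000) while partition 1 only gets its 50 pending records; in the real integration the committed ranges are what commitAsync later sends from the driver, which is why a single consumer IP shows up in Kafka Manager.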


From: Hrishikesh Mishra <[hidden email]>
Sent: February 29, 2020, 12:05
To: [hidden email] <[hidden email]>
Subject: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors
 


Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

sd.hrishi
Thanks, Zhang.

You are right. The driver commits the offsets to Kafka, which is why only a single consumer IP shows up in Kafka Manager. In fact, we start multiple Kafka streams in one Spark context, but the driver executes them sequentially, not in parallel. While debugging that, I came across this issue and suspected that everything was happening in the driver. Now it is clear: I enabled debug logging on the executors and could see KafkaRDD fetching events from Kafka for the given offsets.

Second, where can I find some insight into why all the different Kafka streams of a Spark context are executed sequentially? I found the spark.streaming.concurrentJobs config for running jobs in parallel, but I read on Stack Overflow that it has some adverse effects.

On Tue, Mar 3, 2020 at 8:18 AM Zhang Victor <[hidden email]> wrote:

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

张 帅
Hi Hrishi.

I tested this using multiple Kafka streams.

When the number of executors * cores per executor is greater than the total number of topic partitions, and spark.streaming.concurrentJobs > 1, jobs can execute concurrently.

For example, stream1 reads topicA (1 partition), stream2 reads topicB (2 partitions), and spark.streaming.concurrentJobs=2.
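A toy model may help explain the sequential behaviour asked about above. Spark Streaming's job scheduler runs the jobs generated for each batch (one per output operation) on a fixed-size thread pool whose size comes from spark.streaming.concurrentJobs, which defaults to 1; with one thread, stream2's job has to wait until stream1's job finishes. The sketch below imitates that with a plain ExecutorService and placeholder jobs (not Spark jobs), and encodes Zhang's rule of thumb as stated, that total executor cores must exceed the total number of partitions. It does not model how Spark schedules tasks on the executors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Toy model of per-batch streaming jobs sharing a fixed-size job pool (not Spark code). */
final class ConcurrentJobsModel {

    /** Zhang's rule of thumb as stated in the thread: total cores must exceed total partitions. */
    static boolean enoughCores(int executors, int coresPerExecutor, int... partitionsPerStream) {
        int totalPartitions = 0;
        for (int partitions : partitionsPerStream) {
            totalPartitions += partitions;
        }
        return executors * coresPerExecutor > totalPartitions;
    }

    /**
     * Submits one placeholder job per stream to a pool with concurrentJobs threads and
     * returns true only if every job saw all the other jobs start while it was still running.
     */
    static boolean jobsOverlap(int concurrentJobs, int numStreams) {
        ExecutorService jobPool = Executors.newFixedThreadPool(concurrentJobs);
        CountDownLatch allStarted = new CountDownLatch(numStreams);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < numStreams; i++) {
                results.add(jobPool.submit(() -> {
                    allStarted.countDown();
                    // Wait briefly for the other jobs; with too few threads they cannot start yet
                    return allStarted.await(500, TimeUnit.MILLISECONDS);
                }));
            }
            boolean overlap = true;
            for (Future<Boolean> result : results) {
                overlap &= result.get();
            }
            return overlap;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("model run failed", e);
        } finally {
            jobPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Zhang's example: stream1 -> topicA (1 partition), stream2 -> topicB (2 partitions)
        System.out.println("2 executors x 2 cores enough: " + enoughCores(2, 2, 1, 2));
        System.out.println("concurrentJobs=1 overlap: " + jobsOverlap(1, 2));
        System.out.println("concurrentJobs=2 overlap: " + jobsOverlap(2, 2));
    }
}
```

With a pool of one thread the first placeholder job times out before the second can start, which mirrors the sequential execution observed with spark.streaming.concurrentJobs=1; with two threads both jobs run at the same time.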






From: Hrishikesh Mishra <[hidden email]>
Sent: March 4, 2020, 00:35
To: Zhang Victor <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors
 

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

sd.hrishi
Thanks Zhang,

I know spark.streaming.concurrentJobs provides parallelism, but it has adverse effects, as mentioned here: https://stackoverflow.com/a/23533736. I don't know whether that still applies to Spark 2.4.

On Fri, Mar 6, 2020 at 7:59 AM Zhang Victor <[hidden email]> wrote: