Backpressure initial rate not working


Biplob Biswas
I have enabled spark.streaming.backpressure.enabled and also set spark.streaming.backpressure.initialRate to 15000, but my Spark job is not respecting these settings when reading from Kafka after a failure.

Around 500k records are waiting to be processed in my Kafka topic, and they are all read in one huge batch, which takes a long time and eventually fails with an executor failure exception. We don't have more resources to give in our test cluster, so we expect backpressure to kick in and pull smaller batches.

What could I be doing wrong?

 
Thanks & Regards
Biplob Biswas

Re: Backpressure initial rate not working

Biplob Biswas
Did anyone face a similar issue? Is there any viable way to solve this?
Thanks & Regards
Biplob Biswas



Re: Backpressure initial rate not working

tnist
Hi Biplob,

How many partitions are on the topic you are reading from, and have you set maxRatePerPartition? IIRC, Spark back pressure is calculated from the following:

  •  maxRatePerPartition = 200
  •  batchInterval = 30s
  •  3 partitions on the ingest topic

This results in a maximum ingest rate of 18K per batch:

  •  3 * 30 * 200 = 18,000 max

The spark.streaming.backpressure.initialRate only applies to the first batch, per the docs:


This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.  

If you set maxRatePerPartition and apply the above formula, I believe you will be able to achieve the results you are looking for.
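
If it helps, here is a minimal, self-contained Scala sketch of that arithmetic (the numbers are just the illustrative values above, not your actual settings):

object BatchCapSketch {
  // Maximum records the Kafka stream can pull into one micro-batch:
  // partitions * batch interval (seconds) * maxRatePerPartition (records/sec per partition).
  def maxRecordsPerBatch(partitions: Int, batchIntervalSec: Int, maxRatePerPartition: Int): Long =
    partitions.toLong * batchIntervalSec * maxRatePerPartition

  def main(args: Array[String]): Unit = {
    println(maxRecordsPerBatch(partitions = 3, batchIntervalSec = 30, maxRatePerPartition = 200)) // 18000
  }
}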

HTH.

-Todd



Re: Backpressure initial rate not working

Biplob Biswas
Hi Todd,

Thanks for the reply. I have maxRatePerPartition set as well. Below is the spark-submit config we used, and we still hit the issue. The batch interval is set to 10s and the topic has 4 partitions:

spark2-submit --name "${YARN_NAME}" \
--master yarn \
--deploy-mode ${DEPLOY_MODE} \
--num-executors ${NUM_EXECUTORS} \
--driver-cores ${NUM_DRIVER_CORES} \
--executor-cores ${NUM_EXECUTOR_CORES} \
--driver-memory ${DRIVER_MEMORY} \
--executor-memory ${EXECUTOR_MEMORY} \
--queue ${YARN_QUEUE} \
--keytab ${KEYTAB}-yarn \
--principal ${PRINCIPAL} \
--conf "spark.yarn.preserve.staging.files=true" \
--conf "spark.yarn.submit.waitAppCompletion=false" \
--conf "spark.shuffle.service.enabled=true" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.dynamicAllocation.minExecutors=1" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.streaming.receiver.maxRate=15000" \
--conf "spark.streaming.kafka.maxRatePerPartition=15000" \
--conf "spark.streaming.backpressure.initialRate=2000" \
--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
--driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
--files "${JAAS_CONF},${KEYTAB}" \
--class "${MAIN_CLASS}" \
"${ARTIFACT_FILE}"

The first batch is huge; if the rate limit had worked even just for the first batch, I would have kept researching on my own. The problem is that the first batch contains more than 500k records.
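
For reference, here is a minimal Scala sketch of how a direct Kafka stream with the same backpressure-related settings could be wired up programmatically; the broker list, topic name, group id and class name are placeholders, not our actual job:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object BackpressureTestJob {
  def main(args: Array[String]): Unit = {
    // Same rate-limiting knobs as the --conf flags above, set programmatically.
    val conf = new SparkConf()
      .setAppName("backpressure-test")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "2000")
      .set("spark.streaming.kafka.maxRatePerPartition", "15000") // records/sec per partition

    val ssc = new StreamingContext(conf, Seconds(10)) // 10s batch interval

    // Placeholder Kafka parameters.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "backpressure-test",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("ingest-topic"), kafkaParams)
    )

    // Log how many records each batch actually pulls, to see whether the caps are respected.
    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}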

Thanks & Regards
Biplob Biswas



Re: Backpressure initial rate not working

tnist
Have you tried reducing maxRatePerPartition to a lower value? Based on your settings, I believe you will be able to pull 600K messages from Kafka in a single batch, basically:

  •  maxRatePerPartition = 15000
  •  batchInterval = 10s
  •  4 partitions on the ingest topic

This results in a maximum ingest rate of 600K:

  •  4 * 10 * 15000 = 600,000 max

Can you reduce maxRatePerPartition to, say, 1500 for a test run? That should result in a much more manageable batch, and you can adjust from there (a quick check of the numbers is sketched below):


  •  4 * 10 * 1500 = 60,000 max
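
Purely as a sanity check, a small Scala sketch of both caps with your numbers; the 500k figure is the backlog you mentioned:

object BacklogCheck {
  def main(args: Array[String]): Unit = {
    val backlog = 500000L          // records already waiting on the topic
    val partitions = 4
    val batchIntervalSec = 10
    // Per-batch cap = partitions * batch interval (s) * maxRatePerPartition.
    val currentCap  = partitions * batchIntervalSec * 15000L // 600000 -- the whole backlog fits in one batch
    val proposedCap = partitions * batchIntervalSec * 1500L  // 60000
    val batchesToDrain = math.ceil(backlog.toDouble / proposedCap).toInt // 9
    println(s"current cap: $currentCap, proposed cap: $proposedCap, batches to drain backlog: $batchesToDrain")
  }
}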

In our setup we set only maxRatePerPartition and backpressure.enabled, not maxRate or initialRate. I thought maxRate was not applicable when back pressure is enabled, but I may be mistaken.


-Todd







Re: Backpressure initial rate not working

Biplob Biswas
Hi Todd,

Thanks a lot, that works. Although I am curious whether you know why the initialRate setting is not kicking in?

For now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas

