Single executor processing all tasks in spark structured streaming kafka

Single executor processing all tasks in spark structured streaming kafka

Sachit Murarka
Hi All,

I am using Spark 3.0.1 Structured Streaming with PySpark.

The problem is that Spark is running only 1 executor with 1 task. Below is a summary of what I am doing.

Can anyone help me understand why only 1 executor is being used?

def process_events(event):
    fetch_actual_data()
    # ... many more steps

def fetch_actual_data():
    # apply operations on the actual data
    pass

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream \
    .foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()



Kind Regards,
Sachit Murarka
Re: Single executor processing all tasks in spark structured streaming kafka

Kapil Garg
Hi Sachit,
What do you mean by "spark is running only 1 executor with 1 task"?
Did you submit the Spark application with multiple executors, but only 1 is being used while the rest sit idle?
If so, it may be due to the spark.locality.wait setting, which defaults to 3s: Spark waits up to 3s for tasks to finish on an executor before scheduling the next batch on other executors. This happens because of Spark's preference for reusing cached Kafka consumers.
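If idle executors are the symptom, that wait can be lowered or disabled at submit time. A minimal sketch (the master, executor count, and script name below are placeholders, not values from this thread):

```shell
# Hypothetical submit command: disable the locality wait so tasks
# are scheduled on any free executor immediately.
spark-submit \
  --master yarn \
  --num-executors 4 \
  --conf spark.locality.wait=0s \
  streaming_app.py
```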

And regarding 1 task doing all the processing: please check whether your Kafka topic has only 1 partition. Spark derives its parallelism from the number of partitions in the Kafka topic. Once the data has been loaded from the partitions, you can repartition the batch so that it is processed by multiple tasks.
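As a sketch of that last suggestion, applied to the pipeline from the original post (KAFKA_URL, KAFKA_TOPICS, and START_OFFSET are the same free variables as in that snippet, and the target of 8 partitions is an arbitrary example, not a recommendation):

```python
# Sketch: repartition each micro-batch after reading from Kafka, so the
# downstream foreach sink runs as multiple tasks even if the topic has
# only 1 partition. Assumes the spark-sql-kafka connector is on the classpath.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .repartition(8)  # spread each micro-batch across 8 tasks
```

Note that repartitioning adds a shuffle per micro-batch; adding partitions to the Kafka topic itself avoids that cost.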

On Mon, Mar 8, 2021 at 10:57 PM Sachit Murarka <[hidden email]> wrote:


--
Regards
Kapil Garg

-----------------------------------------------------------------------------------------

This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity to whom they are addressed. If you have received this email in error, please notify the system manager. This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute or copy this email. Please notify the sender immediately by email if you have received this email by mistake and delete this email from your system. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.

 

Any views or opinions presented in this email are solely those of the author and do not necessarily represent those of the organization. Any information on shares, debentures or similar instruments, recommended product pricing, valuations and the like are for information purposes only. It is not meant to be an instruction or recommendation, as the case may be, to buy or to sell securities, products, services nor an offer to buy or sell securities, products or services unless specifically stated to be so on behalf of the Flipkart group. Employees of the Flipkart group of companies are expressly required not to make defamatory statements and not to infringe or authorise any infringement of copyright or any other legal right by email communications. Any such communication is contrary to organizational policy and outside the scope of the employment of the individual concerned. The organization will not accept any liability in respect of such communication, and the employee responsible will be personally liable for any damages or other liability arising.

 

Our organization accepts no liability for the content of this email, or for the consequences of any actions taken on the basis of the information provided, unless that information is subsequently confirmed in writing. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.

-----------------------------------------------------------------------------------------

Re: Single executor processing all tasks in spark structured streaming kafka

Sachit Murarka
Hi Kapil,

Thanks for the suggestion. Yes, it worked.

Regards
Sachit

On Tue, 9 Mar 2021, 00:19 Kapil Garg, <[hidden email]> wrote: