Job is not able to perform Broadcast Join


Job is not able to perform Broadcast Join

Sachit Murarka
Hello Users,

I am facing an issue in a Spark job where I am doing row_number() without a partition by clause, because I need to add sequentially increasing IDs.
But to avoid a large shuffle spill I am not doing row_number() over the complete data frame.

Instead I am applying monotonically_increasing_id() on the actual data set,
then creating a new data frame from the original data frame which has just the monotonically increasing id.

So DF1 = all columns + monotonically_increasing_id
DF2 = monotonically_increasing_id

Now I am applying row_number() on DF2, since this is a much smaller dataframe.

DF3 = monotonically_increasing_id + row_number_id

DF1.join(broadcast(DF3))

This gives me the sequentially increasing id in the original dataframe.
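
For clarity, a minimal PySpark sketch of the flow described above; the column names mono_id / row_id, the input path, and the parquet reader are placeholders, not taken from the actual job:

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/input")  # placeholder source

    # DF1: all original columns plus a monotonically increasing (but not sequential) id
    df1 = df.withColumn("mono_id", F.monotonically_increasing_id())

    # DF2: only the generated id, so the window runs over a narrow dataframe
    df2 = df1.select("mono_id")

    # DF3: row_number() over the id; without partitionBy this still runs on a
    # single task, but over a much smaller dataset
    df3 = df2.withColumn("row_id", F.row_number().over(Window.orderBy("mono_id")))

    # Join DF3 back to DF1, broadcasting the narrow side, and write parquet
    result = df1.join(F.broadcast(df3), on="mono_id", how="inner")
    result.write.parquet("/path/to/output")  # placeholder destination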

But the job fails with the stack trace below.

py4j.protocol.Py4JJavaError: An error occurred while calling o180.parquet.
: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
        at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Could not execute broadcast in 1000 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

Initially spark.sql.broadcastTimeout was 300 seconds; I already increased it (the error above shows the current 1000-second timeout).
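
For reference, the two settings named in the exception can be raised like this in PySpark; the values below are illustrative, not the exact ones used in this job:

    spark.conf.set("spark.sql.broadcastTimeout", "3600")  # timeout in seconds; the default is 300
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(4 * 1024 * 1024 * 1024))  # threshold in bytes

The same settings can also be passed to spark-submit with --conf.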


Kind Regards,
Sachit Murarka

Re: Job is not able to perform Broadcast Join

EveLiao
How many rows does df3 have? Broadcast joins are a great way to append data stored in relatively small, single-source-of-truth data files to large DataFrames. DataFrames up to 2 GB can be broadcast, so a data file with tens or even hundreds of thousands of rows is a broadcast candidate. Your broadcast variable is probably too large.
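
As a quick sketch of how to answer that (df3 being the broadcast candidate):

    print(df3.count())  # number of rows on the broadcast side
    df3.printSchema()   # should show just the two narrow id columns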


Re: Job is not able to perform Broadcast Join

Sachit Murarka
Thanks Eve for the response.

Yes, I know broadcast is meant for smaller datasets. I increased the threshold to 4 GB, but even then it did not work, and DF3 is somewhat larger than 2 GB.

I am also trying without the broadcast. The job has been running for an hour; I will let you know.
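
A sketch of what removing the broadcast looks like, reusing the placeholder names from the earlier sketch: drop the hint and let Spark fall back to a shuffle join, as the exception itself suggests.

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # disable automatic broadcast joins
    result = df1.join(df3, on="mono_id", how="inner")             # no broadcast() hint -> sort-merge join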


Thanks
Sachit


Re: Job is not able to perform Broadcast Join

EveLiao


Re: Job is not able to perform Broadcast Join

Sachit Murarka
Yes, I tried that first as well. I then moved to the join approach because row_number() without a partition clause runs on a single task, and that was causing the shuffle spill. Instead of processing the entire dataframe on a single task, I broke it down into DF1 and DF2 and join them back,
because DF2 is a much smaller dataset, since it has only 2 columns.

Thanks
Sachit


Re: Job is not able to perform Broadcast Join

David Edwards
After adding the sequential ids you might need a repartition? I've found before that using monotonically_increasing_id sends the df to a single partition. It usually becomes clear in the Spark UI though.
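
Roughly like this, as a sketch: the partition count is a placeholder, and result stands for the joined dataframe from the earlier sketch.

    result = result.repartition(200)  # spread the data back over many partitions
    # or repartition by the generated id column instead of round-robin:
    # result = result.repartition(200, "row_id")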
