How to debug Spark job


How to debug Spark job

James Starks

I have a Spark job that reads from a PostgreSQL (v9.5) table and writes the result to Parquet. The code flow is not complicated; basically:

    case class MyCaseClass(field1: String, field2: String)
    val df = spark.read.format("jdbc")...load()
    df.createOrReplaceTempView(...)
    val newdf = spark.sql("select field1, field2 from mytable").as[MyCaseClass].map { row =>
      val fieldX = ... // extract something from row.field2
      (row.field1, fieldX)
    }.filter { ... /* filter out rows where field2 is not valid */ }
    newdf.write.mode(...).parquet(destPath)

This job worked without a problem. But after adding more fields it no longer works (the job appears to hang). The refactored job looks like this:
    ... 
    val newdf = spark.sql("select field1, field2, ... fieldN from mytable").as[MyCaseClassWithMoreFields].map { row =>
        ...
        NewCaseClassWithMoreFields(...) // all fields plus fieldX
    }.filter { ... }
    newdf.write.mode(...).parquet(destPath)

Basically the job extracts some info from one of the fields in the db table, appends that newly extracted field to the original row, and then dumps the whole new table to Parquet:

    new field + (original field1 + ... + original fieldN)
    ...
    ...

The records loaded by Spark SQL number around 8MM both before and after the refactoring, but when the refactored job runs it appears to hang without progress. The only console output is the following (there is no crash, no exceptions thrown):

    WARN  HeartbeatReceiver:66 - Removing executor driver with no recent heartbeats: 137128 ms exceeds timeout 120000 ms

Memory usage as shown by the top command looks like:

    VIRT     RES     SHR        %CPU %MEM
    15.866g 8.001g  41.4m     740.3   25.6

The command used to submit the Spark job is:

    spark-submit --class ... --master local[*] --driver-memory 10g --executor-memory 10g ... --files ... --driver-class-path ... <jar file> ...

How can I debug or check which part of my code might cause the problem (so I can improve it)?

Thanks

 

Re: [External Sender] How to debug Spark job

Anthony, Olufemi
One way I would go about this is to run newdf.show(numRows, truncate = false) on a few rows before you try writing to Parquet, to force computation of newdf and see whether the hang occurs at that point or during the write. You could also try a newdf.count() as well.
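As a rough Scala sketch of that suggestion (hedged: `newdf` and `destPath` are the names from the original post; the mode string is illustrative):

```scala
// Sketch only: `newdf` and `destPath` are assumed from the original post.
// Forcing actions before the write narrows down where the hang happens.
newdf.show(20, truncate = false)                  // materializes a few rows
println(s"rows: ${newdf.count()}")                // forces a full scan
newdf.write.mode("overwrite").parquet(destPath)   // only then attempt the write
```

If show and count complete quickly but the write hangs, the problem is in the Parquet write path; if count itself hangs, the problem is in the read/transform stage.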

Femi

On Fri, Sep 7, 2018 at 5:48 AM James Starks <[hidden email]> wrote:



The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.


Re: [External Sender] How to debug Spark job

James Starks
Got to the root cause eventually: the job throws java.lang.OutOfMemoryError: Java heap space. Increasing --driver-memory temporarily fixes the problem. Thanks.
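For illustration (class name and jar path are placeholders): since the job runs with --master local[*], the executors live inside the driver JVM, so --driver-memory is the setting that actually bounds the heap, and --executor-memory has no effect in local mode.

```shell
# Hypothetical submit command; in local mode only --driver-memory matters.
spark-submit \
  --class com.example.MyJob \
  --master "local[*]" \
  --driver-memory 16g \
  path/to/myjob.jar
```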

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On 7 September 2018 12:32 PM, Femi Anthony <[hidden email]> wrote:



Reply | Threaded
Open this post in threaded view
|

Re: [External Sender] How to debug Spark job

Sonal Goyal
You could also try profiling your program on the executor or driver with jvisualvm or YourKit to see if there is any memory/CPU optimization you could do.
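One hedged way to set that up (these are standard JVM JMX flags; the class name and jar path are placeholders) is to expose the driver JVM over JMX and attach jvisualvm to it:

```shell
# Illustrative only: enable remote JMX on the driver, then attach jvisualvm
# (or YourKit) to localhost:9999 to inspect heap and CPU usage live.
spark-submit \
  --class com.example.MyJob \
  --master "local[*]" \
  --driver-java-options "-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  path/to/myjob.jar
```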

Thanks,
Sonal
Nube Technologies 





On Fri, Sep 7, 2018 at 6:35 PM, James Starks <[hidden email]> wrote:




Re: How to debug Spark job

Marco Mistroni
Hi
This might sound like dumb advice, but try to break your process apart. It sounds like you are doing ETL:
start with just the Extract and Transform steps, including the changes that cause the issue,
and if there is no problem, add the Load step.
Enable Spark logging so that you can post the error message to the list.
You can also have a look at the Spark console to see if your process has memory issues.
Another thing you can do is run with a subset of the data and increase the load until you find the point where it blows up.
Sorry, HTH
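A sketch of the subset idea in Scala (hedged: `jdbcUrl` and the connection options are placeholders; the JDBC reader and table name come from the original post) is to push a LIMIT into a JDBC subquery so only part of the table is loaded, then raise the limit step by step:

```scala
// Sketch: read only the first n rows through a JDBC subquery, then grow n
// (e.g. 100000 -> 1000000 -> 8000000) to find where the job blows up.
val n = 100000
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)  // placeholder connection string
  .option("dbtable", s"(SELECT field1, field2 FROM mytable LIMIT $n) AS t")
  .load()
```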

On Sep 7, 2018 10:48 AM, "James Starks" <[hidden email]> wrote:
