Writing to Google BigQuery from Spark throws Error caught: Java heap space or sits frozen


Writing to Google BigQuery from Spark throws Error caught: Java heap space or sits frozen

Mich Talebzadeh
Hi,

I am reading data from an on-prem Hive table directly through JDBC into GCP.

Google Cloud offers managed Spark and YARN on Dataproc clusters. I have three Dataproc VMs with 32 GB of RAM each.

The version of Spark is 2.3.3.

Data is read through JDBC using Cloudera JDBC 4.3 drivers. As I have not found any suitable BigQuery drivers that work, the data is written to BigQuery through the Spark connector for BigQuery.

I use spark-shell in client mode as below:

spark-shell \
     --driver-class-path /home/hduser/jars/ddhybrid.jar \
     --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar \ 
     --master yarn --deploy-mode client
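
For completeness, HiveDF and existingRows are created roughly as follows. This is a minimal sketch only; jdbcUrl, jdbcDriverClass, jdbcUser, jdbcPassword and the dbtable value are placeholders, not the real connection details:

// Sketch only: the JDBC connection details below are placeholders, not the real values.
val HiveDF = spark.read.
  format("jdbc").
  option("url", jdbcUrl).                    // on-prem endpoint exposed by the hybrid driver
  option("driver", jdbcDriverClass).         // driver class shipped in ddhybrid.jar
  option("dbtable", "default.hiveTable").    // placeholder table name
  option("user", jdbcUser).
  option("password", jdbcPassword).
  load()

// The existing BigQuery rows are read back through the same connector used for the write.
val existingRows = spark.read.
  format("bigquery").
  option("table", fullyQualifiedOutputTableId).
  load()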

The append to BigQuery works for batches of 10,000. So if I have 30K new rows to be added from Hive to the BigQuery table, this is the way it is done. It is not pretty, but it works.

HiveDF.createOrReplaceTempView("hiveTable")  // Hive
existingRows.createOrReplaceTempView("ExistingRows")  // BigQuery
var IDStart = 0
var IDLoop = 0
var i = 1
println(s"\nSource table has ${maxSourceID} rows")
println(s"Target table has ${maxBQID} rows")
var diff = maxSourceID - maxBQID
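// /% returns the quotient and the remainder in one call; it is defined on BigInt,
// so diff and batchSize are assumed to be BigInt here. For example, 68,482 missing
// rows with a batch size of 30,000 gives quotient = 2 and remainder = 8,482.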
val (quotient, remainder) = diff /% batchSize
var records = maxSourceID
var sqltextRS = ""
if (maxBQID > 0) {
  sqltextRS = """SELECT hiveTable.ID FROM hiveTable WHERE NOT EXISTS(SELECT 1 FROM ExistingRows WHERE hiveTable.ID = ExistingRows.ID) ORDER BY hiveTable.ID"""
  val rs  = sql(sqltextRS)
  records = rs.count.toInt
}
if (records == 0) {
  println("\nNo rows missing from the target table")
} else {
  println("\nRows missing = " + records)
  println(s"\nAdding missing rows in batches of ${batchSize}")

  while ( i <= quotient)
  {
    if (i == 1) IDStart = maxBQID
    IDLoop = IDStart + batchSize
    sqltextRS = s"""SELECT * from hiveTable WHERE ID > ${IDStart} AND ID <= ${IDLoop} """
    var rsBatch = sql(sqltextRS)
    rsBatch.
      write.
      format("bigquery").
      mode(org.apache.spark.sql.SaveMode.Append).
      option("table", fullyQualifiedOutputTableId).
      save()
    i = i + 1
    IDStart = IDLoop
    println(s"\nrows added so far is ${IDStart}")
  }
  if (remainder > 0) {
     println(s"adding remainder rows of ${remainder}")
    sqltextRS = s"""SELECT * from hiveTable WHERE ID > ${IDStart} AND ID <= ${maxSourceID} """
    var rsBatch = sql(sqltextRS)
    rsBatch.
      write.
      format("bigquery").
      mode(org.apache.spark.sql.SaveMode.Append).
      option("table", fullyQualifiedOutputTableId).
      save()
  }
}

When I increase the batch size from 10K to 30K, it falls over with:

Caused by: java.sql.SQLException: [DataDirect][JDBC Hybrid driver][Service](500540) Error caught in BackgroundFetcher. Foreground thread ID: 399. Background thread ID: 420. Error caught: Java heap space.
  
I added --conf spark.yarn.executor.memoryOverhead=3000 to the spark-shell invocation (shown again after the output below), and now it is sitting there at:

Rows missing = 68482

Adding missing rows in batches of 30000
[Stage 15:===================================================>     (9 + 1) / 10]
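
For reference, the spark-shell invocation with the extra overhead setting added:

spark-shell \
     --driver-class-path /home/hduser/jars/ddhybrid.jar \
     --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar \
     --conf spark.yarn.executor.memoryOverhead=3000 \
     --master yarn --deploy-mode client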

This is what the Spark UI Stages page is showing:

image.png


It has been sitting there for 19 minutes.


And the Executors page shows three dead executors and two running:


image.png

BigQuery uses temporary staging tables to write the query results. However, I don't think that is the issue. I believe the issue is a Spark resource limitation, and there is a fair bit of shuffling going on. Any suggestions? Given that Spark is offered as a managed service on GCP, I don't want to mess around with Spark parameters too much, although I can spin up another node if needed.
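
One option along those lines would be to let Spark range-partition the JDBC read on ID instead of batching by hand, so several tasks each fetch a small slice rather than one background fetcher pulling a whole batch. A minimal sketch, with placeholder connection options (as above) and assuming ID is reasonably evenly distributed:

// Sketch only: split the JDBC fetch into ranges of ID so no single task pulls a whole batch.
// Connection options are placeholders; the bounds reuse values already computed above.
val missingDF = spark.read.
  format("jdbc").
  option("url", jdbcUrl).
  option("driver", jdbcDriverClass).
  option("dbtable", "default.hiveTable").
  option("partitionColumn", "ID").
  option("lowerBound", maxBQID.toString).
  option("upperBound", maxSourceID.toString).
  option("numPartitions", "10").      // roughly 7K rows per task for 68,482 missing rows
  option("fetchsize", "10000").       // keep each driver-side fetch buffer modest
  load().
  where(s"ID > ${maxBQID} AND ID <= ${maxSourceID}")  // the bounds above only set the stride, not a filter

missingDF.
  write.
  format("bigquery").
  mode(org.apache.spark.sql.SaveMode.Append).
  option("table", fullyQualifiedOutputTableId).
  save()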


Thanks





Re: Writing to Google BigQuery from Spark throws Error caught: Java heap space or sits frozen

☼ R Nair (रविशंकर नायर)
We can use Presto, and the Presto engine can be run on Spark. Presto must have two connectors configured, one for the source and another for BigQuery. Then just execute CREATE TABLE in BigQuery ... AS SELECT * FROM source. I copied about 3 million rows this way.

Best, Ravion


Re: Writing to Google BigQuery from Spark throws Error caught: Java heap space or sits frozen

Mich Talebzadeh
Thanks, Ravion, for the suggestion. That is worth considering.

However, for now I want to find the resource limitation or configuration causing this Spark issue.

Regards,

Mich




LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 




Re: Writing to Google BigQuery from Spark throws Error caught: Java heap space or sits frozen

Mich Talebzadeh


That stuck job eventually failed with the message:


Caused by: org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: [DataDirect][JDBC Hybrid driver][Service](500540) Error caught in BackgroundFetcher. Foreground thread ID: 463. Background thread ID: 466. Error caught: Java heap space.
  at com.ddtek.jdbc.ddhybridbase.BaseExceptions.a(Unknown Source)




Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


