Issues with large schema tables

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Issues with large schema tables

Ballas, Ryan W

Hello All,

 

Our team is having a lot of issues with the Spark API particularly with large schema tables. We currently have a program written in Scala that utilizes the Apache spark API to create two tables from raw files. We have one particularly very large raw data file that contains around ~4700 columns and ~200,000 rows. Every week we get a new file that shows the updates, inserts and deletes that happened in the last week. Our program will create two files – a master file and a history file. The master file will be the most up to date version of this table while the history table shows all changes inserts and updates that happened to this table and showing what changed. For example, if we have the following schema where A and B are unique:

 

Week 1                                                                                  Week 2

A             B             C                                                              A             B             C

1              2              3                                                              1              2              4

 

Then the master table will now be

A             B             C

1              2              4

 

and History table will be

A             B             change_column  change_type        old_value              new_value

1              2              C                              Update                  3                              4

 

This process is working flawlessly for shorter schema tables. We have a table that has 300 columns but over 100,000,000 rows and this code still runs. The process above for the larger schema table runs for around 15 hours, and then crashes with the following error:

 

Exception in thread "main" java.lang.StackOverflowError

        at scala.collection.generic.Growable$class.loop$1(Growable.scala:52)

        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)

        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)

        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)

        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

        at scala.collection.immutable.List.foreach(List.scala:381)

        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)

        at scala.collection.immutable.List.flatMap(List.scala:344)

 

Some other notes… This is running on a very large MAPR cluster. We have tried running the job with upwards of ½ a TB of RAM and this still happens. All of our other smaller schema tables run except for this one. 

 

Here is a code example that takes around 4 hours to run for this larger table, but runs in 20 seconds for other tables:

 

var dataframe_result = dataframe1.join(broadcast(dataframe2), Seq(listOfUniqueIds:_*)).repartition(100).cache()

 

We have tried all of the following with no success:

  • Using hash broad-cast joins (dataframe2 is smaller, dataframe1 is huge)
  • Repartioining on different numbers, as well as not repartitioning at all
  • Caching the result of the dataframe (we originally did not do this).

 

What is causing this error and how do we go about fixing it? This code just takes in 1 parameter (the table to run) so it’s the exact same code for every table. It runs flawlessly for every other table except for this one. The only thing different between this table and all the other ones is the number of columns. This has the most columns at 4700 where the second most is 800.

 

If anyone has any ideas on how to fix this it would be greatly appreciated. Thank you in advance for the help!!

 


This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.

Reply | Threaded
Open this post in threaded view
|

Re: Issues with large schema tables

Gourav Sengupta
Hi Ballas,

in Data Science terms you have 4500 variables without correlations or which are independent of each other. In Data Modelling terms you have an entity with 4500 properties. I have worked on hair splitting financial products, even they do not have properties of a financial product with more than 800 properties through out its lifecycle.

I think that the best way to approach your problem is not to think it as a data engineering problem, but a data architecture problem. Please apply dimensionality reduction, data modelling and MDM to the data before processing it.


Regards,
Gourav

On Wed, Mar 7, 2018 at 6:34 PM, Ballas, Ryan W <[hidden email]> wrote:

Hello All,

 

Our team is having a lot of issues with the Spark API particularly with large schema tables. We currently have a program written in Scala that utilizes the Apache spark API to create two tables from raw files. We have one particularly very large raw data file that contains around ~4700 columns and ~200,000 rows. Every week we get a new file that shows the updates, inserts and deletes that happened in the last week. Our program will create two files – a master file and a history file. The master file will be the most up to date version of this table while the history table shows all changes inserts and updates that happened to this table and showing what changed. For example, if we have the following schema where A and B are unique:

 

Week 1                                                                                  Week 2

A             B             C                                                              A             B             C

1              2              3                                                              1              2              4

 

Then the master table will now be

A             B             C

1              2              4

 

and History table will be

A             B             change_column  change_type        old_value              new_value

1              2              C                              Update                  3                              4

 

This process is working flawlessly for shorter schema tables. We have a table that has 300 columns but over 100,000,000 rows and this code still runs. The process above for the larger schema table runs for around 15 hours, and then crashes with the following error:

 

Exception in thread "main" java.lang.StackOverflowError

        at scala.collection.generic.Growable$class.loop$1(Growable.scala:52)

        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)

        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)

        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)

        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

        at scala.collection.immutable.List.foreach(List.scala:381)

        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)

        at scala.collection.immutable.List.flatMap(List.scala:344)

 

Some other notes… This is running on a very large MAPR cluster. We have tried running the job with upwards of ½ a TB of RAM and this still happens. All of our other smaller schema tables run except for this one. 

 

Here is a code example that takes around 4 hours to run for this larger table, but runs in 20 seconds for other tables:

 

var dataframe_result = dataframe1.join(broadcast(dataframe2), Seq(listOfUniqueIds:_*)).repartition(100).cache()

 

We have tried all of the following with no success:

  • Using hash broad-cast joins (dataframe2 is smaller, dataframe1 is huge)
  • Repartioining on different numbers, as well as not repartitioning at all
  • Caching the result of the dataframe (we originally did not do this).

 

What is causing this error and how do we go about fixing it? This code just takes in 1 parameter (the table to run) so it’s the exact same code for every table. It runs flawlessly for every other table except for this one. The only thing different between this table and all the other ones is the number of columns. This has the most columns at 4700 where the second most is 800.

 

If anyone has any ideas on how to fix this it would be greatly appreciated. Thank you in advance for the help!!

 


This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.