Distribute entire columns to executors

Distribute entire columns to executors

Pedro Cardoso
Hello,

Is it possible in Spark to map partitions such that partitions are column-based rather than row-based?
My use case is to compute temporal series over numerical values, e.g. exponential moving averages over the values of a given dataset column.

Suppose there is a dataset with roughly 200 columns, most of them numerical (> 60%), plus at least one timestamp column, as shown in the attached file.

I want to shuffle data to executors such that each executor has a smaller dataset with only two columns, [Col0: Timestamp, Col<X>: numerical type]. I can then sort that dataset by increasing timestamp and iterate over the rows with a custom function that receives a (timestamp, value) tuple.
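For concreteness, a rough sketch of the per-column computation I have in mind (column names and the smoothing factor are placeholders):

import spark.implicits._
import scala.collection.JavaConverters._
import java.sql.Timestamp

val alpha = 0.1  // placeholder smoothing factor

// One timestamp column plus one numerical column, sorted by increasing time.
val series = df
  .select($"Col0".as[Timestamp], $"Col7".cast("double").as[Double])
  .sort("Col0")

// Fold the ordered rows into an exponential moving average.
// (toLocalIterator streams one partition at a time to the driver;
// ideally this scan would run on an executor instead, hence my question.)
val ema = series.toLocalIterator.asScala.foldLeft(Double.NaN) {
  case (acc, (_, v)) => if (acc.isNaN) v else alpha * v + (1 - alpha) * acc
}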

Partitioning by column value does not make sense for me, since there is a temporal lineage of values which I must keep. On the other hand, I would like to parallelize this workload, as my datasets can be quite big (> 2 billion rows). The only way I see is to distribute entire columns, so that each executor holds 2B timestamp + numerical value pairs rather than 2B times the size of an entire row.

Is this possible in Spark? Can someone point me in the right direction? A code snippet (non-working is fine if the logic is sound) would be highly appreciated!

Thank you for your time.
--

Pedro Cardoso

Research Engineer

[hidden email]



Attachment: Spark Partition Entire Columns.jpg (52K)

Re: Distribute entire columns to executors

Lalwani, Jayesh

You could convert columns to rows. Something like this:

import spark.implicits._  // provides the Encoder for the (Timestamp, Int, String) tuples

val cols = Seq("A", "B", "C")

df.flatMap { row =>
  cols.map(c => (row.getAs[java.sql.Timestamp]("timestamp"), row.getAs[Int](c), c))
}.toDF("timestamp", "value", "colName")

If you are using DataFrames, this assumes all of your value columns are the same type. If they aren't, you will need to add logic to convert them to a common type, or use a Dataset of tuples.
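For example, one way to handle mixed numeric types without leaving the DataFrame API is to cast everything to double while melting (a sketch; the "timestamp" name and the value columns are placeholders):

import org.apache.spark.sql.functions._

// Hypothetical numeric columns of mixed types, all cast to double.
val valueCols = Seq("A", "B", "C")

val melted = df
  .select(col("timestamp"), explode(array(valueCols.map(c =>
    struct(lit(c).as("colName"), col(c).cast("double").as("value"))): _*)).as("kv"))
  .select(col("timestamp"), col("kv.colName"), col("kv.value"))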

 


Re: Distribute entire columns to executors

Jeff Evans
I think you can just select the columns you need into new DataFrames, then process those separately.

val dfFirstTwo = ds.select("Col1", "Col2")
// do whatever with this one, e.g. sort by its timestamp column
dfFirstTwo.sort("Col1")
// similar for the next two columns
val dfNextTwo = ds.select("Col3", "Col4")
dfNextTwo.sort("Col3")

These should result in separate tasks, which you could confirm by checking the Spark UI when the application is submitted.
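
Building on that, a sketch of the follow-on computation for one column pair (placeholder names: alpha is a hypothetical smoothing factor, Col1 is assumed to be the timestamp, and Col2 numeric):

import spark.implicits._
import java.sql.Timestamp

val alpha = 0.1  // hypothetical smoothing factor

val emaFirstTwo = dfFirstTwo
  .sort("Col1")              // increasing timestamp
  .as[(Timestamp, Double)]   // assumes Col2 is (or was cast to) double
  .coalesce(1)               // a single partition, so the scan sees the global order
  .mapPartitions { rows =>
    var ema = Double.NaN
    rows.map { case (ts, v) =>
      ema = if (ema.isNaN) v else alpha * v + (1 - alpha) * ema
      (ts, ema)
    }
  }

With ~2 billion rows per column the single coalesced partition is the main constraint; splitting each series into time ranges and carrying the EMA state across range boundaries would be the natural next refinement.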
