Hello,

Is it possible in Spark to map partitions such that partitions are column-based rather than row-based? My use case is to compute temporal series of numerical values, e.g. exponential moving averages over the values of a given dataset's column.

Suppose there is a dataset with roughly 200 columns, a high percentage of which are numerical (> 60%), and at least one timestamp column, as shown in the attached file. I want to shuffle data to executors such that each executor holds a smaller dataset with only 2 columns, [Col0: Timestamp, Col<X>: Numerical type]. Over that I can then sort the dataset by increasing timestamp and iterate over the rows with a custom function which receives a tuple: {timestamp; value}.

Partitioning by column value does not make sense for me since there is a temporal lineage of values which I must keep. On the other hand, I would like to parallelize this workload since my datasets can be quite big (> 2 billion rows). The only way I see is to distribute entire columns, so that each executor holds 2B timestamp + numerical values rather than 2B * the size of an entire row.

Is this possible in Spark? Can someone point me in the right direction? A code snippet example (not working is fine if the logic is sound) would be highly appreciated!

Thank you for your time.
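For concreteness, the per-column computation I have in mind looks roughly like the sketch below (the smoothing factor alpha is just a placeholder); it assumes the (timestamp, value) pairs arrive already sorted by timestamp:

import java.sql.Timestamp

// Exponential moving average over time-ordered (timestamp, value) pairs.
def ema(sorted: Iterator[(Timestamp, Double)], alpha: Double): Iterator[(Timestamp, Double)] = {
  var acc = Option.empty[Double]
  sorted.map { case (ts, v) =>
    val next = acc match {
      case None       => v                              // first observation seeds the average
      case Some(prev) => alpha * v + (1 - alpha) * prev // standard EMA update
    }
    acc = Some(next)
    (ts, next)
  }
}

The question is how to get Spark to hand each executor exactly one such column, sorted by time.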
You could convert columns to rows. Something like this (assuming spark.implicits._ is in scope for the tuple encoder):

val cols = Seq("A", "B", "C")

df.flatMap { row =>
  cols.map(c => (row.getAs[java.sql.Timestamp]("timestamp"), row.getAs[Int](c), c))
}.toDF("timestamp", "value", "colName")

This puts every value into a single "value" column, so all of your columns need to share a type. If they don't, you will need to add logic to convert them to a common type, or use a Dataset of tuples.
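Building on that reshaping, a rough outline of the rest of the pipeline could look like the following. It is only a sketch: longDf stands for the reshaped (timestamp, value, colName) DataFrame above, ema is the per-column function sketched in the question, alpha is a placeholder, and spark is the SparkSession. Note that it buffers and sorts one column's values inside a single task, which may be too heavy at 2 billion rows:

import java.sql.Timestamp
import spark.implicits._

val result = longDf
  .as[(Timestamp, Double, String)]
  .groupByKey { case (_, _, name) => name }   // one group per original column
  .flatMapGroups { (name, rows) =>
    // All values of one column end up here; sort them by time, then run the EMA.
    val sorted = rows.toSeq.sortBy { case (ts, _, _) => ts.getTime }
    ema(sorted.iterator.map { case (ts, v, _) => (ts, v) }, alpha = 0.1)
      .map { case (ts, avg) => (name, ts, avg) }
  }
  .toDF("colName", "timestamp", "ema")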

In reply to this post by Pedro Cardoso
I think you can just select the columns you need into new DataFrames and then process those separately:

val dfFirstTwo = ds.select("Col1", "Col2")
// do whatever with this one
dfFirstTwo.sort(...)

// similar for the next two columns
val dfNextTwo = ds.select("Col3", "Col4")
dfNextTwo.sort(...)

These should result in separate jobs, which you can confirm by checking the Spark UI when the application is submitted.
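A rough sketch of that idea applied across many columns; the column names, output path, and parquet sink here are placeholders for whatever downstream step you actually need:

// Placeholders: adjust timestampCol, numericCols and the output path to the real schema.
val timestampCol = "Col0"
val numericCols = Seq("Col1", "Col2", "Col3")

numericCols.foreach { c =>
  val pair = ds.select(timestampCol, c).sort(timestampCol)
  // Each iteration runs as its own Spark job; persist the sorted pair,
  // or hand it to a per-column routine such as the EMA sketched earlier.
  pair.write.mode("overwrite").parquet(s"/tmp/per-column/$c")
}

Submitting these from separate driver threads (or a parallel collection) would let the jobs run concurrently instead of one after another.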
