[pyspark 2.3+] read/write huge data with smaller block size (128MB per block)


[pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

rishishah.star
Hi All,

I have about 10TB of parquet data on S3, where the data files have 128MB blocks. Spark by default picks up one block per task, even though every task within an executor has at least 1.8GB of memory. Isn't that wasteful? Is there any way to speed up this processing? Is there a way to force tasks to pick up more files that sum up to a certain block size, or does Spark always assign one block per task? Basically, is there an override to make Spark tasks read larger blocks?

Also, as seen in the attached image: while writing 4 files (partitionBy file_date, one file per partition), 4 tasks are active but two of them seem to be doing nothing, and the other 2 have taken over writing all 4 files. Shouldn't each of the 4 tasks pick up one file?

For this example, assume df has 4 file_dates' worth of data.

df.repartition('file_date').write.partitionBy('file_date').parquet(PATH)


Any suggestions/feedback helps, appreciate it!
--
Regards,

Rishi Shah



Attachment: Screen Shot 2020-06-18 at 2.01.53 PM.png (171K)
Re: [pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

srowen
Yes, you'll generally get 1 partition per block, and 1 task per partition.
The amount of RAM isn't directly relevant; the data isn't all loaded into memory at once. But you may nevertheless get some improvement with larger partitions / tasks, though typically only if your tasks are very small and very fast right now (completing in a few seconds).
You can use minSplitSize to encourage some RDD APIs to choose larger partitions, but not in the DF API.
Instead you can try coalescing to a smaller number of partitions without a shuffle (the shuffle would probably negate any benefit).
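
A minimal sketch of the coalesce route (the S3 path and the target partition count below are placeholders, not values from your job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each 128MB block generally becomes one input partition/task on read.
df = spark.read.parquet("s3://bucket/path")  # placeholder path

# coalesce() merges existing partitions without a shuffle, so each task
# ends up reading several blocks; dividing by 4 is just an example target.
df = df.coalesce(max(1, df.rdd.getNumPartitions() // 4))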

However, what I see here is different still: you have serious data skew because you partitioned by date, and I suppose some dates have lots of data while some have almost none.
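
A quick way to quantify that skew, just as a sketch:

# Row counts per date; heavily uneven counts confirm the skew.
df.groupBy("file_date").count().orderBy("count", ascending=False).show()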


Re: [pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

rishishah.star
Thanks Sean! To combat the skew I also repartition by another column, and that has worked well (like below). However, in the image I attached to my original email it looks like 2 tasks processed nothing; am I reading the Spark UI task table right? All 4 dates have data: 2 dates have ~200MB and the other 2 have ~800MB. This was just a test run to check the behavior. Shouldn't I see all 4 tasks with some output rows?

df.repartition('file_date', 'part_col').write.partitionBy('file_date').parquet(PATH)
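
For reference, a rough way to check which partition/task each date actually lands in (just a sketch; part_col is the extra column mentioned above):

from pyspark.sql import functions as F

# Tag each row with its current partition id after the repartition, then
# count rows per (partition, date) to see which tasks get data to write.
(df.repartition('file_date', 'part_col')
   .withColumn('pid', F.spark_partition_id())
   .groupBy('pid', 'file_date')
   .count()
   .orderBy('pid')
   .show())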


--
Regards,

Rishi Shah