How to control count / size of output files for

How to control count / size of output files for

Ivan Petrov
Hi, I'm trying to control the size and/or count of Spark output.

Here is my code. I expect to get 5 files but I get dozens of small files.
Why?

dataset
  .repartition(5)
  .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
  .write
  .parquet(outputPath)

Re: How to control count / size of output files for

Attila Zsolt Piros
Hi!

It is because of "spark.sql.shuffle.partitions". See the value 200 at the
rangepartitioning exchange in the physical plan:


scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [v: int]

scala> df.explain()
== Physical Plan ==
*(2) Sort [v#300 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(v#300 ASC NULLS FIRST, 200), true, [id=#334]
   +- *(1) Project [value#297 AS v#300]
      +- *(1) SerializeFromObject [input[0, int, false] AS value#297]
         +- Scan[obj#296]

scala> df.rdd.getNumPartitions
res13: Int = 200
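So the sort reshuffles the data into 200 partitions regardless of the earlier repartition(5), and the writer produces roughly one file per partition. As a minimal sketch (assuming spark is your SparkSession; column and path as in the original snippet), one way to keep the sort and still get 5 output files is to lower that setting first:

spark.conf.set("spark.sql.shuffle.partitions", "5")

dataset
  .sort("long_repeated_string_in_this_column")
  .write
  .parquet(outputPath)

The sort then produces 5 range partitions, so 5 parquet files are written.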

Best Regards,
Attila







Re: How to control count / size of output files for

Ivan Petrov
In reply to this post by Ivan Petrov
Ah... makes sense, thank you. I tried sortWithinPartitions before and replaced it with sort. That was a mistake.
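For reference, a minimal sketch of the variant described here, keeping repartition(5) and sorting only within each partition (column name and output path as in the original snippet):

dataset
  .repartition(5)
  .sortWithinPartitions("long_repeated_string_in_this_column")
  .write
  .parquet(outputPath)

sortWithinPartitions sorts each partition locally without triggering another shuffle, so the 5 partitions from repartition are preserved and written as 5 files.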

On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <[hidden email]> wrote:
Hi,

It is because repartition comes before the sort method invocation. If you reverse them you'll see 5 output files.

Regards,
Pietro

Re: How to control count / size of output files for

Gourav Sengupta
In reply to this post by Ivan Petrov
Hi Ivan,

Sorry, but it always helps to know the version of Spark you are using, its environment, the format you are writing your files to, and any other details if possible.


Regards,
Gourav Sengupta 

Re: How to control count / size of output files for

m li
In reply to this post by Ivan Petrov
Hi Ivan,



If the error you are referring to is that the data is out of order, it may
be because “repartition” distributes rows across partitions without regard
to their order. You can try “repartitionByRange” instead:

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v")

scala> df.repartitionByRange(5, column("v")).sortWithinPartitions("v").write.parquet(outputPath)
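As a quick check in the same spark-shell session (mirroring Attila's getNumPartitions check above), the range repartitioning keeps 5 partitions, so 5 parquet files are written:

scala> df.repartitionByRange(5, column("v")).rdd.getNumPartitions  // should be 5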



Best Regards,

m li

Re: How to control count / size of output files for

Gourav Sengupta
Hi,

Firstly, there is no need to use repartitionByRange. The repartition or coalesce call can come after the sort and everything will be fine.


Secondly, to reduce the number of records per file there is no need to use repartition; just sort and then write out the files using the property spark.sql.files.maxRecordsPerFile. Unless there is skew in the data, things will work out fine.
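For illustration, a minimal sketch of that approach (assuming spark is the active SparkSession; the limit of 1000000 records per file is an arbitrary placeholder):

spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")

dataset
  .sort("long_repeated_string_in_this_column")
  .write
  .parquet(outputPath)

Each writer task then rolls over to a new file once it has written 1000000 records, which caps the file size without an extra repartition.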


Regards,
Gourav Sengupta

Re: How to control count / size of output files for

m li
Hi,

Thank you. The suggestion is very good; there is no need to use
"repartitionByRange".

However, one small doubt: if the output files are required to be globally
ordered, "repartition" will disrupt the order of the data, while the result
of using "coalesce" is correct.
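To make the contrast concrete, a minimal sketch of the two variants being compared (column name and path as in the original snippet):

// coalesce(5) merges the sorted range partitions into 5 without a new shuffle;
// this is the variant described above as keeping the order
dataset
  .sort("long_repeated_string_in_this_column")
  .coalesce(5)
  .write
  .parquet(outputPath)

// repartition(5) after the sort redistributes rows round-robin, so the global order is lost:
// dataset.sort("long_repeated_string_in_this_column").repartition(5).write.parquet(outputPath)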

Best Regards,
m li

