Best practices: Parallelized write to / read from S3

Best practices: Parallelized write to / read from S3

Nick Chammas
Howdy-doody,

I have a single, very large file sitting in S3 that I want to read in with sc.textFile(). What are the best practices for reading in this file as quickly as possible? How do I parallelize the read as much as possible?

Similarly, say I have a single, very large RDD sitting in memory that I want to write out to S3 with RDD.saveAsTextFile(). What are the best practices for writing this file out as quickly as possible?

Nick

Re: Best practices: Parallelized write to / read from S3

Aaron Davidson
Spark will only use each core for one task at a time, so doing

sc.textFile(<s3 location>, <minSplits>)

where you set minSplits to at least as many as the total number of cores in your cluster is about as fast as you can get out of the box. The same goes for saveAsTextFile().
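
In PySpark terms, and assuming the pyspark shell's built-in sc, a cluster with 16 total cores, and a made-up s3n:// path (all placeholders, not from this thread), that looks roughly like:

total_cores = 16  # assumption: total cores across the cluster
lines = sc.textFile("s3n://my-bucket/very-large-file.txt", total_cores)

# Downstream actions can now run up to total_cores tasks concurrently.
line_count = lines.count()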


Re: Best practices: Parallelized write to / read from S3

Nick Chammas
So setting minSplits will set the parallelism of the read in SparkContext.textFile(), assuming I have enough cores in the cluster to deliver that level of parallelism. And if I don't provide it explicitly, Spark defaults minSplits to 2.

So, for example, say I have a cluster with 4 cores total, and it takes 40 minutes to read a single file from S3 with minSplits at 2. It should take roughly 20 minutes to read the same file if I up minSplits to 4.

Did I understand that correctly?

RDD.saveAsTextFile() doesn't have an analog to minSplits, so I'm guessing that's not an operation the user can tune.


Re: Best practices: Parallelized write to / read from S3

Aaron Davidson
Note that you may set minSplits to more than the number of cores in the cluster; Spark will simply run as many tasks at a time as it can. This can be better if, for instance, certain nodes are slow.

In general, doubling the number of cores doing I/O will not necessarily double the throughput, because you could already be saturating the available bandwidth with fewer cores. However, S3 is odd in that each connection gets far less bandwidth than your network link can provide, and throughput does seem to scale linearly with the number of connections. So, yes, taking minSplits up to 4 (or higher) will likely result in a roughly 2x performance improvement.

saveAsTextFile() will use as many partitions (aka splits) as the RDD it's called on has. So, for instance:

sc.textFile(myInputFile, 15).map(lambda x: x + "!!!").saveAsTextFile(myOutputFile)

will use 15 partitions to read the text file (i.e., up to 15 cores at a time) and then again to save back to S3.
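
If you want the write to use a different number of partitions than the read produced, one option is to repartition before saving. This is only a sketch, not something from the thread: the paths are placeholders, and it assumes a Spark version whose PySpark RDD API includes repartition().

rdd = sc.textFile("s3n://my-bucket/input.txt", 15)   # read with up to 15 tasks
shouted = rdd.map(lambda x: x + "!!!")

# saveAsTextFile() writes one part-* file per partition, so repartitioning
# here changes both the write parallelism and the number of output files.
shouted.repartition(30).saveAsTextFile("s3n://my-bucket/output/")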



Re: Best practices: Parallelized write to / read from S3

Nick Chammas
OK sweet. Thanks for walking me through that.

I wish this were StackOverflow so I could bestow some nice rep on all you helpful people.


Re: Best practices: Parallelized write to / read from S3

Nick Chammas
Alright, so I've upped the minSplits parameter on my call to textFile(), but the resulting RDD still has only 1 partition, which I assume means it was read in by a single task. I am checking the number of partitions in PySpark using the rdd._jrdd.splits().size() trick I picked up on this list.

The source file is a gzipped text file. I have heard things about gzipped files not being splittable. 

Is this the reason that opening the file with minSplits = 10 still gives me an RDD with one partition? If so, I guess the only way to speed up the load would be to change the source file's format to something splittable. 

Otherwise, if I want to speed up subsequent computation on the RDD, I should explicitly partition it with a call to RDD.partitionBy(10). 

Is that correct?


Re: Best practices: Parallelized write to / read from S3

Aaron Davidson
Looks like you're right that gzip files are not easily splittable [1], and you're also right about everything else you said.
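
To make that concrete, here's a hedged PySpark sketch of the situation and the workaround: a .gz input arrives as a single partition regardless of minSplits, so you can repartition after the read to spread later stages across the cluster. The path and partition counts are placeholders, and it assumes a PySpark version where RDD.repartition() is available.

# Reading a gzipped file: minSplits is effectively ignored, one partition results.
gz_rdd = sc.textFile("s3n://my-bucket/very-large-file.txt.gz", 10)
print(gz_rdd._jrdd.splits().size())   # prints 1, despite minSplits = 10

# Spread the data out so subsequent stages can use the whole cluster.
spread = gz_rdd.repartition(10)
result = spread.map(lambda x: x.upper())  # placeholder transformation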



Re: Best practices: Parallelized write to / read from S3

Nick Chammas
Alright! 

Thanks for that link. I did a little research based on it, and it looks like Snappy or LZO plus some container format would be better alternatives to gzip.

I confirmed that gzip was cramping my style by trying sc.textFile() on an uncompressed version of the text file. With the uncompressed file, setting minSplits gives me an RDD that is partitioned as expected. This makes all my subsequent operations, obviously, much faster. 

I dunno if it would be appropriate to have Spark issue some kind of warning that "Hey, your file is compressed using gzip so..."

Anyway, mystery solved!
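
For anyone who lands on this thread later, here is a rough sketch of one possible one-time conversion: read the gzipped file once, repartition, and write it back out as many part files so future textFile() reads parallelize naturally. The paths and partition count are placeholders, and it assumes RDD.repartition() is available in PySpark.

# One-time conversion of a single .gz object into many part files.
# The .gz read lands in one partition; repartition before writing so the
# output directory contains 64 part-* files that future reads can split across.
single = sc.textFile("s3n://my-bucket/very-large-file.txt.gz")
single.repartition(64).saveAsTextFile("s3n://my-bucket/very-large-file-split/")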

Nick

