Spark S3 Performance

Spark S3 Performance

Nitay Joffe
I have a simple S3 job to read a text file and do a line count. Specifically, I'm doing `sc.textFile("s3n://mybucket/myfile").count`. The file is about 1.2GB. My setup is a standalone Spark cluster with 4 workers, each with 2 cores / 16GB RAM. I'm using branch-1.2 code built against Hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark).
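
In other words, the whole job is just this (a sketch; credentials are assumed to be set in the Hadoop config, e.g. via fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey):

    // The entire job: count the lines of one ~1.2GB S3 object.
    val lines = sc.textFile("s3n://mybucket/myfile")
    println(lines.count())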

The whole count takes on the order of a couple of minutes, which seems extremely slow.
I've been looking into it and have noticed two things so far. I'm hoping the community has seen this before and knows what to do:

1) Every executor seems to make an S3 call to read the entire file before making another call to read just its split. Here's a paste I've cleaned up to show just one task: http://goo.gl/XCfyZA. I've verified this happens in every task, and it takes a long time (40-50 seconds). I don't see why it is doing this.
2) I've tried a few numPartitions parameters. Anything below 21 seems to get ignored: under the hood, FileInputFormat always ends up with at least 21 partitions of ~64MB or so. I've also tried 40, 60, and 100 partitions, and performance only gets worse as I increase the count beyond 21. I'd like to try 8 just to see, but I don't see how to force it below 21.

Thanks for the help,
- Nitay
Founder & CTO


Re: Spark S3 Performance

Nitay Joffe
Anyone have any thoughts on this? I'm especially trying to understand #2: is it a legit bug or something I'm doing wrong?

- Nitay
Founder & CTO



Re: Spark S3 Performance

Nitay Joffe
Err I meant #1 :)

- Nitay
Founder & CTO



Re: Spark S3 Performance

Andrei
Not that I'm a professional user of Amazon services, but I have a guess about your performance issue. From [1], there are two different filesystems over S3:

 - native, which behaves just like regular files (scheme: s3n)
 - block-based, which looks more like HDFS (scheme: s3)

Since you use "s3n" in your URL, each Spark worker seems to treat the file as an unsplittable piece of data and downloads all of it (though it probably applies functions to specific regions only). If I understand it right, using "s3" instead will let Spark workers see the data as a sequence of blocks and download each block separately.
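
For example, the change would be just the URL scheme (a sketch; note that, as far as I know, the block-based filesystem stores data in its own block format, so the file would need to have been written through it):

    // Hypothetical: read via the block-based S3 filesystem instead of s3n.
    // Assumes fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey are set and
    // that the data was written through the s3 block filesystem.
    val lines = sc.textFile("s3://mybucket/myfile")
    println(lines.count())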

But anyway, using s3 implies loss of data locality, so data will be transferred to workers instead of code being transferred to data. Given the data size of 1.2GB, consider also storing the data in Hadoop's HDFS instead of S3 (as far as I remember, Amazon allows using both at the same time).

Please let us know if it works.


Re: Spark S3 Performance

Ashish Rangole

What makes you think that each executor is reading the whole file? If that were the case, the count value returned to the driver would be the actual count multiplied by the number of executors. Is that the case when compared with the actual number of lines in the input file? If the count returned is the same as the actual count, then you probably don't have an extra-read problem.
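
A quick sanity check might look like this (a sketch; the expected value is hypothetical, e.g. taken from `wc -l` on the file):

    // If each executor read *and counted* the whole file, `counted` would
    // come back as a multiple of the real line count instead of equal to it.
    val counted = sc.textFile("s3n://mybucket/myfile").count()
    val expectedLines = 1234567L  // hypothetical: output of `wc -l myfile`
    println(s"counted=$counted, expected=$expectedLines")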

I also see this in your logs, which indicates a read that starts at an offset and covers one split (64MB) worth of data:

14/11/20 15:39:45 [Executor task launch worker-1 ] INFO HadoopRDD: Input split: s3n://mybucket/myfile:335544320+67108864


Re: Spark S3 Performance

Andrei
Concerning your second question: I believe you're trying to set the number of partitions with something like this:

    rdd = sc.textFile(..., 8)

but things like `textFile()` don't actually take a fixed number of partitions. Instead, they take a minimum number of partitions. Since your file has 21 blocks of data, Spark creates exactly 21 partitions (which is greater than 8, as expected). To set an exact number of partitions, use `repartition()` or the more general `coalesce()` (see example [1]).

[1]: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#coalesce
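
For example, a sketch of forcing 8 partitions after the read (coalescing below the block count just means each task processes more than one block):

    // Read with the default (>= 21) input splits, then shrink to 8 partitions.
    // coalesce(8) merges partitions without a shuffle; repartition(8) would
    // do the same with a full shuffle.
    val rdd = sc.textFile("s3n://mybucket/myfile").coalesce(8)
    println(rdd.partitions.size)  // 8
    println(rdd.count())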




Re: Spark S3 Performance

Nitay Joffe
Andrei, Ashish,

To be clear, I don't think it's *counting* the entire file. It just seems, from the logging and the timing, that it does a GET of the entire file, then figures out it only needs certain blocks, and does another GET of only the specific block.

Regarding the number of partitions: I see now that it has to do with Hadoop's block size being set at 64MB. That's not a big deal to me. The main issue is the first one: why is every worker doing a call to get the entire file, followed by the real call to get only the specific partition it needs?
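
(If I did want fewer than 21 read-time partitions, one knob that might work, though I haven't verified it, is the block size that s3n reports to FileInputFormat:

    // Hypothetical: raise the block size the s3n filesystem reports so that
    // FileInputFormat computes larger, and therefore fewer, input splits.
    sc.hadoopConfiguration.set("fs.s3n.block.size", (256 * 1024 * 1024).toString)
    val rdd = sc.textFile("s3n://mybucket/myfile")
    println(rdd.partitions.size)  // ~5 splits for a 1.2GB file at 256MB each

but as I said, the partition count isn't my main concern.)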

Best,

- Nitay
Founder & CTO



Re: Spark S3 Performance

Daniil Osipov
Can you verify, using network monitoring stats, that it's reading the entire file on each worker? If it is, that would be a bug in my opinion.
