Using local[N] gets "Too many open files"?

Using local[N] gets "Too many open files"?

Matthew Cheah
Hi everyone,

I'm experimenting with Spark in both a distributed environment and as a multi-threaded local application.

When I set the Spark master to local[8] and attempt to read a ~20GB text file on the local file system into an RDD and perform computations on it, I don't get an out-of-memory error, but rather a "Too many open files" error. Why does this happen? How aggressively is Spark partitioning the data into intermediate files?
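For reference, here is roughly how the single-file variant is set up (the path and the final computation are simplified placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object LocalBigFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[8]").setAppName("local-big-file")
    val sc = new SparkContext(conf)

    // ~20GB text file on the local file system (path is a placeholder).
    val lines = sc.textFile("/data/readings.csv")

    // Stand-in for the real computation.
    println(lines.count())

    sc.stop()
  }
}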

I have also tried splitting the text file into numerous smaller files - around 100,000 of them - and processing 10,000 of them at a time, sequentially. However, Spark then seems to get bottlenecked reading each individual file into the RDD before proceeding with the computation, and it struggles even to read 10,000 files at once. I would have thought that Spark could do I/O in parallel with computation, but it seems that Spark does all of the I/O first?
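The batched variant differs only in how the input is fed in; roughly (the directory is a placeholder, and I'm relying on textFile() accepting a comma-separated list of paths):

// Same SparkContext as above; ~100,000 split files in one directory.
val paths = new java.io.File("/data/splits").listFiles.map(_.getAbsolutePath).sorted

// Process 10,000 files per batch, one batch at a time.
paths.grouped(10000).foreach { batch =>
  val rdd = sc.textFile(batch.mkString(","))   // comma-separated list of input paths
  println(rdd.count())                          // stand-in for the real computation
}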

I was wondering if Spark simply isn't built for local applications outside of testing.

Thanks,

-Matt Cheah

Re: Using local[N] gets "Too many open files"?

Aaron Davidson
If you are intentionally opening many files at once and getting that error, then it is a fixable OS issue. Please check out this discussion regarding changing the file limit in /etc/limits.conf: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-td1464.html

If you feel that your job should not be opening so many files at a time, then please give a little more detail about the nature of your job. A few questions bear answering:

Are you using a standard Spark input method, such as sc.textFile()? This should only have one open file per partition per core (so 8 concurrently in your case).
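For example, a plain textFile() read lets you check how many input partitions you're actually dealing with (the path below is just a placeholder, assuming an existing SparkContext sc):

val lines = sc.textFile("/path/to/big-file.csv")
// Number of input partitions; for a ~20GB file this is typically a few hundred,
// but only one input file should be held open per core at any given time.
println(lines.partitions.length)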

Are you performing any sort of join or shuffle operation? This can create intermediate shuffle or external sorting files. Shuffling an RDD into N partitions will cause us to open N files at a time (per core), so that could be up to 800k in your case. You can reduce this by shuffling your 100k input partitions into many fewer output partitions, assuming that each file is actually small. This can be set as a parameter to any shuffle-inducing operation.
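As a rough sketch (the parsing is a placeholder, assuming an existing SparkContext sc and the usual import org.apache.spark.SparkContext._ for the pair-RDD operations), you can either pass a smaller partition count to the shuffle itself or coalesce first:

import org.apache.spark.SparkContext._   // pair-RDD operations like reduceByKey

// ~100k input files -> ~100k input partitions (one per small file).
val records = sc.textFile("/path/to/splits/*")
  .map { line => val f = line.split(","); (f(0), f(1).toDouble) }   // placeholder parse: (key, value)

// Option 1: give the shuffle-inducing operation an explicit, much smaller partition count.
val reduced = records.reduceByKey(_ + _, 64)

// Option 2: coalesce the many small input partitions first, then shuffle normally.
val reducedAlt = records.coalesce(64).reduceByKey(_ + _)

The target count (64 here) is arbitrary; the point is just to keep the number of shuffle files per core far below the file-descriptor limit.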

If your job is using external sorting to avoid OOMing (which it will warn you about in the executor logs with messages like "Spilling in-memory map..."), then you may have arbitrarily many files open. This is very unlikely to happen if you've split your input into as many files as you said, though.


Re: Using local[N] gets "Too many open files"?

Matthew Cheah
Thanks!

I'm trying two variants: splitting into lots of files, and using one big file.

For the one big file, I get the "too many open files" error. I'm using SparkContext.textFile() to obtain the initial RDD. I definitely have several reduceByKey() calls; that is probably the only shuffle-inducing operation I'm doing (everything else is map, filter, and flatMap).

This is a CSV containing smart-meter data: each row contains a house identifier, a time of day, the temperature in that home at that time, and that house's usage in kilowatt-hours for that hour. There are many homes, so the RDD is keyed by home id and reduceByKey() is called often. I'm performing some linear regressions and trying to compute the regression for all homes at the same time.
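Concretely, the shuffle-heavy part looks roughly like this (the column positions, path, and partition count are placeholders, and I've left out the regression itself):

import org.apache.spark.SparkContext._   // pair-RDD operations

val rows = sc.textFile("/data/meter-readings.csv")

// Each row: houseId, timeOfDay, temperature, kWh (column order here is illustrative).
val byHome = rows.map { line =>
  val cols = line.split(",")
  (cols(0), (cols(2).toDouble, cols(3).toDouble))   // (houseId, (temperature, kWh))
}

// The reduceByKey calls are the shuffles; per the suggestion above, an explicit,
// smaller partition count should cap the number of shuffle files opened per core.
val usagePerHome = byHome.mapValues(_._2).reduceByKey(_ + _, 64)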

When I use multiple files, the data is split so that a single home's hourly usage is stored in its own file. When I run against this data format - a single directory with around 100k CSVs - Spark doesn't crash, but it takes a very long time just to read in the data.

Hope this helps. Let me know if I should provide any more context.


On Sun, Feb 16, 2014 at 10:38 PM, Aaron Davidson <[hidden email]> wrote:
If you are intentionally opening many files at once and getting that error, then it is a fixable OS issue. Please check out this discussion regarding changing the file limit in /etc/limits.conf: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-td1464.html

If you feel that your job should not be opening so many files at a time, then please give a little more detail about the nature of your job. A few questions bear answering:

Are you using a standard Spark input method, such as sc.textFile()? This should only have one open file per partition per core (so 8 concurrently in your case).

Are you performing any sort of join or shuffle operation? This can create intermediate shuffle or external sorting files. Shuffling an RDD into N partitions will cause us to open N files at a time (per core), so that could be up to 800k in your case. You can reduce this by shuffling your 100k input partitions into many fewer output partitions, assuming that each file is actually small. This can be set as a parameter to any shuffle-inducing operation.

If your job is using external sorting to avoid OOMing (which it will warn you about in the executor logs with messages like "Spilling in-memory map..."), then you may have arbitrarily many files open. This is very unlikely to happen if you've split your input into as many files as you said, though.


On Sun, Feb 16, 2014 at 6:18 PM, Matthew Cheah <[hidden email]> wrote:
Hi everyone,

I'm experimenting with Spark in both a distributed environment and as a multi-threaded local application.

When I set the spark master to local[8] and attempt to read a ~20GB text file on the local file system into an RDD and perform computations on it, I don't get an out of memory error, but rather a "Too many open files" error. Is there a reason why this happens? How aggressively is Spark partitioning the data into intermediate files?

I have also tried splitting the text file into numerous text files - around 100,000 of them - and processing 10,000 of them at a time sequentially. However then Spark seems to get bottlenecked on reading each individual file into the RDD before proceeding with the computation. This has issues even reading 10,000 files at once. I would have thought that Spark could do I/O in parallel with computation, but it seems that Spark does all of the I/O first?

I was wondering if Spark was simply just not built for local applications outside of testing.

Thanks,

-Matt Cheah