Strategies for reading large numbers of files


Strategies for reading large numbers of files

Landon Kuhn
Hello, I'm trying to use Spark to process a large number of files in S3. I'm running into an issue that I believe is related to the high number of files, and the resources required to build the listing within the driver program. If anyone in the Spark community can provide insight or guidance, it would be greatly appreciated.

The task at hand is to read ~100 million files stored in S3, and repartition the data into a sensible number of files (perhaps 1,000). The files are organized in a directory structure like so:

    s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name

(Note that each file is very small, containing 1-10 records. Unfortunately this is an artifact of the upstream systems that put data in S3.)

My Spark program is simple, and works when I target a relatively specific subdirectory. For example:

    sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).saveAsTextFile(...)

This targets 1 hour's worth of purchase records, containing about 10,000 files. The driver program blocks (I assume it is making S3 calls to traverse the directories), and during this time no activity is visible in the driver UI. After about a minute, the stages and tasks appear in the UI, and then everything progresses and completes within a few minutes.

I need to process all the data (several years' worth). Something like:

    sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).saveAsTextFile(...)

This blocks "forever" (I have only run the program for as long as overnight). The stages and tasks never appear in the UI. I assume Spark is building the file listing, which will either take too long or eventually cause the driver to run out of memory.

I would appreciate any comments or suggestions. I'm happy to provide more information if that would be helpful.

Thanks

Landon


Re: Strategies for reading large numbers of files

Nick Chammas
I believe this is known as the "Hadoop Small Files Problem", and it affects Spark as well. The best approach I've seen to merging small files like this is to use s3distcp, as suggested here, as a pre-processing step.

It would be great if Spark could somehow handle this common situation out of the box, but for now I don't think it does.

Nick




Re: Strategies for reading large numbers of files

Landon Kuhn
Nicholas, thanks for the tip. Your suggestion certainly seemed like the right approach, but after a few days of fiddling I've come to the conclusion that s3distcp will not work for my use case. It is unable to flatten directory hierarchies, which I need because my source directories contain hour/minute/second parts.

See https://forums.aws.amazon.com/message.jspa?messageID=478960. It seems that s3distcp can only combine files in the same path.

Thanks again. That gave me a lot to go on. Any further suggestions?

L



--
Landon Kuhn | Software Architect | Janrain, Inc.
E: [hidden email] | M: 971-645-5501 | F: 888-267-9025
Follow Janrain: Facebook | Twitter | YouTube | LinkedIn | Blog
Follow Me: LinkedIn
-------------------------------------------------------------------------------------
Acquire, understand, and engage your users. Watch our video or sign up for a live demo to see what it's all about.

Re: Strategies for reading large numbers of files

Nick Chammas
Unfortunately not. Again, I wonder if adding support targeted at this "small files problem" would make sense for Spark core, as it is a common problem in our space.

Right now, I don't know of any other options.

Nick




Re: Strategies for reading large numbers of files

Matei Zaharia
Administrator
The problem is that listing the metadata for all these files in S3 takes a long time. Something you can try is the following: split your files into several non-overlapping paths (e.g. s3n://bucket/purchase/2014/01, s3n://bucket/purchase/2014/02, etc.), then do sc.parallelize over a list of such paths, and in each task use a single-node S3 library to list the contents of that directory only and read them. You can use Hadoop's FileSystem class, for example (FileSystem.open("s3n://...") or something like that). That way, more nodes will be querying the metadata for these files in parallel.

Matei
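Matei's suggestion can be sketched roughly as follows. This is only a sketch under assumptions (a Spark 1.x-era Scala API with `sc` as the SparkContext; the bucket and prefixes are placeholders), not code from the thread:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Non-overlapping prefixes generated up front on the driver, e.g. one per month.
val prefixes = for {
  year  <- Seq("2013", "2014")
  month <- (1 to 12).map(m => f"$m%02d")
} yield s"s3n://bucket/purchase/$year/$month"

// One task per prefix: each task lists and reads only its own subtree, so the
// slow S3 metadata calls run in parallel across the cluster, not on the driver.
val lines = sc.parallelize(prefixes, prefixes.size).flatMap { prefix =>
  val fs = FileSystem.get(new java.net.URI(prefix), new Configuration())
  def walk(p: Path): Seq[Path] = fs.listStatus(p).toSeq.flatMap { s =>
    if (s.isDirectory) walk(s.getPath) else Seq(s.getPath)
  }
  walk(new Path(prefix)).flatMap { p =>
    val in = fs.open(p)
    try Source.fromInputStream(in).getLines().toList finally in.close()
  }
}
```

Note that the `Configuration` is constructed inside the task on purpose, so nothing non-serializable is captured from the driver.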




Re: Strategies for reading large numbers of files

deenar.toraskar
Hi Landon

I had a problem very similar to yours, where we had to process around 5 million relatively small files on NFS. After trying various options, we did something similar to what Matei suggested.

1) Take the original path, find the subdirectories under it, and parallelize the resulting list. You can configure the depth you want to descend to before sending the paths across the cluster.

  import java.io.File
  import scala.collection.mutable.ListBuffer

  def getFileList(srcDir: File, depth: Int): List[File] = {
    val list = new ListBuffer[File]()
    if (srcDir.isDirectory()) {
      srcDir.listFiles().foreach { file =>
        if (file.isFile()) {
          list += file
        } else if (depth > 0) {
          list ++= getFileList(file, depth - 1)  // descend one level
        } else if (depth < 0) {
          list ++= getFileList(file, depth)      // negative depth: recurse without limit
        } else {
          list += file                           // depth == 0: keep the directory itself
        }
      }
    } else {
      list += srcDir
    }
    list.toList
  }
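Hypothetically, the helper above could be glued to Spark like this (a sketch, not from the thread; it assumes `sc` is the SparkContext and that `getFileList` is defined somewhere serializable, e.g. in an `object`):

```scala
import java.io.File
import scala.io.Source

// Cut the traversal off two levels down on the driver, then let each task
// finish its own subtree with an unlimited-depth (negative) traversal.
val roots = getFileList(new File("/nfs/logs"), 2)
val lines = sc.parallelize(roots.map(_.getPath), roots.size).flatMap { p =>
  getFileList(new File(p), -1)
    .filter(_.isFile)  // defensive: keep only plain files before reading
    .flatMap(f => Source.fromFile(f).getLines().toList)
}
```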


Re: Strategies for reading large numbers of files

Landon Kuhn
Thanks to folks here for the suggestions. I ended up settling on what seems to be a simple and scalable approach. I am no longer using sparkContext.textFile with wildcards (it is too slow when working with a large number of files). Instead, I have implemented directory traversal as a Spark job, which enables it to parallelize across the cluster.

First, a couple of functions. One to traverse directories, and another to get the lines in a file:

  def list_file_names(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    def f(path: Path): Seq[String] = {
      Option(fs.listStatus(path)).getOrElse(Array[FileStatus]()).
      flatMap {
        case fileStatus if fileStatus.isDir ⇒ f(fileStatus.getPath)
        case fileStatus ⇒ Seq(fileStatus.getPath.toString)
      }
    }
    f(new Path(path))
  }

  def read_log_file(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    val file = fs.open(new Path(path))
    val source = Source.fromInputStream(file)
    source.getLines.toList
  }

Next, I generate a list of "root" paths to scan:

  val paths =
    for {
      record_type <- record_types
      year        <- years
      month       <- months
      day         <- days
      hour        <- hours
    } yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"

(In this case, I generate one path per hour per record type.)

Finally, using Spark, I can build an RDD with the contents of every file in the path list:

  val rdd: RDD[String] =
    sparkContext.
      parallelize(paths, paths.size).
      flatMap(list_file_names).
      flatMap(read_log_file)

I am posting this info here with the hope that it will be useful to somebody in the future.

L



Re: Strategies for reading large numbers of files

soojin
Hi Landon,

I tried this, but it didn't work for me. I get a Task not serializable exception:
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration

How do you make org.apache.hadoop.conf.Configuration hadoopConfiguration available to tasks?
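For what it's worth, a common way around this (a sketch under assumptions, not confirmed in this thread) is either to construct a fresh `Configuration` inside the task, so nothing non-serializable is captured from the driver, or to broadcast the driver's configuration wrapped in Spark's `SerializableWritable`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Option 1: build the Configuration on the executor, inside the function that
// runs in the task, instead of referencing the driver's hadoopConfiguration.
def read_log_file(path: String): Seq[String] = {
  val conf = new Configuration()  // created per task; nothing captured from the driver
  val fs = FileSystem.get(new java.net.URI(path), conf)
  val in = fs.open(new Path(path))
  try Source.fromInputStream(in).getLines().toList finally in.close()
}

// Option 2: wrap and broadcast the driver's configuration. SerializableWritable
// is a Spark class that makes Writable-based Hadoop objects serializable.
val confBroadcast =
  sc.broadcast(new org.apache.spark.SerializableWritable(sc.hadoopConfiguration))
// ...and inside a task: val conf = confBroadcast.value.value
```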

Re: Strategies for reading large numbers of files

guest
In reply to this post by Landon Kuhn
Hi Landon,

I am trying to use your solution as I had exactly your same problem when using too many files.
But I ended up with the same error as soojin: java.io.NotSerializableException: org.apache.hadoop.fs.Path

Do you have a solution for this?

Thanks!