Temp hdfs files picked up by textFileStream dstream


Temp hdfs files picked up by textFileStream dstream

Chris Regnier
Hello,

I have a Spark Streaming textFileStream watching an HDFS folder for new
csv files every minute. I must've gotten the timing just right: I copied
5 new csv files ("part-0004" was one of them) into the watched folder
and then got an IO exception saying the path "part-0004._COPYING_" does
not exist as the dstream kicked off a job. It looks like the 'hdfs dfs
-put' command first creates a temporary *._COPYING_ file, which got
picked up as one of the new files during the directory scan, but by the
time the job tried to read the data the temp file had already been
renamed away, hence the path-does-not-exist error.

To get around this I expect I'll have to use fileStream instead and
pass in a filter function on filenames. I don't suppose anyone has
already written a filter that weeds out other common temp files I
haven't run into yet?

On another note, this problem seems to make the textFileStream dstream
error prone, if not useless, as is. Perhaps a file filter should be a
required parameter when creating one, in which case some sensible
defaults would also be nice.


Chris Regnier
-------------------------
Visualization Developer
Oculus Info Inc.

Re: Temp hdfs files picked up by textFileStream dstream

Patrick Wendell
Hey Chris,

This is a good point. textFileStream actually assumes that files
appear atomically, via a move operation. The reason 'hdfs dfs -put'
creates a temporary file is that "move" is the only way to atomically
create a file in HDFS. So in your case, yes, you'll want to use a
filter.

Default filters would be nice. It's a bit tricky because there are
many conventions for dealing with this in HDFS. One is to write to a
_COPYING_ file and move it to the final name. Another is to write with
a suffix and rename files to something like XX_COMPLETED once they
finish. It depends a lot on the application, so we just take a filter
and you can define it however you want.
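A filter along those lines might look like the sketch below. The
isAcceptable helper and the directory path are made up for illustration;
the fileStream call in the comment uses the same type parameters that
textFileStream uses internally, but check it against your Spark version.

```scala
// Sketch: a filename predicate that skips common temporary-file conventions:
// the *._COPYING_ suffix left by 'hdfs dfs -put', and the dot/underscore
// prefixes Hadoop uses for hidden or in-progress files (e.g. _SUCCESS).
def isAcceptable(name: String): Boolean =
  !name.endsWith("._COPYING_") &&
  !name.startsWith(".") &&
  !name.startsWith("_")

// Hooking it into Spark Streaming (assumed setup, not shown: ssc is a
// running StreamingContext, "/watched" is the directory to monitor):
//
//   import org.apache.hadoop.fs.Path
//   import org.apache.hadoop.io.{LongWritable, Text}
//   import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
//
//   val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
//     "/watched", (p: Path) => isAcceptable(p.getName), newFilesOnly = true
//   ).map(_._2.toString)
```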

The no-filter constructor is not necessarily useless, because some
people create spooling directories in HDFS into which files are only
ever moved.
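That spooling pattern can be sketched as follows, using java.nio on the
local filesystem as a stand-in for HDFS (with HDFS itself the equivalent
would be an 'hdfs dfs -put' into a staging directory followed by an
'hdfs dfs -mv' into the watched directory; the directory names here are
made up):

```scala
import java.nio.file.{Files, StandardCopyOption}

val staging = Files.createTempDirectory("staging")
val watched = Files.createTempDirectory("watched")

// 1. Write the file somewhere the stream is NOT watching. However slow
//    the write is, the watched directory never sees a partial file.
val tmp = staging.resolve("part-0004")
Files.write(tmp, "a,b,c\n".getBytes)

// 2. Atomically rename it into the watched directory. A rename either
//    happens completely or not at all, so no filename filter is needed.
Files.move(tmp, watched.resolve("part-0004"), StandardCopyOption.ATOMIC_MOVE)
```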

All of this needs to be much clearer in the documentation, and in the
master branch (and coming in 0.9) we've updated the docs to spell out
the assumptions the input stream makes.
Thanks for bringing it up.

- Patrick

On Mon, Jan 6, 2014 at 1:16 PM, Chris Regnier <[hidden email]> wrote:
