Writing files to S3 without a temporary directory


Writing files to S3 without a temporary directory

Sung Jae Lee
Hello, all

I was wondering if there is a way I can save an RDD to S3 without creating a temporary folder on S3.
What I observe with my job is that, because there are so many files to save, it takes significant time to rename all of them from the temporary location to their real destination in S3.

I've googled this and found:
1. DirectParquetOutputCommitter - deprecated as of Spark 2.0.0, and I am not saving as Parquet anyway.
2. Save to HDFS first, then s3-dist-cp - this could work, but I'm wondering if there is a more native way to solve the issue.
3. Change the output committer algorithm version to 2 - it still renames the files one by one instead of writing them directly to the destination path (a config sketch for this option is below).
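For reference, option 3 looks roughly like this (a sketch only; the paths and dataset are placeholders):

    import org.apache.spark.sql.SparkSession

    // Sketch: enable the "version 2" FileOutputCommitter algorithm. Task output
    // is moved into the destination directory at task commit, so the job-level
    // round of renames goes away. Files are still written under _temporary
    // first, so this reduces, but does not eliminate, the rename cost on S3.
    val spark = SparkSession.builder()
      .appName("s3-write-test")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .getOrCreate()

    // Placeholder data and bucket, purely for illustration.
    val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"))
    rdd.saveAsTextFile("s3a://my-bucket/output/")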

Thanks in advance for any ideas, suggestions.

Re: Writing files to S3 without a temporary directory

Jim Carroll
I have this exact issue. I was going to intercept the call in the filesystem
if I had to (since we're using the S3 filesystem from Presto anyway), but if
there's simply a way to do this correctly I'd much prefer it. This basically
doubles the time to write Parquet files to S3.





Re: Writing files to S3 without a temporary directory

Tayler Lawrence Jones
It is an open issue with the Hadoop file committer, not Spark. The simple workaround is to write to HDFS and then copy to S3. Netflix gave a talk about their custom output committer at the last Spark Summit, which is a clever, efficient way of doing that - I'd check it out on YouTube. They have open-sourced their implementation, but it does not work (out of the box) with Parquet.
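
Roughly, that two-step workaround looks like the following (a sketch only; the dataset, paths, and the choice of distcp vs s3-dist-cp are placeholders):

    import org.apache.spark.sql.SparkSession
    import scala.sys.process._

    val spark = SparkSession.builder().appName("hdfs-then-s3").getOrCreate()
    import spark.implicits._

    // Placeholder dataset and paths.
    val df      = Seq((1, "a"), (2, "b")).toDF("id", "value")
    val hdfsDir = "hdfs:///tmp/job-output"
    val s3Dir   = "s3a://my-bucket/final-output"

    // 1) Commit with the normal committer against HDFS, where rename is cheap.
    df.write.parquet(hdfsDir)

    // 2) Bulk-copy the committed files to S3 in one pass
    //    (on EMR, s3-dist-cp can be used instead of distcp).
    Seq("hadoop", "distcp", hdfsDir, s3Dir).!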

-TJ



Re: Writing files to S3 without a temporary directory

lucas.gary@gmail.com
You can expect to see fixes for this sort of issue in the medium-term future (multiple months, probably not years).

As Tayler notes, it's a Hadoop problem, not a Spark problem, so whichever version of Hadoop includes the fix will then have to wait for a Spark release built against it. Last I checked they were targeting Hadoop 3.0.

Others have listed some middleware-style fixes which we haven't tried. We've just been writing to the local FS and then using boto to copy the files up. Our use case has a lot of slack in its timeliness, so although we know it's an issue, it's not a serious enough problem for us to try to fix on our own at this point.
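
We use boto from Python; a roughly equivalent JVM-side sketch using Hadoop's FileUtil would be something like this (paths are placeholders; it assumes the s3a connector and credentials are already configured):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileUtil, Path}

    // Sketch: after the job has written its output to the local filesystem,
    // copy the whole directory up to S3 in a separate step.
    val conf = new Configuration()

    val localDir = new Path("file:///data/job-output")       // placeholder local output
    val s3Dir    = new Path("s3a://my-bucket/final-output")  // placeholder destination

    val localFs = localDir.getFileSystem(conf)
    val s3Fs    = s3Dir.getFileSystem(conf)

    // deleteSource = false: keep the local copy until the upload is verified.
    FileUtil.copy(localFs, localDir, s3Fs, s3Dir, false, conf)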

Gary




Re: Writing files to S3 without a temporary directory

Jim Carroll
Thanks. In the meantime I might just write a custom file system that maps
writes to Parquet file parts to their final locations and then skips the
move.





Re: Writing files to S3 without a temporary directory

lucas.gary@gmail.com
That sounds like a lot of work, and if I understand you correctly it sounds like a piece of middleware that already exists (I could be wrong) - Alluxio?

Good luck and let us know how it goes!

Gary




Re: Writing files to S3 without a temporary directory

Jim Carroll
It's not actually that tough. We already use a custom Hadoop FileSystem for
S3 because when we started using Spark with S3 the native FileSystem was
very unreliable. Ours is based on the code from Presto (see
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java).

I already have a version that introduces a hash into the filename of the file
that's actually written to S3, to see if it makes a difference per
https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html.
FWIW, it doesn't. I'm going to modify that experiment to override the key
name like before, except actually move the file, keep track of the state, and
override the rename method.

The problems with this approach are: 1) it's brittle, because it depends on
the internal directory and file naming conventions in Hadoop and Parquet;
2) it will assume (as seems to be currently the case) that the 'rename' call
is done for all files from the driver. But it should do until there's a better
solution in the Hadoop committer.





Re: Writing files to S3 without a temporary directory

Jim Carroll
I got it working. It's much faster.

If someone else wants to try it, here is what I did:
1) I was already using the code from the Presto S3 Hadoop FileSystem
implementation, modified to sever it from the rest of the Presto codebase.
2) I extended it and overrode the method "keyFromPath" so that any time the
Path referred to a "_temporary" Parquet "part" file, it returned the key of
the file's final location.
3) I registered the filesystem through sparkContext.hadoopConfiguration by
setting fs.s3.impl, fs.s3n.impl, and fs.s3a.impl.

I realize I'm risking file corruption, but it's WAAAAY faster than it was. A rough sketch of the idea is below.
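
Schematically (not my actual code - the class name and paths below are placeholders), the two pieces look something like this:

    import org.apache.spark.sql.SparkSession

    // (a) The kind of key remapping an overridden keyFromPath can apply:
    // strip the _temporary/.../attempt_... segments so a part file's key
    // points straight at its final location and the later rename is a no-op.
    def remapTemporaryKey(key: String): String =
      if (!key.contains("_temporary/")) key
      else {
        val parts  = key.split("/")
        val prefix = parts.takeWhile(_ != "_temporary")
        (prefix :+ parts.last).mkString("/")
      }

    // e.g. "output/_temporary/0/_temporary/attempt_.../part-00000.snappy.parquet"
    //   -> "output/part-00000.snappy.parquet"

    // (b) Registering the custom FileSystem for the s3 schemes;
    // "com.example.DirectWriteS3FileSystem" is a placeholder class name.
    val spark = SparkSession.builder().appName("direct-s3-write").getOrCreate()
    val hc = spark.sparkContext.hadoopConfiguration
    hc.set("fs.s3.impl",  "com.example.DirectWriteS3FileSystem")
    hc.set("fs.s3n.impl", "com.example.DirectWriteS3FileSystem")
    hc.set("fs.s3a.impl", "com.example.DirectWriteS3FileSystem")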






Re: Writing files to S3 without a temporary directory

Haoyuan Li
In reply to this post by lucas.gary@gmail.com
This blog/tutorial may be helpful for running Spark in the cloud with Alluxio.

Best regards,

Haoyuan





Re: Writing files to S3 without a temporary directory

Steve Loughran
In reply to this post by Jim Carroll


Hadoop trunk (i.e. 3.1 when it comes out) has the code to do zero-rename commits.



If you want to play today, you can build Hadoop trunk and Spark master, plus a little glue JAR of mine to get Parquet to play properly.




On 21 Nov 2017, at 15:03, Jim Carroll <[hidden email]> wrote:

> It's not actually that tough. We already use a custom Hadoop FileSystem for
> S3 because when we started using Spark with S3 the native FileSystem was
> very unreliable. Ours is based on the code from Presto (see
> https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java).
>
> I already have a version that introduces a hash into the filename of the file
> that's actually written to S3, to see if it makes a difference per
> https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations.
> FWIW, it doesn't.

AFAIK, the further up the directory tree the hash appears, the better. The classic partitioned layout here is exactly what you don't want.


> I'm going to modify that experiment to override the key
> name like before, except actually move the file, keep track of the state, and
> override the rename method.


You might find this interesting too: https://arxiv.org/abs/1709.01812

IBM's Stocator FS remaps dest/_temporary/$jobAttempt/$taskAttempt/part-0000 to a file dest/part-$jobAttempt-$taskAttempt-000.

This makes it possible to clean up failed tasks and jobs; without that, on any task failure the entire job needs to be failed.
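
Schematically, that remap is something like the following (a sketch, not Stocator's actual code):

    // Rewrite a task-attempt path of the form
    //   dest/_temporary/<jobAttempt>/<taskAttempt>/part-0000
    // into the flat form
    //   dest/part-<jobAttempt>-<taskAttempt>-0000
    // so failed attempts can be identified and cleaned up without renames.
    val TempPart = """(.*)/_temporary/([^/]+)/([^/]+)/part-(\d+)""".r

    def stocatorStyleRemap(path: String): String = path match {
      case TempPart(dest, jobAttempt, taskAttempt, part) =>
        s"$dest/part-$jobAttempt-$taskAttempt-$part"
      case other => other
    }

    // stocatorStyleRemap("s3a://bucket/dest/_temporary/0/attempt_001/part-0000")
    //   == "s3a://bucket/dest/part-0-attempt_001-0000"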



> The problems with this approach are: 1) it's brittle because it depends on
> the internal directory and file naming conventions in Hadoop and Parquet.


They do, but the actual workers have the right to generate files with names other than part-0000.$suffix, to stick in summary files, etc. - or even to create no files at all, which is what ORC does when there are no results for that part.


> 2) It will assume (as seems to be currently the case) that the 'rename' call is
> done for all files from the driver.


The first step towards the new committers was to look at all the code where the old ones were called, including stepping through with a debugger to work out exactly what the two intermingled commit algorithms were up to.


> But it should do until there's a better
> solution in the Hadoop committer.

 

If you are at the stage where you have your own FS implementation, you are probably ready to pick up & play with the new s3a committers.
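
For anyone experimenting with such a build, the wiring looks roughly like the following; treat the exact keys and classes as assumptions to verify against the docs of whatever Hadoop/Spark build you use (the two classes come from the spark-hadoop-cloud "glue" module):

    import org.apache.spark.sql.SparkSession

    // Rough sketch of enabling the new s3a committers for Spark SQL/Parquet.
    val spark = SparkSession.builder()
      .appName("s3a-committer-test")
      // Pick one of the new committers: "directory", "partitioned", or "magic".
      .config("spark.hadoop.fs.s3a.committer.name", "directory")
      // Route dataframe commits through the cloud committer bindings.
      .config("spark.sql.sources.commitProtocolClass",
              "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("spark.sql.parquet.output.committer.class",
              "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .getOrCreate()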