Iterator over RDD in PySpark

Iterator over RDD in PySpark

Andrei
Is there a way to get an iterator from an RDD? Something like rdd.collect(), but returning a lazy sequence rather than a single array.

Context: I need to gzip the processed data to upload it to Amazon S3. Since the archive should be a single file, I want to iterate over the RDD, writing each line to a local .gz file. The file is small enough to fit on local disk, but still too large to fit into memory.

Re: Iterator over RDD in PySpark

Aaron Davidson
rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though.
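
For illustration, assuming a Python-side local iterator is available (later PySpark releases expose rdd.toLocalIterator() directly on the Python RDD), the gzip export could look roughly like this; the app name, sample data, and output path are placeholders:

  import gzip
  from pyspark import SparkContext

  sc = SparkContext(appName="gzip-export")               # placeholder app name
  rdd = sc.parallelize(["line 1", "line 2", "line 3"])   # placeholder data

  # Stream elements to the driver one partition at a time; only a single
  # partition needs to fit in driver memory at once.
  with gzip.open("/tmp/output.gz", "wt") as out:         # placeholder output path
      for line in rdd.toLocalIterator():
          out.write(line + "\n")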


Re: Iterator over RDD in PySpark

Andrei
Thanks, Aaron, it should be fine with partitions (I can repartition it anyway, right?).
But rdd.toLocalIterator is a purely Java/Scala method. Is there a Python interface to it?
I can get the Java iterator through rdd._jrdd, but it isn't converted to a Python iterator automatically. E.g.:

  >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
  >>> it = rdd._jrdd.toLocalIterator()
  >>> next(it)
  14/08/02 01:02:32 INFO SparkContext: Starting job: apply at Iterator.scala:371
  ...
  14/08/02 01:02:32 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.02064317 s
  bytearray(b'\x80\x02K\x01.')

I understand that the returned byte array somehow corresponds to the actual data, but how can I get it back?
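
For reference, PySpark moves data between the JVM and the Python workers as pickled byte strings, so the Java-side iterator yields raw pickle payloads rather than Python objects. A rough sketch of decoding a single payload by hand (the bytes are copied from the session above; real payloads may contain batches of elements, which PySpark's own deserializers normally take care of):

  import pickle

  # Pickle payload copied verbatim from the session above.
  payload = bytearray(b'\x80\x02K\x01.')
  print(pickle.loads(bytes(payload)))   # prints: 1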



Re: Iterator over RDD in PySpark

Aaron Davidson
Ah, that's unfortunate; that definitely should be added. Using a PySpark-internal method, you could try something like:

javaIterator = rdd._jrdd.toLocalIterator()
it = rdd._collect_iterator_through_file(javaIterator)
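
Sketched end to end against the original gzip use case, that workaround might look like the following; note that _collect_iterator_through_file is a private PySpark helper whose name and behaviour may change between releases, and the app name, sample data, and output path are placeholders:

  import gzip
  from pyspark import SparkContext

  sc = SparkContext(appName="local-iterator-demo")       # placeholder app name
  rdd = sc.parallelize(["line 1", "line 2", "line 3"])   # placeholder data

  # Java-side iterator over the RDD's elements (computed one partition at a time).
  java_iterator = rdd._jrdd.toLocalIterator()
  # Private helper that turns the pickled payloads back into Python objects.
  it = rdd._collect_iterator_through_file(java_iterator)

  with gzip.open("/tmp/output.gz", "wt") as out:         # placeholder output path
      for line in it:
          out.write(line + "\n")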


Re: Iterator over RDD in PySpark

Andrei
Excellent, thank you!

