Custom RDD gets HadoopSplit for compute() call


Custom RDD gets HadoopSplit for compute() call

Dmitriy Lyubimov
Hello, 

I have a weird error coming up that I am really at a loss to explain.

I have defined a custom RDD (reading from a third-party store; a really simple one). With the same code, the same test running locally (0.8.0) occasionally receives a compute() call with a HadoopSplit instead of its native split. Naturally, it fails on the cast.

I don't see any fundamental difference between my code and HadoopRDD or JdbcRDD.
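For reference, the shape of the custom RDD in question is the usual one. The class, partition type, and data-access details below are hypothetical stand-ins modeled on JdbcRDD (Spark 0.8.x API), not the actual HblRDD code; the point is that compute() has to downcast the generic Partition it is handed, which is exactly the cast that blows up when a foreign split arrives:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type for a store-backed RDD.
case class StorePartition(idx: Int, key: String) extends Partition {
  override def index: Int = idx
}

// Minimal sketch of a custom RDD: no parent dependencies, one partition
// per store key, and a compute() that downcasts its split.
class StoreRDD(sc: SparkContext, keys: Seq[String])
    extends RDD[String](sc, Nil) {

  override def getPartitions: Array[Partition] =
    keys.zipWithIndex.map { case (k, i) => StorePartition(i, k) }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    // This cast throws ClassCastException if a HadoopSplit is passed in.
    val part = split.asInstanceOf[StorePartition]
    Iterator(s"value-for-${part.key}") // stand-in for the real store read
  }
}
```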

I managed to reproduce this, with the stack trace below, using the local task scheduler. But I am really at a loss to explain what's going on here.



  • "pool-2-thread-3"@8,430 in group "main": RUNNING
  • org$hbl$whale$plan$spark$HblRDD$$toHblPartition():41, HblRDD$ {org.hbl.whale.plan.spark}
  • compute():19, HblRDD {org.hbl.whale.plan.spark}
  • computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
  • getOrCompute():70, CacheManager {org.apache.spark}
  • iterator():224, RDD {org.apache.spark.rdd}
  • compute():32, FilteredRDD {org.apache.spark.rdd}
  • computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
  • iterator():226, RDD {org.apache.spark.rdd}
  • compute():29, MappedRDD {org.apache.spark.rdd}
  • computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
  • iterator():226, RDD {org.apache.spark.rdd}
  • compute():36, MapPartitionsRDD {org.apache.spark.rdd}
  • computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
  • iterator():226, RDD {org.apache.spark.rdd}
  • run():149, ShuffleMapTask {org.apache.spark.scheduler}
  • run():88, ShuffleMapTask {org.apache.spark.scheduler}
  • runTask():198, LocalScheduler {org.apache.spark.scheduler.local}
  • run():68, LocalActor$$anonfun$launchTask$1$$anon$1 {org.apache.spark.scheduler.local}
  • call():471, Executors$RunnableAdapter {java.util.concurrent}
  • run():262, FutureTask {java.util.concurrent}
  • runWorker():1145, ThreadPoolExecutor {java.util.concurrent}
  • run():615, ThreadPoolExecutor$Worker {java.util.concurrent}
  • run():744, Thread {java.lang} 
Thank you in advance for your help.
-Dmitriy

Re: Custom RDD gets HadoopSplit for compute() call

Dmitriy Lyubimov
PS: The file it is trying to read from seems to be ./indata1392953916265:0+434205, which looks like a sequence file of (Int, BytesWritable).
So it appears to be some sort of shuffle spill.

But why would a custom RDD be expected to read from a shuffle spill? Any ideas are very much appreciated.
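Incidentally, a pattern match instead of a bare cast surfaces the foreign split type immediately in the logs, rather than as an opaque ClassCastException. A small sketch (hypothetical names, same assumed StorePartition type as above):

```scala
import org.apache.spark.Partition

// Hypothetical partition type for a store-backed RDD.
case class StorePartition(idx: Int) extends Partition {
  override def index: Int = idx
}

// Failing fast with a descriptive message makes a foreign-split symptom
// like the one above obvious at a glance.
def toStorePartition(split: Partition): StorePartition = split match {
  case p: StorePartition => p
  case other => sys.error(
    s"StoreRDD.compute() received a foreign split: ${other.getClass.getName}")
}
```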
-Dmitriy


On Thu, Feb 20, 2014 at 7:49 PM, Dmitriy Lyubimov <[hidden email]> wrote:
[quoted message repeated above]


Re: Custom RDD gets HadoopSplit for compute() call

Dmitriy Lyubimov
Ah, no worries. It turns out another engineer was creating multiple Spark sessions, which produced those weird race conditions.


On Thu, Feb 20, 2014 at 7:57 PM, Dmitriy Lyubimov <[hidden email]> wrote:
[quoted messages repeated above]