writing SparkR reducer functions


writing SparkR reducer functions

Justin
So I've been struggling with this for a while in SparkR: I can't seem
to write a basic mean/median function in R that works when passed into
reduceByKey() on my very simple dataset. I can pass in R's base
function 'sum' and it works just fine. The help pages show that the
signature for sum is sum(...) while mean and median are mean(x), so
that difference must be why it's not working -- I just can't for the
life of me write wrappers for mean/median that work with
reduceByKey(). I've pasted below a take(5) of the RDD I'm trying to
use reduceByKey() on, to show its structure (it's just a list of
lists). I've also pasted the result of a groupByKey(), because
presumably this is the same data structure reduceByKey() sees before
it does its reducing, right?

FWIW, I've written this function that works fine when run on the
results of a groupByKey() operation, so I must be close to what it
should look like for reduceByKey():

avg_g <- function(resultFromGroupByKey) {
  # pull out each (K, V) pair's value (a list of numbers),
  # flatten it to a vector, then take its mean
  values <- lapply(resultFromGroupByKey, "[[", 2)
  lapply(lapply(values, unlist), mean)
}

My best guess at converting this to work with reduceByKey() is the
function below, since it works when I call it on a single value from
the (K, V) pairs returned by groupByKey(). Unfortunately, using it in
reduceByKey() results in a Java NPE.

avg <- function(x) {
  mean(unlist(x, recursive = FALSE))
}


Any help would be appreciated. Here is the cut/paste of the data and
the NPE trace:


THE DATA  (take 5, just assume the keys in the whole dataset go from A to Z)

[[1]]
[[1]][[1]]
[1] "A"

[[1]][[2]]
[1] 136


[[2]]
[[2]][[1]]
[1] "A"

[[2]][[2]]
[1] 136


[[3]]
[[3]][[1]]
[1] "A"

[[3]][[2]]
[1] 136


[[4]]
[[4]][[1]]
[1] "A"

[[4]][[2]]
[1] 136


[[5]]
[[5]][[1]]
[1] "A"

[[5]][[2]]
[1] 136



THE DATA AFTER GROUPBYKEY()

[[1]]
[[1]][[1]]
[1] "B"

[[1]][[2]]
[[1]][[2]][[1]]
[1] 136

[[1]][[2]][[2]]
[1] 136

[[1]][[2]][[3]]
[1] 136

[[1]][[2]][[4]]
[1] 136

[[1]][[2]][[5]]
[1] 136

[[1]][[2]][[6]]
[1] 136

[[1]][[2]][[7]]
[1] 136

[[1]][[2]][[8]]
[1] 136





> take( reduceByKey( sparkData_map2, avg, 2L ) , 5 )
Error in (function (x)  : unused argument (136)
Calls: do.call ... FUN -> lapply -> lapply -> FUN -> do.call -> <Anonymous>
Execution halted
14/02/03 16:37:47 ERROR Executor: Exception in task ID 1407
java.lang.NullPointerException
at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
14/02/03 16:37:47 WARN TaskSetManager: Lost TID 1407 (task 1415.0:0)
14/02/03 16:37:47 WARN TaskSetManager: Loss was due to
java.lang.NullPointerException
java.lang.NullPointerException
at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
14/02/03 16:37:47 ERROR TaskSetManager: Task 1415.0:0 failed 1 times;
aborting job
Error in .jcall(rdd@jrdd, "[Ljava/util/List;", "collectPartitions",
.jarray(as.integer(index))) :
  org.apache.spark.SparkException: Job aborted: Task 1415.0:0 failed 1
times (most recent failure: Exception failure:
java.lang.NullPointerException)

Re: writing SparkR reducer functions

Justin
After googling around, I realize how naive my question was :(

Being new to Spark, for some reason I thought all of the basic "stats" functions were implemented in a first-class way out of the box on top of the MapReduce framework... oops! Sorry for the spam :)
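
For posterity, the root of the problem: reduceByKey() folds the values for each key two at a time, so the function passed in must be a binary, associative combiner f(v1, v2). sum() happens to satisfy that (it's variadic and associative), but mean() does not: a mean of partial means is not the overall mean. Below is a rough sketch of the standard workaround in plain R; the commented SparkR call shape at the end is an untested assumption about this API, not verified code.

```r
# mean() is not associative, so it can't be a reduceByKey() combiner:
mean(c(mean(c(1, 2)), 3))   # 2.25
mean(c(1, 2, 3))            # 2 -- pairwise folding gives the wrong answer

# The associative trick: reduce (sum, count) pairs, divide once at the end.
to_pair     <- function(v)    list(v, 1L)
combine     <- function(a, b) list(a[[1]] + b[[1]], a[[2]] + b[[2]])
pair_to_avg <- function(p)    p[[1]] / p[[2]]

vals <- c(136, 136, 136, 140)
p <- Reduce(combine, lapply(vals, to_pair))
pair_to_avg(p)   # 137 -- the true mean, however the folds are grouped

# In SparkR the same shape would be roughly (untested sketch):
#   pairs <- map(sparkData_map2, function(kv) list(kv[[1]], to_pair(kv[[2]])))
#   sums  <- reduceByKey(pairs, combine, 2L)
#   means <- map(sums, function(kv) list(kv[[1]], pair_to_avg(kv[[2]])))
```

Median is harder: it doesn't decompose into an associative combine at all, so groupByKey() (or collecting the per-key values) is the straightforward route there.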



On Monday, February 3, 2014, Justin Lent <[hidden email]> wrote: