ship MatrixFactorizationModel with each partition?

Nan Zhu
Hi, all

I'm trying out ALS in MLlib.

The following is my code:

val result = als.run(ratingRDD)
// collect the distinct movies and users, then build every (user, movie) pair
val allMovies = ratingRDD.map(rating => rating.product).distinct()
val allUsers = ratingRDD.map(rating => rating.user).distinct()
val allUserMoviePair = allUsers.cartesian(allMovies)
// score each pair with the trained model
val resultRDD = allUserMoviePair.map(userMoviePair => {
  userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2) + "\n"
})


Every time, result.predict throws an exception like:

scala.MatchError: null
	at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
	at org.apache.spark.scheduler.Task.run(Task.scala:53)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:724)

If I change the code to pull everything into an array in the driver program, it works:

val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
  userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2) + "\n"
})

So the exception seems to be related to how the MatrixFactorizationModel is shared with each partition?

Can anyone give me a hint?

Thank you very much!

-- 
Nan Zhu


Re: ship MatrixFactorizationModel with each partition?

Matei Zaharia
Administrator
Sorry, you actually can't call predict() on the cluster, because the model contains some RDDs. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can grab the code from that method (which does a join) and call it yourself on Spark 0.8.x.
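
The core of it looks roughly like this (a minimal sketch, not the exact code from the patch; it assumes the model's userFeatures and productFeatures are RDD[(Int, Array[Double])] pair RDDs, as in MatrixFactorizationModel):

import org.apache.spark.SparkContext._  // pair RDD operations (join) in 0.8.x
import org.apache.spark.rdd.RDD

def predictAll(usersProducts: RDD[(Int, Int)],
               userFeatures: RDD[(Int, Array[Double])],
               productFeatures: RDD[(Int, Array[Double])]): RDD[(Int, Int, Double)] = {
  usersProducts
    .join(userFeatures)                  // (user, (product, userVector))
    .map { case (user, (product, uVec)) => (product, (user, uVec)) }
    .join(productFeatures)               // (product, ((user, userVector), productVector))
    .map { case (product, ((user, uVec), pVec)) =>
      // predicted rating = dot product of the user and product feature vectors
      val rating = uVec.zip(pVec).map { case (a, b) => a * b }.sum
      (user, product, rating)
    }
}

Since this is all joins and maps, it runs entirely on the cluster, with no lookup() against a driver-side RDD.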

Matei


Re: ship MatrixFactorizationModel with each partition?

Nan Zhu
Great, thank you Matei!

-- 
Nan Zhu



Re: ship MatrixFactorizationModel with each partition?

Nan Zhu
In reply to this post by Matei Zaharia
Hi Matei,

Do you mean that when we transform one RDD, the closure should not reference any other RDD?
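
For example, is this the distinction (a toy sketch, with made-up RDDs and an existing SparkContext sc)?

import org.apache.spark.SparkContext._  // pair RDD operations (join)
import org.apache.spark.rdd.RDD

val a: RDD[Int] = sc.parallelize(1 to 10)
val b: RDD[(Int, String)] = sc.parallelize(Seq(1 -> "x", 2 -> "y"))

// Not allowed: b is captured in the closure, so its lookup() would run on the executors
// val bad = a.map(i => b.lookup(i))

// Allowed: express the same combination as a join between the two RDDs
val good = a.map(i => (i, ())).join(b).map { case (i, (_, s)) => (i, s) }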

Best,

-- 
Nan Zhu

