GraphX pregel with Java

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

GraphX pregel with Java

dali
This post has NOT been accepted by the mailing list yet.
Hi,
I testes this code:
Object obj = Predef1.reflexivity();
                scala.Predef.$eq$colon$eq<String,MutableList<PartialSolution>> ev = (scala.Predef.$eq$colon$eq<String,MutableList<PartialSolution>>) obj;
                Graph<MutableList<PartialSolution>,String> initializedGraph = graph.mapVertices(vertexValueChange
                ,scala.reflect.ClassTag$.MODULE$.<MutableList<PartialSolution>>apply(MutableList.class),ev);

Function3<Object,MutableList<PartialSolution>,MutableList<PartialSolution>,MutableList<PartialSolution>> vprog =
                                new SerializableFunction3<Object,MutableList<PartialSolution>,MutableList<PartialSolution>,MutableList<PartialSolution>>(){

                                        @Override
                                        public MutableList<PartialSolution> apply(Object vertexId, MutableList<PartialSolution> vertex,
                                                        MutableList<PartialSolution> msg) {
                                                // TODO Auto-generated method stub
                                                return msg;
                                        }
                       
                };
               
               
                Function1<EdgeTriplet<MutableList<PartialSolution>,String>, Iterator<Tuple2<Object,MutableList<PartialSolution>>>> sendMsg  
                = new SerializableFunction1<EdgeTriplet<MutableList<PartialSolution>,String>, Iterator<Tuple2<Object,MutableList<PartialSolution>>>>(){

                        @Override
                        public Iterator<Tuple2<Object,MutableList<PartialSolution>>> apply(EdgeTriplet<MutableList<PartialSolution>, String> edgeTriplet) {
                                // TODO Auto-generated method stub
                                MutableList<PartialSolution> msg = new MutableList<PartialSolution>();
                                if(pattern.isMatched(edgeTriplet)
                                        msg.$plus$eq(new PartialSolution(pattern));
                                MutableList<Tuple2<Object,MutableList<PartialSolution>>> returned = new MutableList<Tuple2<Object,MutableList<PartialSolution>>>();
                                returned.$plus$eq(new Tuple2<Object,MutableList<PartialSolution>>(edgeTriplet.dstId(),msg));
                                return returned.iterator();
                        }  
                        };
                        Function2<MutableList<PartialSolution>,MutableList<PartialSolution>,MutableList<PartialSolution>> mergeMsg  = new SerializableFunction2<MutableList<PartialSolution>,MutableList<PartialSolution>,MutableList<PartialSolution>>(){

                                @Override
                                public MutableList<PartialSolution> apply(MutableList<PartialSolution> list1,
                                                MutableList<PartialSolution> list2) {
                                        // TODO Auto-generated method stub
                                        if(list1.size()==0)
                                                if(list2.size()==0)
                                                        return list1;
                                                else
                                                        return list2;
                                        else
                                                if(list2.size()==0)
                                                        return list1;
                                                else
                                                        return (MutableList<PartialSolution>) list1.$plus$plus$eq(list2);
                                       
                                                                               
                                }  
                                };
                               
                               
                                Graph<MutableList<PartialSolution>,String> newInitializedGraph = Pregel.apply(initializedGraph, new MutableList<PartialSolution>(),
                                2,EdgeDirection.Out(),
                                vprog, sendMsg,
                                mergeMsg,scala.reflect.ClassTag$.MODULE$.<MutableList<PartialSolution>>apply(MutableList.class),
                                scala.reflect.ClassTag$.MODULE$.<String>apply(String.class),
                                scala.reflect.ClassTag$.MODULE$.<MutableList<PartialSolution>>apply(MutableList.class));

but I have this error:

java.lang.ArrayStoreException: scala.collection.mutable.MutableList
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:71)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
17/03/14 10:43:02 INFO TaskSetManager: Starting task 1.0 in stage 28.0 (TID 47, localhost, partition 1, PROCESS_LOCAL, 6061 bytes)
17/03/14 10:43:02 INFO Executor: Running task 1.0 in stage 28.0 (TID 47)
17/03/14 10:43:02 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 46, localhost): java.lang.ArrayStoreException: scala.collection.mutable.MutableList
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:71)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

17/03/14 10:43:02 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
17/03/14 10:43:02 INFO TaskSchedulerImpl: Cancelling stage 28
17/03/14 10:43:02 INFO TaskSchedulerImpl: Stage 28 was cancelled
17/03/14 10:43:02 INFO DAGScheduler: ShuffleMapStage 28 (mapPartitions at GraphImpl.scala:207) failed in 0,128 s
17/03/14 10:43:02 INFO DAGScheduler: Job 9 failed: reduce at VertexRDDImpl.scala:88, took 0,585121 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 46, localhost): java.lang.ArrayStoreException: scala.collection.mutable.MutableList
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:71)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1936)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1002)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:984)
        at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:88)
        at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:128)
        at org.apache.spark.graphx.Pregel.apply(Pregel.scala)
        at resolveMatches.PatternResolution.initializeResolution(PatternResolution.java:195)
        at resolveMatches.PatternResolution.run(PatternResolution.java:262)
        at resolveMatches.Test.main(Test.java:90)
Caused by: java.lang.ArrayStoreException: scala.collection.mutable.MutableList
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
        at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:71)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)



Can you help me?
Loading...