
parallelize for a large Seq is extremely slow.


parallelize for a large Seq is extremely slow.

Earthson
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

This line is too slow; there are about 2 million elements in word_mapping.

Is there a good way to write a large collection to HDFS?

import org.apache.spark._
import SparkContext._
import scala.io.Source
object WFilter {
    def main(args: Array[String]) {
        val spark = new SparkContext("yarn-standalone","word filter",System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
        val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
        val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}

many thx:) 

--

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Perfection is achieved 
not when there is nothing more to add
 but when there is nothing left to take away

Re: parallelize for a large Seq is extremely slow.

Matei Zaharia
Administrator
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see http://spark.apache.org/docs/0.9.1/tuning.html), it should be considerably faster.
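A minimal sketch of that configuration, assuming Spark 0.9.x and a SparkConf-based setup (the registrator is optional, and the class name here is only a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Use Kryo for data serialization; registering frequently-serialized
// classes is optional but avoids writing out full class names.
val conf = new SparkConf()
  .setAppName("word filter")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
val sc = new SparkContext(conf)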

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu <[hidden email]> wrote:

spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

This line is too slow; there are about 2 million elements in word_mapping.

Is there a good way to write a large collection to HDFS?

import org.apache.spark._
import SparkContext._
import scala.io.Source
object WFilter {
    def main(args: Array[String]) {
        val spark = new SparkContext("yarn-standalone","word filter",System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
        val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
        val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}

many thx:) 

--

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Perfection is achieved 
not when there is nothing more to add
 but when there is nothing left to take away


Re: parallelize for a large Seq is extremely slow.

Earthson
In reply to this post by Matei Zaharia
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException (com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1)
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)

~~~~~~~~~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
    }
}

object WFilter2 {
    def initspark(name:String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
        val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}
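
The "Buffer overflow. Available: 0, required: 1" above usually means Kryo's output buffer is smaller than the object being serialized; here the broadcast word_mapping holds a couple of million entries. A rough sketch of the relevant knob for Spark 0.9.x (value in MB):

// Sketch only: enlarge Kryo's serialization buffer so the broadcast Map fits.
val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", "256")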

Re: parallelize for a large Seq is extremely slow.

Earthson
I've tried setting a larger buffer, but reduceByKey seems to fail. Need help :)

14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Shutting down all executors
14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Asking each executor to shut down
14/04/26 12:31:12 INFO scheduler.DAGScheduler: Failed to run countByKey at filter_2.scala:35
14/04/26 12:31:12 INFO yarn.ApplicationMaster: finishApplicationMaster with FAILED
Exception in thread "Thread-3" org.apache.hadoop.yarn.exceptions.YarnException: Application doesn't exist in cache appattempt_1398305021882_0069_000001
        at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
        at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
        at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
        at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
        at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at $Proxy12.finishApplicationMaster(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:311)
        at org.apache.spark.deploy.yarn.ApplicationMaster.finishApplicationMaster(ApplicationMaster.scala:320)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:165)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.YarnException): Application doesn't exist in cache appattempt_1398305021882_0069_000001
        at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
        at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
        at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
        at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

        at org.apache.hadoop.ipc.Client.call(Client.java:1347)
        at org.apache.hadoop.ipc.Client.call(Client.java:1300)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
        at $Proxy11.finishApplicationMaster(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
        ... 10 more

Re: parallelize for a large Seq is extremely slow.

Earthson
This error appeared just because I killed my app :(

Is there something wrong? The reduceByKey operation is extremely slow (slower than with the default serializer).

Re: parallelize for a large Seq is extremely slow.

Earthson
Using reduceByKey(_+_).countByKey instead of plain countByKey seems to be fast.
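
A plausible reason, sketched with the thread's own pipeline (the exact behaviour depends on how countByKey was implemented in that Spark version, and the tf_slow/tf_fast names are only for illustration): countByKey hands the bulk of the counting back to the driver, while reduceByKey pre-aggregates per key on the executors, so far less data has to be shipped and serialized.

// Original form: one (word, 1) pair per token reaches the counting step.
val tf_slow = file.flatMap(_.split("\t")).map((_, 1)).countByKey

// Faster variant from this post: aggregate on the cluster first,
// then count the already-reduced pairs.
val tf_fast = file.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).countByKey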

Re: parallelize for a large Seq is extremely slow.

Earthson
parallelize is still so slow.

~~~~~~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
        kryo.register(classOf[Map[String,Long]])
        kryo.register(classOf[Seq[(String,Long)]])
        kryo.register(classOf[Seq[(String,Int)]])
    }
}

object WFilter2 {
    def initspark(name:String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", "256")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
                    .set("spark.cores.max", "30")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
        val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}

Re: parallelize for a large Seq is extremely slow.

Aaron Davidson
Could it be that the default number of partitions used by parallelize() is too small in this case? Try something like spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it should already be 30, but perhaps that's not the case in YARN mode...)
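
Spelled out against the code quoted below (the 60 is Aaron's number; defaultParallelism just shows what parallelize() would fall back to if no slice count were given):

println(spark.defaultParallelism)  // the number of slices parallelize() uses by default
spark.parallelize(word_mapping.value.toSeq, 60)
     .saveAsTextFile("hdfs://ns1/nlp/word_mapping")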


On Fri, Apr 25, 2014 at 11:38 PM, Earthson <[hidden email]> wrote:
parallelize is still so slow.

~~~~~~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
        kryo.register(classOf[Map[String,Long]])
        kryo.register(classOf[Seq[(String,Long)]])
        kryo.register(classOf[Seq[(String,Int)]])
    }
}

object WFilter2 {
    def initspark(name:String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", "256")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
                    .set("spark.cores.max", "30")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
        val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}





Re: parallelize for a large Seq is extremely slow.

Earthson
That doesn't work. I don't think it is just slow; it never ends (30+ hours, and I killed it).

Re: parallelize for a large Seq is extremely slow.

Matei Zaharia
Administrator
How many values are in that sequence? I.e. what is its size?

You can also profile your program while it’s running to see where it’s spending time. The easiest way is to get a single stack trace with jstack <process-id>. Maybe some of the serialization methods for this data are super inefficient, or toSeq on a map is inefficient. You could try word_mapping.value.toArray. I’m also wondering if something earlier in the program is slow and this is just not obvious from the output.

Matei

On Apr 27, 2014, at 9:47 AM, Earthson <[hidden email]> wrote:

> That doesn't work. I don't think it is just slow; it never ends (30+ hours,
> and I killed it).


Re: parallelize for a large Seq is extremely slow.

Earthson
It's my fault! I uploaded the wrong jar when I changed the number of partitions, and now it just works fine :)

The size of word_mapping is 2444185.

So does serializing a large object really take that long? I don't think two million elements is very large; doing the same thing locally typically costs less than one second.

Thanks for the help:)

Re: parallelize for a large Seq is extremely slow.

Earthson
In reply to this post by Matei Zaharia
I think the real problem was spark.akka.frameSize. It was too small for passing the data: every executor failed, and with no executors left, the task hung.
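
A sketch of that fix for this job, assuming Spark 0.9.x where spark.akka.frameSize is given in MB and defaults to a small value (around 10 MB); raising the frame size, or using more partitions so each parallelize slice stays under it, should both help:

// Sketch only: let driver/executor messages (which carry the parallelize slices)
// exceed the small default frame size.
val conf = new SparkConf()
    .set("spark.akka.frameSize", "64")

// and/or keep each slice small by using more partitions:
spark.parallelize(word_mapping.value.toSeq, 60).saveAsTextFile("hdfs://ns1/nlp/word_mapping")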