How to set Akka frame size


leosandylh@gmail.com
Hi everyone,
 
I have a question about the setting spark.akka.frameSize; its default value is 10 MB.
I ran the JavaWordCount example reading a 7 GB file from HDFS and got an OOM error because some task results exceeded the Akka frame size.
But when I set the value to 1 GB, 2 GB, or 10 GB, I get:
ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client shutdown
13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
 
Sometimes I get a different error:
[lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077 hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120
13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master
org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a pipeline.
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
        at akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
        at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
        at akka.util.Switch.transcend(LockUtil.scala:32)
        at akka.util.Switch.switchOn(LockUtil.scala:55)
        at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
        at akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
        at akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
        at org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
        at akka.actor.ActorCell.create$1(ActorCell.scala:508)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
        at akka.dispatch.Mailbox.run(Mailbox.scala:178)
        at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
        at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
        at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
        at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
        at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a positive integer: -1451229184
        at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270)
        at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236)
        at akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
        ... 18 more
13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from Spark cluster!
13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from cluster scheduler: Disconnected from Spark cluster
 
It seems to be caused by:
LengthFieldBasedFrameDecoder lenDec = new LengthFieldBasedFrameDecoder(this.client.netty().settings().MessageFrameSize(), 0, 4, 0, 4);
I don't know what the value of this.client.netty().settings().MessageFrameSize() is or how it is calculated.
 
My Spark configuration:
export SPARK_DAEMON_MEMORY=4000m
export SPARK_MEM=1000m
export SPARK_WORKER_MEMORY=8g
spark.akka.frameSize = 1000 / 2000 / 5000 / 10000 / 15000
spark.executor.memory  = 1g
spark.akka.askTimeout = 120
 
Any help or reply is much appreciated! Thanks very much.
   

Re: How to set Akka frame size

Aaron Davidson
The error you're receiving is because the Akka frame size must be a positive Java Integer, i.e., less than 2^31. However, the frame size is not intended to be nearly the size of the job memory -- it is the smallest unit of data transfer that Spark does. In this case, your "task result" size is exceeding 10MB, which means that returning the results for a single partition of your data is >10MB.
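
As a rough illustration of where the negative number in your log comes from (this assumes the configured value is treated as MiB and the resulting byte count ends up in a Java int; it is a sketch of the conversion, not the actual Spark/Akka code path):

    // Rough sketch: the frame size, converted from MiB to bytes, overflows a 32-bit int.
    public class FrameSizeOverflow {
        public static void main(String[] args) {
            int frameSizeMb = 15000;                        // one of the spark.akka.frameSize values you tried
            int maxFrameLength = frameSizeMb * 1024 * 1024; // 15,728,640,000 bytes does not fit in an int
            System.out.println(maxFrameLength);             // prints -1451229184, the value in your log
        }
    }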

It appears that the default JavaWordCount example uses a minSplits value of 1 (ctx.textFile(args[1], 1)). This really means that the number of partitions will be max(1, # HDFS blocks in the file). If you have an HDFS block of ~64 MB and all distinct words, the result for that task may be around the same size, i.e., >10 MB.

You have two complementary solutions; a sketch combining both is below:
  1. Increase the value of minSplits to reduce the size of any single task result, e.g. ctx.textFile(args[1], 256).
  2. Increase the Akka frame size by a small amount (e.g., to 20-70 MB).
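
This assumes the 0.8.x style of configuration via Java system properties set before the context is created; the class name and the values are placeholders, not something taken from your code:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class WordCountTuned {
        public static void main(String[] args) {
            // A modest frame size, in MB; huge values overflow the int frame length.
            System.setProperty("spark.akka.frameSize", "64");

            // args[0] = master URL, args[1] = HDFS input path, as in JavaWordCount.
            JavaSparkContext ctx = new JavaSparkContext(args[0], "WordCountTuned");

            // Request many partitions so no single task result approaches the frame size.
            JavaRDD<String> lines = ctx.textFile(args[1], 256);

            // ... the rest of the word count as in the JavaWordCount example ...

            ctx.stop();
        }
    }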
Please note that this issue, while annoying, is in good part due to the lack of realism of this example. You very rarely call collect() in real Spark usage, as it pulls all of your output data onto the driver machine. More likely you would save the results to an HDFS file, or compute, say, the top 100 words, neither of which has this problem.
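
For example, instead of collecting, you could write the counts back to HDFS (here counts stands for the JavaPairRDD<String, Integer> the example builds, and the output path is just a placeholder):

    // 'counts' is the (word, count) pair RDD from the word-count job; the output
    // directory below is a made-up placeholder -- use any writable HDFS path.
    counts.saveAsTextFile("hdfs://ocnosql76:8030/user/lh1/wordcount_output");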

(One final note about your configuration: the Spark Worker is simply responsible for spawning Executors, which do the actual computation. As such, it is typical not to change the Worker memory at all (it needs very little) but rather to give the majority of a machine's memory, distributed amongst the Executors. If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)
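
Illustratively, for such a 16 GB / 4-core machine the relevant settings might look like the following (example values mirroring the numbers above, not a recommendation tuned to your cluster):

    export SPARK_WORKER_MEMORY=12g     # what the Worker may hand out to Executors; the Worker itself needs little
    spark.executor.memory = 3g         # per executor, so several executors together use roughly 8-12 GB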




Re: Re: How to set Akka frame size

leosandylh@gmail.com

Thank you very much!
 

 

