Problems with broadcasting a large data structure

Sebastian Schelter
Spark repeatedly fails to broadcast a large object on a cluster of 25 machines for me.

I get log messages like this:

[spark-akka.actor.default-dispatcher-4] WARN
org.apache.spark.storage.BlockManagerMasterActor - Removing BlockManager
BlockManagerId(3, cloud-33.dima.tu-berlin.de, 42185, 0) with no recent
heart beats: 134689ms exceeds 45000ms

Is there something wrong with my config? Do I have to increase some timeout?

Thx,
Sebastian

Re: Problems with broadcasting a large data structure

Aureliano Buendia
What's the size of your large object to be broadcast?




Re: Problems with broadcasting a large data structure

Khanderao kand
In reply to this post by Sebastian Schelter
If your object size is > 10 MB, you may need to increase spark.akka.frameSize.

What is your spark.akka.timeout set to?

Did you change spark.akka.heartbeat.interval?

BTW, given the large size being broadcast across 25 nodes, you may want to consider the frequency of such transfers and evaluate alternative patterns.
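For example, these settings can be raised in the same SPARK_JAVA_OPTS style that appears later in this thread; the values below are purely illustrative starting points, not recommendations:

SPARK_JAVA_OPTS+=" -Dspark.akka.frameSize=64 -Dspark.akka.timeout=300 -Dspark.storage.blockManagerTimeoutIntervalMs=120000"

(spark.akka.frameSize is in MB and spark.akka.timeout in seconds; whether the block manager timeout is the one behind the 45000 ms warning is an assumption to verify against your Spark version.)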






Re: Problems with broadcasting a large data structure

lihu
I have run into the same problem. I have a cluster of 20 machines, and I just ran the broadcast example; all I did was change the data size in the example to 400 MB, which is really a small data size, but I hit the same problem.
So I wonder whether broadcast capacity is a weak point in the Spark system?


here is my config:

SPARK_MEM=12g
SPARK_MASTER_WEBUI_PORT=12306
SPARK_WORKER_MEMORY=12g
SPARK_JAVA_OPTS+="-Dspark.executor.memory=8g -Dspark.akka.timeout=600  -Dspark.local.dir=/disk3/lee/tmp -Dspark.worker.timeout=600 -Dspark.akka.frameSize=10000 -Dspark.akka.askTimeout=300 -Dspark.storage.blockManagerTimeoutIntervalMs=100000 -Dspark.akka.retry.wait=600 -Dspark.blockManagerHeartBeatMs=80000 -Xms15G -Xmx15G -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"





Re: Problems with broadcasting a large data structure

Aureliano Buendia



On Mon, Jan 13, 2014 at 4:17 AM, lihu <[hidden email]> wrote:
I have run into the same problem. I have a cluster of 20 machines, and I just ran the broadcast example; all I did was change the data size in the example to 400 MB, which is really a small data size.

Is 400 MB a really small size for broadcasting?

I had the impression that broadcast is for objects that are much, much smaller, roughly under 10 MB.
 


Re: Problems with broadcasting a large data structure

Mosharaf Chowdhury
400 MB isn't really that big. Broadcast is expected to work with several GB of data, and in even larger clusters (hundreds of machines).

If you are using the default HttpBroadcast, then Akka isn't used to move the broadcast data. But the block manager can run out of memory if you repeatedly broadcast large objects. Another scenario is that the master isn't receiving any heartbeats from the block manager because the control messages are getting dropped due to bulk data movement. Can you provide a bit more detail on your network setup?

Also, you can try doing a binary search over the size of the broadcast data to see at what size it breaks (i.e., try to broadcast 10, then 20, then 40, and so on). Also, limit each run to a single iteration in the example (right now it tries to broadcast 3 consecutive times).
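A rough sketch of that size search, assuming it is pasted into spark-shell (the sizes, the Int element type, and the doubling schedule are all illustrative):

// Illustrative only: grow the broadcast payload until it breaks.
// An Int is 4 bytes, so mb * 250000 Ints is roughly mb megabytes.
for (mb <- Seq(10, 20, 40, 80, 160, 320)) {
  val payload = Array.fill(mb * 250000)(1)
  val b = sc.broadcast(payload)
  sc.parallelize(1 to 10).foreach(_ => println(b.value.length))
  println("Broadcast of ~" + mb + " MB succeeded")
}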

If you are using a newer branch, you can also try the new TorrentBroadcast implementation.
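Switching is done via the spark.broadcast.factory property; a minimal sketch, assuming a build that ships TorrentBroadcastFactory:

SPARK_JAVA_OPTS+=" -Dspark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory"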


--
Mosharaf Chowdhury
http://www.mosharaf.com/





Re: Problems with broadcasting a large data structure

lihu
In reply to this post by Aureliano Buendia
In my opinion, Spark is a system for big data, so 400 MB does not seem big.

I read some slides about broadcast; my understanding is that the executors will send the broadcast variable back to the driver, and that each executor owns a complete copy of the broadcast variable.

In my experiment I have 20 machines, each machine runs 2 executors, and I used the default parallelism, which is 8, so there are 320 tasks in one stage in total.

Then the workers would send 320 * (400 MB / 8) = 16 GB of data back to the driver, which seems very big. But I see from the log that, after serialization, the size of the result sent back to the driver is just 446 bytes per task.

org.apache.spark.storage.BlockManager - Found block broadcast_5 locally
org.apache.spark.executor.Executor - Serialized size of result for 1901 is 446
org.apache.spark.executor.Executor - Sending result for 1901 directly to driver

So the total data sent back to the driver is just 320 * 446 bytes = 142,720 bytes, which is really small in my opinion.

---------------
In summary

1. Spark is a system for big data, so 400 MB is not big in my opinion.
2. I am not sure whether my understanding of broadcast is right; if it is, shouldn't the data sent back to the driver be much bigger?
3. I just wonder how the serialization can be so effective that it turns 400 MB / 8 = 50 MB into just 446 bytes.
4. If it is my fault for not running the broadcast experiment the right way, then I hope the Spark community can provide more examples of broadcast; that would benefit many users.








Re: Problems with broadcasting a large data structure

Mosharaf Chowdhury
Broadcast is supposed to send data from the driver to the executors, not the other direction. Can you share the code snippet you are using to broadcast?

--
Mosharaf Chowdhury
http://www.mosharaf.com/




Re: Problems with broadcasting a large data structure

lihu
Yes, I am just using the code snippet from the broadcast example, running it in spark-shell.
I thought the driver sends the broadcast to the executors and the executors send it back; is there something wrong with how I calculate the broadcast size?

val MAX_ITER = 30
val num = 100000000               // 100 million Ints, roughly 400 MB
var arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
  arr1(i) = i
}
for (i <- 0 until MAX_ITER) {
  println("Iteration " + i)
  println("===========")
  val startTime = System.nanoTime
  val barr1 = sc.broadcast(arr1)  // re-broadcast the ~400 MB array every iteration
  sc.parallelize(1 to 10).foreach {
    i => println(barr1.value.size)
  }
  println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}

I also tried TorrentBroadcast; it is faster than the default, which is very helpful, thanks again!
But I also get stuck during the iterations. Here is the log info from the master; it seems to be the heartbeat problem.

[sparkMaster-akka.actor.default-dispatcher-26] WARN  org.apache.spark.deploy.master.Master - Got heartbeat from unregistered worker worker-20140102202153-m023.corp.typingx.me-8139
[sparkMaster-akka.actor.default-dispatcher-26] WARN  org.apache.spark.deploy.master.Master - Got heartbeat from unregistered worker worker-20140102202153-m023.corp.typingx.me-40447






--
Best Wishes!

Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences(IIIS)
Tsinghua University, China

Tel  : +86 15120081920



Re: Problems with broadcasting a large data structure

Mosharaf Chowdhury
Size calculation is correct, but broadcast happens from the driver to the workers. 

BTW, your code is broadcasting 400 MB 30 times, and those copies are not being evicted from the cache fast enough, which, I think, is causing the block managers to run out of memory.
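If re-broadcasting on every iteration isn't essential to what you're measuring, a sketch that broadcasts once and reuses the handle (reusing arr1 and MAX_ITER from your snippet) keeps a single copy per executor:

// Illustrative only: broadcast the large array once and reuse it,
// so each executor's block manager holds just one copy.
val barr1 = sc.broadcast(arr1)
for (i <- 0 until MAX_ITER) {
  val startTime = System.nanoTime
  sc.parallelize(1 to 10).foreach(_ => println(barr1.value.size))
  println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}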





Re: Problems with broadcasting a large data structure

lihu
Oh, I was misled by the following log lines into thinking the broadcast variable is sent back to the driver. So the result being sent to the driver has no relationship with the broadcast variable; but then what is it, since it seems no data should be sent back?

org.apache.spark.executor.Executor - Serialized size of result for 1901 is 446
org.apache.spark.executor.Executor - Sending result for 1901 directly to driver


BTW, about the 30 iterations: each time I only get through the third or fourth iteration before Spark gets stuck, and when I use TorrentBroadcast it gets stuck in the second iteration. Can the block manager use the disk to store the data when there is no memory left? Is there a good way to see the memory usage in the block manager? Or maybe I should read the data from files to avoid such problems.





--
Best Wishes!

Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences(IIIS)
Tsinghua University, China

Tel  : +86 15120081920






