Execution blocked when collect()ing some relatively big blocks on spark 0.9

Execution blocked when collect()ing some relatively big blocks on spark 0.9

Guillaume Pitel
Hi, we've switched from 0.8.1 to 0.9.0 on Monday, and we're facing a problem that doesn't have an obvious cause.

Basically, we generate a random dense matrix (2M rows * 40 columns), split it into 20 blocks, collect() it and then broadcast it.
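Roughly, the code does something like this (a simplified sketch, not our actual code; I'm assuming jblas's FloatMatrix here, and the names are made up):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.jblas.FloatMatrix

    val sc = new SparkContext(new SparkConf().setAppName("random-matrix"))

    val numRows      = 2000000
    val numCols      = 40
    val numBlocks    = 20
    val rowsPerBlock = numRows / numBlocks   // ~100k rows, i.e. ~16 MB of floats per block

    // One task per block; each task generates its slice of the matrix
    val blocks = sc.parallelize(0 until numBlocks, numBlocks).map { blockId =>
      val rowIds = Array.range(blockId * rowsPerBlock, (blockId + 1) * rowsPerBlock)
      (blockId, (rowIds, FloatMatrix.rand(rowsPerBlock, numCols)))
    }

    val collected = blocks.collect()        // <- this is where everything hangs on 0.9.0
    val matrix    = sc.broadcast(collected) // then the whole thing is broadcast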

The generation is OK, but then the workers send the blocks and nothing happens; Spark is locked forever in this state. Here is what happens on the driver:

14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:13 as 2083 bytes in 0 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:14 as TID 2294 on executor 1: t4.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:14 as 2083 bytes in 0 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:15 as TID 2295 on executor 4: t3.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:15 as 2083 bytes in 1 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:16 as TID 2296 on executor 0: t0.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:16 as 2083 bytes in 2 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:17 as TID 2297 on executor 3: t1.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:17 as 2083 bytes in 1 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:18 as TID 2298 on executor 2: t5.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:18 as 2083 bytes in 1 ms
14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:19 as TID 2299 on executor 5: t6.exensa.loc (PROCESS_LOCAL)
14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:19 as 2083 bytes in 1 ms


And on an executor:

14/02/19 15:21:53 INFO Executor: Serialized size of result for 2287 is 17229427
14/02/19 15:21:53 INFO Executor: Sending result for 2287 directly to driver
14/02/19 15:21:53 INFO Executor: Serialized size of result for 2299 is 17229262
14/02/19 15:21:53 INFO Executor: Sending result for 2299 directly to driver
14/02/19 15:21:53 INFO Executor: Finished task ID 2299
14/02/19 15:21:53 INFO Executor: Finished task ID 2287
14/02/19 15:21:53 INFO Executor: Serialized size of result for 2281 is 17229426
14/02/19 15:21:53 INFO Executor: Sending result for 2281 directly to driver
14/02/19 15:21:53 INFO Executor: Finished task ID 2281


And... that's all. The driver never receives the notification that the task is finished. I have akka.frameSize=512 and a Kryo buffer of 512 MB.
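For reference, these are set more or less like this (a sketch, assuming the usual Spark 0.9 property names):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.akka.frameSize", "512")           // maximum Akka message size, in MB
      .set("spark.kryoserializer.buffer.mb", "512") // Kryo serialization buffer, in MB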

DEBUG level does not add anything (at least on the executor side; I didn't try on the driver).

The RDD being collected is made of (Int, (Array[Int], FloatMatrix)) tuples.

Any help would be greatly appreciated.

Thanks
Guillaume
--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

Guillaume Pitel
By the way, I've said the collect()ed blocks are relatively big because just before this collect() there are a few much smaller collectAsMap() and collect() calls that are not blocking. I'm really not sure it has anything to do with the size, though, since I've previously done this with much bigger blocks; 20 MB is not supposed to block anything.


Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

Roshan Nair
Hi Guillaume,

We migrated on Monday as well, and something very similar seems to happen to us. 

In my case, the driver logs and the executor logs are in the same state, except that the tasks the executor reports as finished are from the previous stage. It doesn't seem to have received the new tasks at all. The UI says this happened in a reduceByKeyLocally. The reduceByKeyLocally is preceded by a broadcast and a mapPartitions; all three show up as a single stage (reduceByKeyLocally).
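Roughly, the stage looks like this (a simplified sketch with made-up names, just to show the shape of the job):

    // A small lookup table is broadcast, used inside mapPartitions,
    // and reduceByKeyLocally merges the partial results on the driver.
    val idToLabel = Map(1 -> "a", 2 -> "b")          // hypothetical lookup table
    val records   = sc.parallelize(Seq(1, 2, 3, 2))  // hypothetical input RDD

    val lookup = sc.broadcast(idToLabel)

    val totals: scala.collection.Map[String, Long] =
      records
        .mapPartitions { iter =>
          iter.map(id => (lookup.value.getOrElse(id, "unknown"), 1L))
        }
        .reduceByKeyLocally(_ + _)   // the merged map comes back to the driver, much like a collect()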

Yesterday, I was able to get past this by increasing the amount of memory per worker from 4 GB to 5 GB, even though the RDD only takes about 600 MB per slave (according to the app UI). The exact same code (before recompiling to Scala 2.10 and Spark 0.9) worked comfortably with 4 GB per slave.

Roshan



Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

Guillaume Pitel
Thanks for your feedback. It's good to know that we are not alone in the dark :)

I've tried with a higher number of blocks, which naturally leads to smaller blocks being collect()ed, and it seems to work. So there must be a limit somewhere that triggers a bug.

I'm wondering whether Akka's send-buffer-size and receive-buffer-size should be adjusted together with the maximum-frame-size?

The Akka documentation says this:
    # Sets the send buffer size of the Sockets,
    # set to 0b for platform default
    send-buffer-size = 256000b

    # Sets the receive buffer size of the Sockets,
    # set to 0b for platform default
    receive-buffer-size = 256000b

    # Maximum message size the transport will accept, but at least
    # 32000 bytes.
    # Please note that UDP does not support arbitrary large datagrams,
    # so this setting has to be chosen carefully when using UDP.
    # Both send-buffer-size and receive-buffer-size settings has to
    # be adjusted to be able to buffer messages of maximum size.
    maximum-frame-size = 128000b

While in spark.util.AkkaUtils, the configuration is set like this:

        val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
          ConfigFactory.parseString(
          s"""
          |akka.daemonic = on
          |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
          |akka.stdout-loglevel = "ERROR"
          |akka.jvm-exit-on-fatal-error = off
          |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
          |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
          |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
          |akka.remote.netty.tcp.hostname = "$host"
          |akka.remote.netty.tcp.port = $port
          |akka.remote.netty.tcp.tcp-nodelay = on
          |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
          |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
          |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
          |akka.actor.default-dispatcher.throughput = $akkaBatchSize
          |akka.log-config-on-start = $logAkkaConfig
          |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
          |akka.log-dead-letters = $lifecycleEvents
          |akka.log-dead-letters-during-shutdown = $lifecycleEvents
          """.stripMargin))

For now I'm trying to see what happens if I decrease the maximum frame size.

    Guillaume




    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Roshan Nair
Since I saw your email yesterday, I've rerun my job several times. I've compiled spark-0.9.0 with Hadoop CDH version 4.2.1.

I can confirm that the issue happens regardless of what memory I specify for the workers. What complicates it is that it happens intermittently. A lot of times the app runs till completion, but on occasion it will just hang. The driver says it's sent all the tasks for the next stage. The executors, however, stop logging after sending back the results of the completed tasks from the previous stage.

I was suspecting GC pauses, but I don't see much GC activity happening around that point.

    Roshan




    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Guillaume Pitel

> runs till completion, but on occasion it will just hang. The driver says it's sent all the tasks for the next stage. The executors, however, stop logging after sending back the results of the completed tasks from the previous stage.

Do you know the size of the result sent back to the driver when it fails? It should be logged.




    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Guillaume Pitel
So, the problem occurs when the size of the data is > 10 MB, even though I've set akka.frameSize to 512 (and the environment shows it as 512).

I had to set this to 512 on Spark 0.8.0, and I think it's no longer needed since 0.8.1, so I'm going to go back to 10 MB to see what happens.

But anyway, it's a bug if akka.frameSize causes problems.

    Guillaume


    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Roshan Nair

In my case it hangs right after the driver sends tasks to the executors. The executors never get the tasks.

Yes, the driver prints the serialized task sizes, and they are roughly 10 MB.

I've set frameSize to 160 MB and I was planning to bump it up to 500 MB.

Do you suspect that the frameSize configuration is not being picked up by Spark, i.e. Akka is still using the default 10 MB frame size?

    I noticed that AkkaUtils has a flag to print the configuration. I'll try this tomorrow.
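If it helps, here is what I plan to try (assuming the flag behind $logAkkaConfig in the AkkaUtils snippet earlier is exposed as spark.akka.logAkkaConfig):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.akka.logAkkaConfig", "true")   // ask Akka to dump its effective config at startup
      .set("spark.akka.frameSize", "160")        // the frame size I'm currently using, in MB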

    Roshan


    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Guillaume Pitel
    I'm sure now.

Setting the frameSize back to the default 10 MB works, because since 0.8.1, messages bigger than the Akka frame size are not sent directly but handed to the BlockManager.
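If I understand the executor side correctly, the decision is roughly the following (a paraphrased sketch, not the actual Spark source; type and method names are approximations):

    // Sketch only: how an executor could decide whether a serialized task result
    // fits in an Akka message or must go through the BlockManager instead.
    sealed trait TaskResultEnvelope
    case class DirectResult(bytes: Array[Byte]) extends TaskResultEnvelope   // shipped inside the Akka status update
    case class IndirectResult(blockId: String) extends TaskResultEnvelope    // driver fetches it from the BlockManager

    def packageResult(taskId: Long,
                      serialized: Array[Byte],
                      akkaFrameSizeBytes: Int,
                      putBlock: (String, Array[Byte]) => Unit): TaskResultEnvelope =
      if (serialized.length >= akkaFrameSizeBytes) {
        val blockId = s"taskresult_$taskId"
        putBlock(blockId, serialized)   // big result is stored in the BlockManager
        IndirectResult(blockId)         // only a small reference travels over Akka
      } else {
        DirectResult(serialized)        // small result goes directly to the driver
      }

So with frameSize=512, our ~17 MB results would have taken the direct Akka path, and that is apparently where it breaks; with the default 10 MB they go through the BlockManager instead.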

I don't see why, though. I thought it was a problem with the configuration, but since the Executor checks the property itself, it must mean that either Akka really does fail over 10 MB (unlikely), or the property name has changed.

Anyway, as a workaround, set the frameSize to 10 and you should be OK.

    Guillaume


    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Nan Zhu
    Hi, 


Could you file a JIRA for this, so that someone can get this fixed after he/she finds the underlying reason?

    Best,

    -- 
    Nan Zhu


    Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9

    Roshan Nair
> A lot of times the app runs till completion, but on occasion it will just hang. The driver says it's sent all the tasks for the next stage. The executors, however, stop logging after sending back the results of the completed tasks from the previous stage.

I've confirmed that in my case, serialized tasks don't reach the executor when they are over 10 MB, even though frameSize is set to 160. In the cases where the stage completed, the serialized tasks were just under 10 MB.

    However, the suggested workaround of setting spark.akka.frameSize to 10 does not work for me.

    I've commented on this at the issue opened by Guillaume at https://spark-project.atlassian.net/browse/SPARK-1112 

    Roshan

