How to force-quit a Spark application?


Pola Yao
I submitted a Spark job through the ./spark-submit command. The code executed successfully, but the application got stuck when it tried to quit Spark.

My code snippet:
'''
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master(...).getOrCreate

val pool = Executors.newFixedThreadPool(3)
implicit val xc = ExecutionContext.fromExecutorService(pool)
// train1, train2, train3 are Futures wrapping data reading, feature
// engineering, and machine-learning steps
val taskList = List(train1, train2, train3)
val results = Await.result(Future.sequence(taskList), 20.minutes)

println("Shutting down pool and executor service")
pool.shutdown()
xc.shutdown()

println("Exiting spark")
spark.stop()
'''

After I submitted the job, I could see from the terminal that the code executed and printed "Exiting spark"; however, after printing that line it never exited Spark, it just got stuck.

Does anybody know what the reason is, or how to force it to quit?

Thanks!



Re: How to force-quit a Spark application?

Marcelo Vanzin-2
You should check the active threads in your app. Since your pool uses
non-daemon threads, that will prevent the app from exiting.

spark.stop() should have stopped the Spark jobs in other threads, at
least. But if something is blocking one of those threads, or if
something is creating a non-daemon thread that stays alive somewhere,
you'll see that.

Or you can force quit with sys.exit.
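
A minimal sketch of the daemon-thread point above (the ThreadFactory here is an illustration, not part of the original code): if the pool is built so that its threads are daemons, they can no longer keep the JVM alive, at the cost that the JVM may kill them mid-task on exit.
'''
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext

val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r)
    t.setDaemon(true) // daemon threads cannot block JVM exit
    t
  }
}
val pool = Executors.newFixedThreadPool(3, daemonFactory)
implicit val xc = ExecutionContext.fromExecutorService(pool)
'''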



Re: How to force-quit a Spark application?

Pola Yao
Hi Marcelo,

Thanks for your reply! It makes sense to me. However, I've tried several ways to exit Spark (e.g., System.exit()), but they all failed. Is there an explicit way to shut down all the live threads in the Spark application and then quit?
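
For the "shut down all live threads" part, one sketch, reusing the pool and xc from the original snippet: replace shutdown() with shutdownNow(), which interrupts running tasks rather than waiting for them, then wait with a bounded timeout. Tasks that ignore interruption will still linger.
'''
import java.util.concurrent.TimeUnit

xc.shutdown()
pool.shutdownNow() // interrupts running tasks instead of draining them
if (!pool.awaitTermination(30, TimeUnit.SECONDS))
  println("Pool did not terminate within 30 seconds")
'''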



Re: How to force-quit a Spark application?

Marcelo Vanzin-2
If System.exit() doesn't work, you may have a bigger problem
somewhere. Check your threads (using e.g. jstack) to see what's going
on.
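
Alongside an external jstack dump, the same check can be done from inside the driver; a sketch (illustration only), focusing on non-daemon threads because only those can prevent the JVM from exiting:
'''
import scala.collection.JavaConverters._

// List the live non-daemon threads and their states.
Thread.getAllStackTraces.keySet.asScala
  .filter(t => !t.isDaemon && t.isAlive)
  .foreach(t => println(s"${t.getName}: ${t.getState}"))
'''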



Re: How to force-quit a Spark application?

Pola Yao
Hi Marcelo,

Thanks for your response. 

I dumped the threads on the server where I submitted the Spark application:
'''
...
"dispatcher-event-loop-2" #28 daemon prio=5 os_prio=0 tid=0x00007f56cee0e000 nid=0x1cb6 waiting on condition [0x00007f5699811000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006400161b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"dispatcher-event-loop-1" #27 daemon prio=5 os_prio=0 tid=0x00007f56cee0c800 nid=0x1cb5 waiting on condition [0x00007f5699912000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006400161b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=0 tid=0x00007f56cee0c000 nid=0x1cb4 waiting on condition [0x00007f569a120000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000006400161b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #20 daemon prio=9 os_prio=0 tid=0x00007f56cc12d800 nid=0x1ca5 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x00007f56cc12a000 nid=0x1ca4 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
...
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f56cc0ce000 nid=0x1c93 in Object.wait() [0x00007f56ab3f2000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006400cd498> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f56cc0c9800 nid=0x1c92 in Object.wait() [0x00007f55cfffe000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006400a2660> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007f56cc021000 nid=0x1c74 in Object.wait() [0x00007f56d344c000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
- locked <0x000000064056f6a0> (a org.apache.hadoop.util.ShutdownHookManager$1)
at java.lang.Thread.join(Thread.java:1323)
at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x00000006404e65b8> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Runtime.exit(Runtime.java:109)
at java.lang.System.exit(System.java:971)
at scala.sys.package$.exit(package.scala:40)
at scala.sys.package$.exit(package.scala:33)
at actionmodel.ParallelAdvertiserBeaconModel$.main(ParallelAdvertiserBeaconModel.scala:252)
at actionmodel.ParallelAdvertiserBeaconModel.main(ParallelAdvertiserBeaconModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

"VM Thread" os_prio=0 tid=0x00007f56cc0c1800 nid=0x1c91 runnable 
...
'''
I have no clear idea what went wrong. I did call awaitTermination to terminate the thread pool. Is there any way to force-close all those 'WAITING' threads associated with my Spark application?


Re: How to force-quit a Spark application?

Marcelo Vanzin-2
Those are daemon threads and not the cause of the problem. The main
thread is waiting for the "org.apache.hadoop.util.ShutdownHookManager"
thread, but I don't see that one in your list.
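
In other words, sys.exit() has started the JVM shutdown sequence, and that sequence joins each registered hook thread before the process can die. A hypothetical standalone reproduction of that exact hang (not the user's code):
'''
object HangOnExit {
  def main(args: Array[String]): Unit = {
    sys.addShutdownHook {
      Thread.sleep(Long.MaxValue) // a hook that blocks, like a stuck SparkContext.stop()
    }
    sys.exit(0) // Runtime.exit joins the hook thread and waits forever
  }
}
'''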



Re: How to force-quit a Spark application?

Pola Yao
Hi Marcelo,

I dumped the threads with jstack again, and this time the dump does include the ShutdownHookManager thread ("Thread-1" below):
'''
"Thread-1" #19 prio=5 os_prio=0 tid=0x00007f9b6828e800 nid=0x77cb waiting on condition [0x00007f9a123e3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000005408a5420> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:131)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:219)
        - locked <0x00000005400c8f40> (a org.apache.spark.scheduler.LiveListenerBus)
        at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1915)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1914)
        at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
......

"main" #1 prio=5 os_prio=0 tid=0x00007f9d50020000 nid=0x6a25 in Object.wait() [0x00007f9d58f69000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        - locked <0x00000005404fe248> (a org.apache.hadoop.util.ShutdownHookManager$1)
        at java.lang.Thread.join(Thread.java:1323)
        at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
        at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
        at java.lang.Shutdown.runHooks(Shutdown.java:123)
        at java.lang.Shutdown.sequence(Shutdown.java:167)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - locked <0x00000005404938f0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Runtime.exit(Runtime.java:109)
        at java.lang.System.exit(System.java:971)
        at scala.sys.package$.exit(package.scala:40)
        at scala.sys.package$.exit(package.scala:33)
        at actionmodel.ParallelAdvertiserBeaconModel$.main(ParallelAdvertiserBeaconModel.scala:253)
        at actionmodel.ParallelAdvertiserBeaconModel.main(ParallelAdvertiserBeaconModel.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
...
'''

What shall I do then? Thanks!
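
One heavy-handed escape hatch, not suggested anywhere in the thread and offered here only as a sketch: Runtime.halt terminates the JVM without running shutdown hooks at all, so the blocked hook can no longer prevent exit. The cost is that whatever cleanup those hooks perform (temp directories, event logs) is skipped.
'''
spark.stop()               // attempt a normal shutdown first
Runtime.getRuntime.halt(0) // hard-stop the JVM, bypassing all shutdown hooks
'''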



Re: How to force-quit a Spark application?

Marcelo Vanzin-2
Hi,

On Tue, Jan 22, 2019 at 11:30 AM Pola Yao <[hidden email]> wrote:
> "Thread-1" #19 prio=5 os_prio=0 tid=0x00007f9b6828e800 nid=0x77cb waiting on condition [0x00007f9a123e3000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000005408a5420> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>         at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:131)

This looks a little weird. Are you sure this thread is not making any
progress (i.e. did you take multiple stack snapshots)? I wouldn't
expect that call to block.

At first I was suspicious of SPARK-24309 but that looks different from
what you're seeing.
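
A sketch of the "multiple stack snapshots" check being asked about, assuming jstack is on the PATH and the driver pid is known (e.g. from jps); if "Thread-1" shows the same frame in every snapshot, it really is stuck:
'''
import scala.sys.process._
import java.nio.file.{Files, Paths}

object StackSnapshots {
  def main(args: Array[String]): Unit = {
    val pid = args(0) // driver JVM pid
    for (i <- 1 to 3) {
      val dump = s"jstack $pid".!! // capture one full thread dump
      Files.write(Paths.get(s"stack-$i.txt"), dump.getBytes)
      Thread.sleep(5000) // space the snapshots a few seconds apart
    }
  }
}
'''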
