Streaming job, catch exceptions


bsikander
Hello,
I am using Spark 2.2.1 with the standalone resource manager.

I have a streaming job in which, from time to time, jobs are aborted with the following exception. The underlying causes differ, e.g. FileNotFoundException, NullPointerException, etc.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0.....
These exceptions are printed in driver logs.

I have a try/catch around my streaming job. Strangely, sometimes the above exceptions are printed in the logs but my try/catch block never catches them, and sometimes it does catch them. In either case, the job continues to process data.

I am trying to understand this behavior, i.e. in which cases I will be able to catch the exception.

I have tried to reproduce this using something like rdd.map(x => 1/0).print() but failed: I can see the exception in the driver logs, but my catch block never catches it.
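
Roughly, my setup looks like the following (a simplified sketch, not my actual job; the socket source and names here are just illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("ExceptionRepro")
    val ssc = new StreamingContext(conf, Seconds(1))

    try {
      val lines = ssc.socketTextStream("localhost", 9999)
      lines.map(_ => 1 / 0).print()  // fails on the executors with ArithmeticException
      ssc.start()
      ssc.awaitTermination()         // I expected the job failure to surface here
    } catch {
      case e: Exception =>
        // sometimes reached, sometimes the error only shows up in the driver logs
        println(s"Caught: ${e.getMessage}")
    }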

Regards,
Behroz

Re: Streaming job, catch exceptions

bsikander
Hi,
Anyone? This should be a straightforward one :)





Re: Streaming job, catch exceptions

jasonnerothin@gmail.com
Code would be very helpful, but it seems like you are:

1. Writing in Java
2. Wrapping the entire app in a try/catch
3. Executing in local mode

The code that is throwing the exceptions is not executed locally in the driver process. Spark is executing the failing code on the cluster.
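
That is, in something like the following sketch, the division runs on the executors, not in the driver JVM; the driver only observes the failure once the job behind the action fails:

    rdd.map(x => 1 / 0) // executed on the executors; nothing is thrown here in the driver
       .count()         // the action: the driver sees a SparkException only if the stage fails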





Re: Streaming job, catch exceptions

bsikander
>> Code would be very helpful,
I will try to put together something to post here.

>> 1. Writing in Java
I am using Scala.


>> 2. Wrapping the entire app in a try/catch
Once the SparkContext object is created, a Future is started in which the
actions and transformations are defined and the streaming context is
started. I am using spark-jobserver; the job is started via job.runJob(),
which defines all the actions/transformations and starts the streaming
context. See this
<https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala#L570>
. As mentioned in my original message, I am sometimes able to catch the
exception in this block
<https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala#L606>
.

>> 3. Executing in local mode
I am running in cluster mode.


>> The code that is throwing the exceptions is not executed locally in the
>> driver process. Spark is executing the failing code on the cluster.
Yes, the code executes on the executors, but once it fails 4 times, the
exception seems to get thrown on the driver side.
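
(For context, the 4 attempts correspond to spark.task.maxFailures, which
defaults to 4; it can be raised at submit time, e.g.
"--conf spark.task.maxFailures=8", an illustrative value.)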







Re: Streaming job, catch exceptions

bsikander
Any help would be much appreciated.

The error and question are quite generic; I believe most experienced
users will be able to answer.






Re: Streaming job, catch exceptions

bsikander
I was able to reproduce the problem.

In the repository below, I have 2 sample jobs. Both execute 1/0
(ArithmeticException) on the executor side. In the NetworkWordCount job,
awaitTermination throws the same exception (Job aborted due to stage
failure .....) that I can see in the driver/executor logs and terminates
the Spark job (which is expected), but in the other job (QueueStream), I
see the exceptions in the driver/executor logs, yet no exception is
thrown by awaitTermination and the job continues.

https://github.com/bsikander/spark-reproduce/
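
The QueueStream job is essentially the QueueStream example that ships with
Spark (a rough sketch of it below, not the exact code in the repo):

    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("QueueStream")
    val ssc = new StreamingContext(conf, Seconds(1))

    val rddQueue = new mutable.Queue[RDD[Int]]()
    ssc.queueStream(rddQueue).map(_ => 1 / 0).print()  // fails on the executors
    ssc.start()

    while (true) {  // keep feeding RDDs into the queue
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
    ssc.awaitTermination()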


I am trying to understand why this behavior happens. Any help would be
appreciated.





Re: Streaming job, catch exceptions

bsikander
Just to add to my previous message.
I am using Spark 2.2.2 with the standalone cluster manager and deploying
the jobs in cluster mode.





Re: Streaming job, catch exceptions

bsikander
Ok, I found the reason.

In my QueueStream example, I have a while(true) loop which keeps on adding
RDDs, and my awaitTermination call is after the while loop. Since the
while loop never exits, awaitTermination is never reached, so the
exceptions are never reported.
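
(A sketch of the fix, continuing the QueueStream sketch above: feed the
queue from a background thread so the main thread can block in
awaitTermination and observe the failures.)

    // feed the queue from a background thread instead of the main thread
    new Thread("rdd-feeder") {
      override def run(): Unit = while (true) {
        rddQueue.synchronized {
          rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
        }
        Thread.sleep(1000)
      }
    }.start()

    ssc.awaitTermination()  // now actually reached; rethrows reported failures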


The above was just a problem in the code that I used to demonstrate my
issue.

My real problem was due to the shutdown behavior of Spark. Spark Streaming
does the following:

- context.start() triggers the pipeline and context.awaitTermination()
blocks the current thread; whenever an exception is reported,
awaitTermination throws it. Since we generally have no code after
awaitTermination, the shutdown hooks get called, which stop the Spark
context.

- I am using spark-jobserver; when an exception is reported by
awaitTermination, jobserver catches it and updates the status of the job
in the database, but the driver process keeps running because the main
thread in the driver is waiting for an Akka actor belonging to jobserver
to shut down. Since that actor never shuts down, the driver keeps running
and no one executes context.stop(). And since context.stop() is never
executed, the job scheduler and generator keep running, so the job keeps
going as well.

This implicit behavior of Spark, where it relies on shutdown hooks to
close the context, is a bit strange. I believe that as soon as an
exception is reported, Spark should just execute context.stop(). This
behavior can have serious consequences, e.g. data loss. I will work
around it on my side, though.
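
(My planned workaround, sketched, assuming ssc is the StreamingContext:
stop the context explicitly when awaitTermination rethrows.)

    try {
      ssc.awaitTermination()
    } catch {
      case e: Exception =>
        // stop streaming and the underlying SparkContext so the driver can exit
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }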

What is your opinion on stopping the context as soon as an exception is
raised?





Re: Streaming job, catch exceptions

jasonnerothin@gmail.com
The behavior is a deliberate design decision by the Spark team. 

If Spark were to "fail fast", it would prevent the system from recovering from many classes of errors that are in principle recoverable (for example if two otherwise unrelated jobs cause a garbage collection spike on the same node). Checkpointing and similar features have been added to support high availability and platform resilience.
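
For example, the checkpoint-based driver-recovery pattern, sketched (the
checkpoint path and app name are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("resilient-app")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint("hdfs:///checkpoints/resilient-app")
      // define the DStream operations here
      ssc
    }

    // recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/resilient-app", createContext _)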

As regards the more general topic of exception handling and recovery, I side with Bruce Eckel and (for Java) Josh Bloch (see Effective Java, Exception Handling). The Scala+functional community is similarly opinionated against using exceptions for explicit control flow. (scala.util.Try is useful for supporting libraries that don't share this opinion.)
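
A minimal sketch of that style (riskyCall, process, and logError are
stand-ins for your own code):

    import scala.util.{Failure, Success, Try}

    Try(riskyCall()) match {
      case Success(value) => process(value)  // happy path
      case Failure(err)   => logError(err)   // recover or record it, without throwing
    }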

Higher-level design thoughts: 

I recommend reading Chapter 15 of Chambers & Zaharia's Spark: The Definitive Guide (at least). The Spark engine makes some assumptions about execution boundaries being managed by Spark (that Spark Jobs get broken into Tasks on the Executors and are managed by the resource manager). If multiple Threads are executing within a given Task, I would expect things like data exchange/shuffle to become unpredictable.

Said a different way: Spark is a micro-batch architecture, even when using the streaming APIs. The Spark Application is assumed to be relatively lightweight (the goal is to parallelize execution across big data, after all).

You might also look at the way the Apache Livy team is implementing their solution.

HTH
Jason



Re: Streaming job, catch exceptions

jasonnerothin@gmail.com
Correction: the Driver manages the Tasks; the resource manager serves up resources to the Driver or Tasks.


Re: Streaming job, catch exceptions

bsikander
Umm, I am not sure I got this fully.

It is a design decision not to call context.stop() right after
awaitTermination throws an exception?

So the philosophy is that if a task fails after n tries (default 4), Spark
should fail fast and let the user know? Is this correct?


As you mentioned, there are many error classes, so the chances of getting
an exception are quite high. If the above philosophy is correct, it makes
it really hard to keep a job up and running all the time, especially in
streaming cases.






Re: Streaming job, catch exceptions

jasonnerothin@gmail.com
Yes.

If the job fails repeatedly (4 times in this case), Spark assumes that there is a problem in the Job and notifies the user. In exchange for this, the engine can go on to serve other jobs with its available resources.

I would try the following until things improve:

1. Figure out what's wrong with the Job that's failing
2. Make exception handling more functional: http://oneeyedmen.com/failure-is-not-an-option-part-4.html (Kotlin, but the ideas still apply)

Why do #2? Because it will force you to decide which exceptions your code should handle, which Spark should handle, and which should cause job failure (the current behavior).
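
For instance, a sketch of handling expected per-record failures inside the
transformation itself, so the task (and hence the job) does not fail
(lines is a DStream[String]; parse is a stand-in for your own logic):

    import scala.util.Try

    val parsed = lines.flatMap { record =>
      Try(parse(record)).toOption  // drop (or divert) bad records instead of failing the task
    }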

By analogy to the halting problem, I believe that expecting a program to handle all possible exceptional states is unreasonable.

Jm2c
Jason



Re: Streaming job, catch exceptions

bsikander
OK, great. I understand the philosophy now, thanks.


