Stopping a Spark Streaming Context gracefully

Bryan Jeffrey
Hello.

I am running Spark 2.1 with Scala 2.11.  We run several Spark Streaming jobs, some of which we restart periodically.  Our code looks like the following:

logger.info("Starting the streaming context!")
ssc.start()
logger.info("Waiting for termination!")
// Run for a fixed number of minutes if a restart interval is configured,
// otherwise block until the context terminates on its own.
Option(config.getInt(Parameters.RestartMinutes)).getOrElse(0) match {
  case restartMinutes: Int if restartMinutes > 0 =>
    logger.info(s"Waiting for ${restartMinutes} minutes before terminating job")
    ssc.awaitTerminationOrTimeout(restartMinutes * DateUtils.millisecondsPerMinute)
  case _ => ssc.awaitTermination()
}
logger.info("Calling 'stop'")
// Graceful stop: also stops the underlying SparkContext.
ssc.stop(stopSparkContext = true, stopGracefully = true)

In several cases we have observed that a job keeps running after 'stop' has been called.  To investigate, I wrote a simple job that reads from Kafka and does nothing except print a count of the data.  After several minutes it simply calls 'ssc.stop(true, true)'.  Sometimes this stops the context and sometimes it does not.  If we call 'stop' several times over an interval, one of the calls eventually succeeds.
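
For illustration, calling 'stop' repeatedly until the context reports that it has stopped could be sketched as a small retry helper.  This is only a sketch under assumptions: the names StopRetry and stopWithRetry, the attempt count, and the pause are hypothetical and not part of the job described here, and the helper takes plain functions so that it does not depend on Spark itself.  It works around the symptom rather than explaining it.

```scala
import scala.annotation.tailrec

object StopRetry {
  /** Calls `requestStop` up to `maxAttempts` times, pausing `pauseMillis`
    * between attempts, until `isStopped` returns true.
    * Returns true if the stopped state was observed, false otherwise.
    * (Hypothetical helper, not part of the Spark API.)
    */
  def stopWithRetry(maxAttempts: Int, pauseMillis: Long)
                   (requestStop: () => Unit)
                   (isStopped: () => Boolean): Boolean = {
    @tailrec
    def loop(attempt: Int): Boolean =
      if (isStopped()) true                 // already stopped, nothing to do
      else if (attempt > maxAttempts) false // gave up without seeing a stop
      else {
        requestStop()
        if (isStopped()) true
        else {
          // Pause only if another attempt will follow.
          if (attempt < maxAttempts) Thread.sleep(pauseMillis)
          loop(attempt + 1)
        }
      }
    loop(1)
  }
}

// Assumed usage with a StreamingContext named `ssc` (needs Spark on the classpath):
//   import org.apache.spark.streaming.StreamingContextState
//   val stopped = StopRetry.stopWithRetry(maxAttempts = 5, pauseMillis = 30000L)(
//     () => ssc.stop(stopSparkContext = true, stopGracefully = true))(
//     () => ssc.getState() == StreamingContextState.STOPPED)
```

The boolean result lets the caller decide what to do if the context never reports STOPPED, for example logging an error and exiting the JVM.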

This looks like a bug.  I looked in Jira and did not see an open issue.  Is this a known problem?  If not, I'll open a bug.

Regards,

Bryan Jeffrey


Re: Stopping a Spark Streaming Context gracefully

Dhaval Modi
+1

Regards,
Dhaval Modi
[hidden email]

On 8 November 2017 at 00:06, Bryan Jeffrey <[hidden email]> wrote: