spark streaming and the spark shell


spark streaming and the spark shell

Diana Carroll
I'm working with spark streaming using spark-shell, and hoping folks could answer a few questions I have.

I'm doing WordCount on a socket stream:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc,Seconds(5))
var mystream = ssc.socketTextStream("localhost",4444)
var words = mystream.flatMap(line => line.split(" "))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()


1.  I'm assuming that using the spark shell is an edge case, and that Spark Streaming is really intended mostly for batch (non-interactive) use.  True?

2.  I notice that once I call ssc.start(), my stream starts processing and continues indefinitely...even if I close the socket on the server end. (I'm using the unix command "nc" to mimic a server, as explained in the streaming programming guide.)  Can I tell my stream to detect that it has lost its connection and therefore stop executing?  (Or even better, attempt to re-establish the connection?)

3.  I tried entering ssc.stop which resulted in an error:
Exception in thread "Thread-43" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
But it did stop the DStream execution.

4.  Then I tried restarting the ssc again (ssc.start) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting an ssc supported?

5.  When I perform an operation like wordCounts.print(), that operation will execute on each batch, every n seconds.  Is there a way I can undo that operation?  That is, I want it to *stop* executing that print every n seconds...without having to stop the stream.

What I'm really asking is...can I explore DStreams interactively the way I can explore my data in regular Spark?  In regular Spark, I might perform various operations on an RDD to see what happens.  So at first, I might have used split(" ") to tokenize my input text, but now I want to try using split(",") instead, after the stream has already started running.  Can I do that?

I did find out that if I add a new operation to an existing DStream (say, words.print()) after ssc.start(), it works. It *will* add the second print() call to the execution list every n seconds.

But if I try to add new DStreams, e.g.
... 
ssc.start()

var testpairs = words.map(x => (x, "TEST"))
testpairs.print()

I get an error:
14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time 1395932270000 ms
java.lang.Exception: org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been initialized

Is this sort of interactive use just not supported?

Thanks!

Diana

Re: spark streaming and the spark shell

Tathagata Das
Very good questions! Responses inline.

TD

On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <[hidden email]> wrote:

> I'm working with spark streaming using spark-shell, and hoping folks could
> answer a few questions I have.
>
> I'm doing WordCount on a socket stream:
>
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.Seconds
> var ssc = new StreamingContext(sc,Seconds(5))
> var mystream = ssc.socketTextStream("localhost",4444)
> var words = mystream.flatMap(line => line.split(" "))
> var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> wordCounts.print()
> ssc.start()
>
>
>
> 1.  I'm assuming that using the spark shell is an edge case, and that Spark
> Streaming is really intended mostly for batch (non-interactive) use.  True?
>

Yes. Currently the spark-shell is not the intended execution mode for
Spark Streaming, though it can be used for quick testing.

> 2.  I notice that once I call ssc.start(), my stream starts processing and
> continues indefinitely...even if I close the socket on the server end. (I'm
> using the unix command "nc" to mimic a server, as explained in the streaming
> programming guide.)  Can I tell my stream to detect that it has lost its
> connection and therefore stop executing?  (Or even better, attempt to
> re-establish the connection?)
>


Currently, not yet. But I am aware of this, and the behavior will be
improved in the future.
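In the meantime, one possible workaround is to replace socketTextStream with a
custom receiver that asks Spark to restart it when the connection drops, so the
connection gets re-established automatically. A minimal sketch, assuming the
custom receiver API (org.apache.spark.streaming.receiver.Receiver plus
ssc.receiverStream) available in newer releases; the class name is made up:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A socket receiver that asks Spark to restart it (and hence reconnect)
// whenever the connection is lost or an error occurs.
class ReconnectingSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Read on a separate thread so onStart() returns quickly.
    new Thread("Reconnecting Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // The reading thread exits on its own once isStopped() returns true.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // readLine() returned null: the server closed the connection.
      restart("Connection closed, trying to reconnect")
    } catch {
      case e: Exception => restart("Error receiving data, trying to reconnect", e)
    }
  }
}

// Then, instead of ssc.socketTextStream("localhost", 4444):
// val mystream = ssc.receiverStream(new ReconnectingSocketReceiver("localhost", 4444))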

> 3.  I tried entering ssc.stop which resulted in an error:
>
> Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> cancelled because SparkContext was shut down
> 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> SendingConnectionManagerId not found
>
> But it did stop the DStream execution.
>


Ah, that happens sometimes. The existing behavior of ssc.stop() is
that it stops everything immediately.
I just opened a pull request for a more graceful shutdown of the
Spark Streaming program:
https://github.com/apache/spark/pull/247
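Once that is in, a gentler stop from the shell might look roughly like the
following (the parameter names assume the newer stop() signature, so check the
API docs for your release):

// Stop the streaming computation but keep the shell's SparkContext (sc)
// alive, and wait for already-received data to finish processing.
ssc.stop(stopSparkContext = false, stopGracefully = true)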

> 4.  Then I tried restarting the ssc again (ssc.start) and got another error:
> org.apache.spark.SparkException: JobScheduler already started
> Is restarting an ssc supported?
>


Restarting is not supported. However, the behavior was not
explicitly checked. The above pull request makes the behavior
more explicit by throwing the right warnings and exceptions.

> 5.  When I perform an operation like wordCounts.print(), that operation will
> execute on each batch, every n seconds.  Is there a way I can undo that
> operation?  That is, I want it to *stop* executing that print every n
> seconds...without having to stop the stream.
>
> What I'm really asking is...can I explore DStreams interactively the way I
> can explore my data in regular Spark?  In regular Spark, I might perform
> various operations on an RDD to see what happens.  So at first, I might have
> used split(" ") to tokenize my input text, but now I want to try using
> split(",") instead, after the stream has already started running.  Can I do
> that?
>
> I did find out that if I add a new operation to an existing DStream (say,
> words.print()) after ssc.start(), it works. It *will* add the second
> print() call to the execution list every n seconds.
>
> But if I try to add new DStreams, e.g.
> ...
>
> ssc.start()
>
> var testpairs = words.map(x => (x, "TEST"))
> testpairs.print()
>
>
> I get an error:
>
> 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> 1395932270000 ms
> java.lang.Exception:
> org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> initialized
>
>
> Is this sort of interactive use just not supported?


Modifying the DStream operations after the context has started is not
officially supported. However, dynamically changing the computation can
be done using DStream.transform() or DStream.foreachRDD(). Both of these
operations allow you to do arbitrary RDD operations on each RDD. So you
can dynamically modify what RDD operations are used within the
transform / foreachRDD (you are not changing the DStream operations,
only what's inside the DStream operation). But to use this really
interactively, you have to write a bit of additional code that allows
the user to interactively specify the function applied on each RDD.
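For example, a rough sketch of that "additional code" in the shell (the names
here are mine, and closure/serialization behavior in the REPL can be finicky,
so treat this as illustrative rather than an official recipe), assuming the
mystream DStream from the original example:

import org.apache.spark.rdd.RDD

// Mutable holder for the per-batch tokenizer; reassign it from the shell
// at any time.
var tokenize: String => Seq[String] = line => line.split(" ").toSeq

val tokens = mystream.transform { (rdd: RDD[String]) =>
  val f = tokenize   // read the current function once per batch, on the driver
  rdd.flatMap(f)
}
tokens.print()
ssc.start()

// Later, without stopping the stream, switch to comma tokenization:
tokenize = line => line.split(",").toSeq

Since transform() re-evaluates its function on the driver for every batch, the
reassignment takes effect on the next batch.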



>
> Thanks!
>
> Diana

Re: spark streaming and the spark shell

Evgeniy Shishkin

>> 2.  I notice that once I call ssc.start(), my stream starts processing and
>> continues indefinitely...even if I close the socket on the server end. (I'm
>> using the unix command "nc" to mimic a server, as explained in the streaming
>> programming guide.)  Can I tell my stream to detect that it has lost its
>> connection and therefore stop executing?  (Or even better, attempt to
>> re-establish the connection?)
>
> Currently, not yet. But I am aware of this, and the behavior will be
> improved in the future.

Now I understand why our Spark Streaming job starts to generate zero-sized RDDs from the Kafka input
when one worker gets an OOM or crashes.

And we can't detect it! Great. So Spark Streaming just isn't suited yet for 24/7 operation =\

Re: spark streaming and the spark shell

Tathagata Das
Seems like the configuration of the Spark worker is not right. Either the worker has not been given enough memory or the allocation of the memory to the RDD storage needs to be fixed. If configured correctly, the Spark workers should not get OOMs.




Re: spark streaming and the spark shell

Evgeniy Shishkin

On 28 Mar 2014, at 01:13, Tathagata Das <[hidden email]> wrote:

> Seems like the configuration of the Spark worker is not right. Either the worker has not been given enough memory or the allocation of the memory to the RDD storage needs to be fixed. If configured correctly, the Spark workers should not get OOMs.


Yes, it is easy to start with the latest offsets, get a steady configuration, and everything is nice.

Then your machine fails. And you stop receiving anything from Kafka.

Then you notice this and restart your app, hoping it will continue from the offsets in ZooKeeper.
BUT NO
YOUR DEFAULT STREAM CONSUMERS JUST ERASED THE OFFSETS FROM ZOOKEEPER

After we fixed the offset erasing, we start from some offsets in the past.
And during a batch duration we can't limit how many messages we get from Kafka.
AND HERE WE OOM

And it's just a pain. Complete pain.

And remember, only some machines consume. Usually two or three. Because of the broken high-level consumer in Kafka.

Re: spark streaming and the spark shell

Tathagata Das
I see! As I said in the other thread, no one reported these issues until now! A good and not-too-hard fix is to add the ability to limit the data rate at which the receivers receive. I have opened a JIRA.

TD
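Rate limiting along these lines is exposed in later releases as a configuration
property (spark.streaming.receiver.maxRate, records per second per receiver, if
I have the name right). A minimal sketch, assuming a release that supports it;
the application name and rate are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each receiver at 10,000 records/second so a Kafka backlog cannot
// flood a single batch and OOM a worker.
val conf = new SparkConf()
  .setAppName("RateLimitedStreaming")
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(5))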



Re: spark streaming and the spark shell

Evgeniy Shishkin

On 28 Mar 2014, at 01:37, Tathagata Das <[hidden email]> wrote:

> I see! As I said in the other thread, no one reported these issues until now! A good and not-too-hard fix is to add the ability to limit the data rate at which the receivers receive. I have opened a JIRA.


Yes, and actually there should be another JIRA on this.

This just erases the offsets from ZooKeeper. But auto.offset.reset has another meaning:

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest

I will stress it: WHEN THERE IS NO INITIAL OFFSET OR IT IS OUT OF RANGE,
not "hey! I'll just reset your position because you restarted the app".
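For completeness, here is roughly how the consumer properties can be set
explicitly instead of relying on the defaults, assuming the
spark-streaming-kafka module and its KafkaUtils.createStream overload that
takes a kafkaParams map (the ZooKeeper address, group id, and topic below are
placeholders):

import kafka.serializer.StringDecoder

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder connection settings.
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "zkhost:2181",
  "group.id"          -> "my-consumer-group",
  // Per the Kafka docs quoted above, this only applies when there is no
  // initial offset in ZooKeeper or the stored offset is out of range.
  "auto.offset.reset" -> "smallest"
)

val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)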




Re: spark streaming and the spark shell

Diana Carroll
In reply to this post by Tathagata Das
Thanks, Tathagata.  This and your other reply on awaitTermination are very helpful.

Diana




Re: spark streaming and the spark shell

Tian Zhang
In reply to this post by Evgeniy Shishkin
I am hitting the same issue: after running for some time, if the spark streaming job loses or times out
the Kafka connection, it just starts to return empty RDDs.
Is there a timeline for when this issue will be fixed, so that I can plan accordingly?

Thanks.

Tian