error in streaming word count API?


error in streaming word count API?

Aaron Kimball
Hi folks,

I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and couldn't get the code as-written to run. In fairness, I was trying to do this inside the REPL rather than compiling a separate project; would the types be different?

In any case, here's the code I ran:

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))

// *** The following code from the html page doesn't work
// because pairs has type DStream[(String, Int)] and
// there is no reduceByKey method on this type.

// Count each word in each batch
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()

// Print a few of the counts to the console
scala> wordCount.print()   // ... and even if the above did work, 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile as written.


Instead, I got the following to run:
scala> val wordCounts = words.countByValue()
scala> wordCounts.print()
scala> ssc.start()             // Start the computation
scala> ssc.awaitTermination()

This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it, but the 'wordCounts.print()' statement would only emit things to stdout after I sent a ^D into the netcat stream. It seems to print the output for all 2-second windows all at once after the ^D in the network stream. Is this an expected effect? I don't understand the semantics of ssc.start / awaitTermination well enough to know how they interact with the print statement on wordCounts (which I think is a DStream of RDDs?)

I set spark.cleaner.ttl to a relatively high value (I'm not sure whether the units are seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked a number.

I kind of expected wordCounts.print() to be constantly emitting (word, N) pairs to my spark terminal as I typed into the netcat side of things.

I'm using Spark built from GitHub source that I pulled earlier today.

I am using the following as my 'origin':

... and the most recent commit (master a.k.a. HEAD) is:
commit 4d880304867b55a4f2138617b30600b7fa013b14
Author: Bryn Keller <[hidden email]>
Date:   Mon Feb 24 17:35:22 2014 -0800


I'm happy to help update the docs (or the code) if this is a bug. I realize this is getting long-winded, but I think my questions really boil down to:

1) Should there be a reduceByKey() method on DStream? The documentation at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html says so in the "Transformations" section, but the scaladoc at https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream doesn't list it. DStream.scala also doesn't have a definition for such a method...

(And based on reading the source of NetworkWordCount.scala, I can't immediately see why reduceByKey *does* compile there but doesn't in the shell.)

2) Why do I have to wait for the stream to "terminate" with a ^D before seeing any stdout in the REPL from the wordCounts.print() statement? Doesn't this defeat the point of "streaming"?
2a) How does the print() statement interact with ssc.start() and ssc.awaitTermination()?

3) Is the cleaner TTL something that I, as a user, should be adjusting to change the behavior I observe? I.e., would adjusting it change how often prior window data is emitted to stdout? Or is it just a background property that happens to affect the spamminess of the stderr that is routed to the same console?

4) Should I update the documentation to match my example (i.e., no reduceByKey, but use words.countByValue() instead)?

5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)

Thanks for the help!

Cheers,
- Aaron


Re: error in streaming word count API?

Aaron Kimball
As a post-script, when running the example in precompiled form:


./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

... I don't need to send a ^D to the netcat stream. It does print the batches to stdout in the manner I'd expect. So is this more REPL weirdness than Spark weirdness?

- Aaron




Re: error in streaming word count API?

Matei Zaharia
In reply to this post by Aaron Kimball
Hi Aaron,

On Feb 28, 2014, at 8:46 PM, Aaron Kimball <[hidden email]> wrote:

> Hi folks,
>
> I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and couldn't get the code as-written to run. In fairness, I was trying to do this inside the REPL rather than compiling a separate project; would the types be different?
>
> In any case, here's the code I ran:
>
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
>
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.

This seems to be an oversight in the docs. You need to import org.apache.spark.streaming.StreamingContext._ in order to get the pair functions on DStreams of pairs (through a Scala implicit conversion). reduceByKey is actually a function on something called PairDStreamFunctions, and the implicit conversion above provides it for you only if your DStream has key-value pairs. See http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html for how this works.
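
For reference, here's a minimal sketch of what the shell session would look like with that import added (untested off the top of my head, reusing the host and port from your example):

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._   // implicit conversion to PairDStreamFunctions
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))
scala> val pairs = words.map(word => (word, 1))               // DStream[(String, Int)]
scala> val wordCounts = pairs.reduceByKey(_ + _)              // resolves via the implicit conversion
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()

The only changes from the guide are the second import and the wordCounts/wordCount typo.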


> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()
>
> // Print a few of the counts to the console
> scala> wordCount.print()   // ... and even if the above did work, 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile as written.

Also looks like a bug in the docs.

>
> Instead, I got the following to run instead:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
>
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it.. but the 'wordCounts.print()' statement would only emit things to stdout if I sent a ^D into the netcat stream. It seems to print the output for all 2-second windows all-at-once after the ^D in the network stream. Is this an expected effect? I don't understand the semantics of ssc.start / awaitTermination well enough to know how it interacts with the print statement on wordCounts (which I think is a DStream of RRDs?)  

It might also be that netcat didn’t flush the stream right away when you type input. Not 100% sure about that though. You could try to listen to it using netcat on a different port and see if it does.

>
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units those are.. seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? the spark repl said I had to set it, so I just picked a number.

This shouldn’t matter for this problem.

> 5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)

If you find these, definitely feel free to fix them, though I believe some recent pull requests fixed a few of them.

Anyway, thanks for reporting this stuff!

Matei


Re: error in streaming word count API?

Aaron Kimball
Running `nc -lk 1234` in one terminal and `nc localhost 1234` in another demonstrates line-buffered behavior, so netcat itself seems to flush each line as it's typed. It's a mystery!

Thanks for the link on implicit conversions. The example makes sense, and it makes the code easier to trace too. I'll send a JIRA + pull req to touch up the docs.

cheers,
- Aaron




Re: error in streaming word count API?

Aaron Kimball
Filed SPARK-1173 and sent a pull request. As an aside, I think this should probably have gone in the STREAMING project on the JIRA, but JIRA seemed adamant that it would only let me create new issues in the SPARK project. Not sure if that's a JIRA permissions thing, or me losing a fight with Atlassian UX ;) Please let me know if I should do something different next time.

Cheers,
- Aaron

