Hi folks,

I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and couldn't get the code as written to run. In fairness, I was doing this inside the REPL rather than compiling a separate project; would the types be different there?

In any case, here's the code I ran:

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))

// *** The following code from the html page doesn't work
// because pairs has type DStream[(String, Int)] and
// there is no reduceByKey method on this type.

// Count each word in each batch
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()

// Print a few of the counts to the console
scala> wordCount.print()   // ... and even if the above did work, the guide uses 'wordCount' here but defines 'wordCounts' above; those are different symbols ;) so this couldn't compile as written.


I got the following to run instead:
scala> val wordCounts = words.countByValue()
scala> wordCounts.print()
scala> ssc.start()             // Start the computation
scala> ssc.awaitTermination()

This worked when I ran 'nc -lk 1234' in another terminal and typed some words into it... but the 'wordCounts.print()' statement would only emit anything to stdout after I sent a ^D into the netcat stream. It then printed the output for all of the 2-second windows at once. Is this an expected effect? I don't understand the semantics of ssc.start() / ssc.awaitTermination() well enough to know how they interact with the print statement on wordCounts (which I think is a DStream of RDDs?).

I set spark.cleaner.ttl to a relatively high value (I'm not sure whether the units are seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked a number.

I had expected wordCounts.print() to be constantly emitting (word, N) pairs to my Spark terminal as I typed into the netcat side of things.
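
In case the REPL itself is part of the problem (per my question above about compiling a separate project), here's the standalone equivalent I was planning to try. It's just a sketch of the same pipeline, modeled on the bundled examples; I haven't compiled it against today's master, so the constructor and config choices (SparkConf, local[2], etc.) are my assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReplWordCount {
  def main(args: Array[String]) {
    // Same cleaner.ttl value I passed via SPARK_JAVA_OPTS; still unsure of the units.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReplWordCount")
      .set("spark.cleaner.ttl", "300000")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Same pipeline as my REPL session: read lines from netcat, split them into
    // words, and count each word per 2-second batch.
    val lines = ssc.socketTextStream("127.0.0.1", 1234)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.countByValue()
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}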

I'm using Spark built from the GitHub source that I pulled earlier today.

I am using the following as my 'origin':
  Fetch URL: git://github.com/apache/incubator-spark.git

... and the most recent commit (master a.k.a. HEAD) is:
commit 4d880304867b55a4f2138617b30600b7fa013b14
Author: Bryn Keller <bryn.keller@intel.com>
Date:   Mon Feb 24 17:35:22 2014 -0800


In any case, I'm happy to help update the docs (or the code) if this is a bug. I realize this is getting long-winded, but I think my questions really boil down to:

1) Should there be a reduceByKey() method on DStream? The documentation at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html says so in the "Transformations" section, but the scaladoc at https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream doesn't list it, and DStream.scala doesn't define such a method either...

(And based on reading the source of NetworkWordCount.scala, I can't spot why reduceByKey *does* compile there but doesn't in the shell.)
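
For reference, the relevant part of NetworkWordCount.scala looks roughly like this (paraphrased from the copy in the examples directory, with the argument handling and storage-level details omitted, so don't treat it as a verbatim quote):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // my shell session used 'import org.apache.spark.streaming._' instead

val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)   // compiles here, but not in my REPL session
wordCounts.print()
ssc.start()
ssc.awaitTermination()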

2) Why do I have to wait for the stream to "terminate" with a ^D before seeing any stdout in the REPL from the wordCounts.print() statement? Doesn't this defeat the point of "streaming"?
2a) How does the print() statement interact with ssc.start() and ssc.awaitTermination()?

3) Is the cleaner TTL something that I, as a user, should be adjusting to change the behavior I observe? I.e., would adjusting it change how often prior window data is emitted to stdout? Or is it just a background property that happens to affect the spamminess of the stderr that is routed to the same console?

4) Should I update the documentation to match my example (i.e., no reduceByKey, but use words.countByValue() instead)?

5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)

Thanks for the help!

Cheers,
- Aaron