In any case, here's the code I ran:
$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))
// *** The following code from the html page doesn't work
// because pairs has type DStream[(String, Int)] and
// there is no reduceByKey method on this type.
// Count each word in each batch
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _) // <-- error here. no reduceByKey()
// Print a few of the counts to the console
scala> wordCount.print() // ... and even if the above did work, 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile as written.
Instead, I got the following to run:
scala> val wordCounts = words.countByValue()
scala> ssc.start() // Start the computation
This worked when I ran 'nc -lk 1234' in another terminal and typed some words into it, but the 'wordCounts.print()' statement would only emit anything to stdout once I sent a ^D into the netcat stream. It seems to print the output for all of the 2-second windows at once after the ^D in the network stream. Is this the expected behavior? I don't understand the semantics of ssc.start / awaitTermination well enough to know how they interact with the print statement on wordCounts (which I think is a DStream of RDDs?).
I set spark.cleaner.ttl to a relatively high value (I'm not sure what the units are: seconds or millis) because a lower value caused stderr to spam everywhere and made my terminal unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked a number.
I kind of expected wordCounts.print() to be constantly emitting (word, N) pairs to my spark terminal as I typed into the netcat side of things.
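For concreteness, here is a sketch of the sequence I would have expected to produce continuous output, written as a standalone program rather than a shell session. Several things in it are assumptions on my part and are not in the session above: the object name SocketWordCountSketch is made up; the StreamingContext._ import is copied from NetworkWordCount.scala on the guess that it is what makes reduceByKey available; the local[2] master is a guess that the receiver and the batch processing each need their own thread; and calling print() before start() is my reading of how output operations get registered. I have not confirmed that any of these explains what I saw.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Assumption: this import is what brings reduceByKey into scope for
// DStream[(K, V)] via an implicit conversion (it appears in NetworkWordCount.scala).
import org.apache.spark.streaming.StreamingContext._

// Hypothetical object name, for illustration only.
object SocketWordCountSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: at least two local threads, one for the socket receiver
    // and one for processing the batches.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SocketWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Same pipeline as in the shell session above.
    val lines = ssc.socketTextStream("127.0.0.1", 1234)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Register the output operation before starting the context.
    wordCounts.print()

    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block the main thread until the context stops
  }
}
```

To try it, run 'nc -lk 1234' in another terminal first, as above. The cleaner TTL setting from my shell session is left out of the sketch.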
I'm using Spark built from the GitHub source, which I pulled earlier today.
I am using the following as my 'origin':
... and the most recent commit (master a.k.a. HEAD) is:
Date: Mon Feb 24 17:35:22 2014 -0800
In any case, I'm happy to help update the docs (or the code) if this is a bug. I realize this is getting long-winded, so my questions really boil down to:
(and based on reading the source of NetworkWordCount.scala, I can't spot-identify why this *does* work there (i.e., reduceByKey compiles) but it doesn't do so in the terminal)
2) Why do I have to wait for the stream to "terminate" with a ^D before seeing any stdout in the repl from the wordCounts.print() statement? Doesn't this defeat the point of "streaming"?
2a) How does the print() statement interact with ssc.start() and ssc.awaitTermination()?
3) Is the cleaner TTL something that, as a user, I should be adjusting to change my observed effects? I.e., would adjusting it change how often prior window data is emitted to stdout? Or is it just a background property that happens to affect the spamminess of my stderr, which is routed to the same console?
4) Should I update the documentation to match my example (i.e., no reduceByKey, but use words.countByValue() instead)?
5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)
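On question 4, the reason I think the substitution is reasonable is that both forms should give the same per-batch counts. Below is a plain-Scala sketch of that reasoning, with no Spark involved. BatchCountSketch, its helper names, and the sample sentence are all made up for illustration, and the collection operations only stand in for the DStream ones. One difference I'd expect to carry over is that countByValue() produces Long counts where the map/reduceByKey version produces Int.

```scala
// Hypothetical names throughout; ordinary Scala collections stand in for one batch.
object BatchCountSketch {
  // Stand-in for the words received in a single 2-second batch.
  val batch: Seq[String] = "to be or not to be".split(" ").toSeq

  // Analogue of words.map(word => (word, 1)).reduceByKey(_ + _)
  def viaPairs(words: Seq[String]): Map[String, Int] =
    words
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // Analogue of words.countByValue(); counts are Long rather than Int.
  def viaCountByValue(words: Seq[String]): Map[String, Long] =
    words
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size.toLong) }

  def main(args: Array[String]): Unit = {
    println(viaPairs(batch))
    println(viaCountByValue(batch))
  }
}
```

If that holds for DStreams too, the docs example could use either form, as long as the printed counts being Long is acceptable.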
Thanks for the help!