Filed SPARK-1173 and sent a pull request. As an aside, I think this should probably have been in the STREAMING project on the JIRA, but JIRA seemed adamant that it would only allow me to create new issues in the SPARK project. Not sure if that's a JIRA permissions thing, or me losing a fight with Atlassian UX ;) Please let me know if I should do something different next time.

- Aaron

On Sun, Mar 2, 2014 at 10:28 PM, Aaron Kimball wrote:
Running `nc -lk 1234` in one terminal and `nc localhost 1234` in another demonstrates line-buffered behavior. It's a mystery!

Thanks for the link on implicit conversions. The example makes sense. Makes the code easier to trace too. I'll send a JIRA + pull req to touch up the docs.

- Aaron

On Sun, Mar 2, 2014 at 4:59 PM, Matei Zaharia wrote:
Hi Aaron,

On Feb 28, 2014, at 8:46 PM, Aaron Kimball wrote:

> Hi folks,
> I was trying to work through the streaming word count example at and couldn't get the code as-written to run. In fairness, I was trying to do this inside the REPL rather than compiling a separate project; would the types be different?
> In any case, here's the code I ran:
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("", 1234)
> scala> val words = lines.flatMap(_.split(" "))
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.

This seems to be an oversight in the docs. You need to import org.apache.spark.streaming.StreamingContext._ in order to get the pair functions on DStreams of pairs (through a Scala implicit conversion). reduceByKey is actually a function on something called PairDStreamFunctions, and the implicit conversion above provides it for you only if your DStream has key-value pairs. See for how this works.
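
To make the mechanism concrete, here is a minimal sketch in plain Scala with no Spark dependency. MiniStream, PairMiniStreamFunctions, and MiniContext are hypothetical stand-ins for DStream, PairDStreamFunctions, and the StreamingContext companion object; they are not Spark's actual classes. The only point is that reduceByKey resolves once the implicit conversion has been imported:

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for a DStream: wraps a single batch of elements.
class MiniStream[T](val batch: Seq[T]) {
  def map[U](f: T => U): MiniStream[U] = new MiniStream(batch.map(f))
  def flatMap[U](f: T => Seq[U]): MiniStream[U] = new MiniStream(batch.flatMap(f))
}

// Hypothetical stand-in for PairDStreamFunctions: only makes sense
// for streams whose elements are key-value pairs.
class PairMiniStreamFunctions[K, V](self: MiniStream[(K, V)]) {
  def reduceByKey(f: (V, V) => V): MiniStream[(K, V)] =
    new MiniStream(
      self.batch
        .groupBy(_._1)
        .map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
        .toSeq
    )
}

// Hypothetical stand-in for the StreamingContext companion object,
// which is where the implicit conversion lives.
object MiniContext {
  implicit def toPairFunctions[K, V](s: MiniStream[(K, V)]): PairMiniStreamFunctions[K, V] =
    new PairMiniStreamFunctions(s)
}

object ImplicitDemo {
  // Without this import, pairs.reduceByKey below does not compile,
  // because MiniStream itself has no reduceByKey method.
  import MiniContext._

  def wordCounts(lines: Seq[String]): Map[String, Int] = {
    val words = new MiniStream(lines).flatMap(_.split(" ").toSeq)
    val pairs = words.map(word => (word, 1))
    pairs.reduceByKey(_ + _).batch.toMap
  }
}
```

Removing the `import MiniContext._` line should reproduce the same kind of "no reduceByKey method" error described above.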

> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()
> // Print a few of the counts to the console
> scala> wordCount.print()   // ... and even if the above did work, 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile as written.

Also looks like a bug in the docs.
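
Putting both fixes together, a corrected version of the snippet might look like the following when pasted into spark-shell. This is only a sketch, assuming the 0.9-era API where the pair functions come from the StreamingContext._ import; the "localhost" host is an assumption on my part, so substitute whatever host your netcat is listening on:

```scala
// Paste into spark-shell, where `sc` is already defined.
import org.apache.spark.streaming._
// Brings the implicit conversion to PairDStreamFunctions into scope.
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(2))
// "localhost" is an assumed host; point this at wherever `nc -lk 1234` runs.
val lines = ssc.socketTextStream("localhost", 1234)
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print a few of the counts to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Block until the computation terminates
```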

> Instead, I got the following to run:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it, but the 'wordCounts.print()' statement would only emit things to stdout if I sent a ^D into the netcat stream. It seems to print the output for all 2-second windows all at once after the ^D in the network stream. Is this an expected effect? I don't understand the semantics of ssc.start / awaitTermination well enough to know how it interacts with the print statement on wordCounts (which I think is a DStream of RDDs?)

It might also be that netcat didn’t flush the stream right away when you type input. Not 100% sure about that though. You could try to listen to it using netcat on a different port and see if it does.

> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units those are: seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked a number.

This shouldn’t matter for this problem.

> 5) Now that Spark is a TLP, are my references to the incubator-spark.git and the docs woefully out of date, making this entire exercise a goof? :)

If you find these, definitely feel free to fix them, though I believe some recent pull requests fixed a few of them.

Anyway, thanks for reporting this stuff!