spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: error in streaming word count API?
Date Mon, 03 Mar 2014 00:59:54 GMT
Hi Aaron,

On Feb 28, 2014, at 8:46 PM, Aaron Kimball <akimball83@gmail.com> wrote:

> Hi folks,
> 
> I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> and couldn't get the code as-written to run. In fairness, I was trying to do this inside the
> REPL rather than compiling a separate project; would the types be different?
> 
> In any case, here's the code I ran:
> 
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
> 
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
> 
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.

This seems to be an oversight in the docs. You need to import org.apache.spark.streaming.StreamingContext._
in order to get the pair functions on DStreams of pairs (through a Scala implicit conversion).
reduceByKey is actually a function on something called PairDStreamFunctions, and the implicit
conversion above provides it for you only if your DStream has key-value pairs. See http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html
for how this works.
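
For example, this sequence should work in the shell (I haven't run this exact snippet, but it's
the guide's example with the missing import added):

scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._  // brings the pair functions in via the implicit conversion
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // resolves now that PairDStreamFunctions is in scope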


> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()
> 
> // Print a few of the counts to the console
> scala> wordCount.print()   // ... and even if the above did work, 'wordCount' and
> 'wordCounts' are different symbols ;) This couldn't compile as written.

Also looks like a bug in the docs.
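
For reference, the corrected lines would presumably be:

scala> val wordCounts = pairs.reduceByKey(_ + _)
scala> wordCounts.print()   // print on the same symbol that was defined above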

> 
> Instead, I got the following to run:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
> 
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it,
> but the 'wordCounts.print()' statement would only emit things to stdout if I sent a ^D into
> the netcat stream. It seems to print the output for all 2-second windows all-at-once after
> the ^D in the network stream. Is this an expected effect? I don't understand the semantics
> of ssc.start / awaitTermination well enough to know how it interacts with the print statement
> on wordCounts (which I think is a DStream of RDDs?)

It might also be that netcat didn't flush the stream right away when you typed input. Not
100% sure about that though. You could try listening with another netcat on a different port
and see whether what you type shows up there immediately.
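
For example (picking an arbitrary second port, say 1235, just for this test):

$ nc -lk 1235            # terminal 1: listen
$ nc 127.0.0.1 1235      # terminal 2: connect and type a few lines

If what you type in terminal 2 appears in terminal 1 as soon as you hit Enter, netcat is
flushing each line and the buffering is happening somewhere else.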

> 
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units those are...
> seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal
> unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked
> a number.

This shouldn’t matter for this problem.

> 5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org
> docs woefully out of date, making this entire exercise a goof? :)

If you find these, definitely feel free to fix them, though I believe some recent pull requests
fixed a few of them.

Anyway, thanks for reporting this stuff!

Matei

