spark-user mailing list archives

From Aaron Kimball <akimbal...@gmail.com>
Subject Re: error in streaming word count API?
Date Mon, 03 Mar 2014 07:16:45 GMT
Filed SPARK-1173 and sent a pull request. As an aside, I think this should
probably have been in the STREAMING project on the JIRA, but JIRA seemed
adamant that it only allow me to create new issues in the SPARK project.
Not sure if that's a JIRA permissions thing, or me losing a fight with
Atlassian UX ;) Please let me know if I should do something different next
time.

Cheers,
- Aaron


On Sun, Mar 2, 2014 at 10:28 PM, Aaron Kimball <akimball83@gmail.com> wrote:

> Running `nc -lk 1234` in one terminal and `nc localhost 1234` in
> another demonstrates line-buffered behavior. It's a mystery!
>
> Thanks for the link on implicit conversions. The example makes sense.
> It makes the code easier to trace, too. I'll send a JIRA + pull req to touch
> up the docs.
>
> cheers,
> - Aaron
>
>
> On Sun, Mar 2, 2014 at 4:59 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>
>> Hi Aaron,
>>
>> On Feb 28, 2014, at 8:46 PM, Aaron Kimball <akimball83@gmail.com> wrote:
>>
>> > Hi folks,
>> >
>> > I was trying to work through the streaming word count example at
>> > http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and
>> > couldn't get the code as written to run. In fairness, I was trying to
>> > do this inside the REPL rather than compiling a separate project; would the
>> > types be different?
>> >
>> > In any case, here's the code I ran:
>> >
>> > $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>> >
>> > scala> import org.apache.spark.streaming._
>> > scala> val ssc = new StreamingContext(sc, Seconds(2))
>> > scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
>> > scala> val words = lines.flatMap(_.split(" "))
>> >
>> > // *** The following code from the html page doesn't work
>> > // because pairs has type DStream[(String, Int)] and
>> > // there is no reduceByKey method on this type.
>>
>> This seems to be an oversight in the docs. You need to import
>> org.apache.spark.streaming.StreamingContext._ in order to get the pair
>> functions on DStreams of pairs (through a Scala implicit conversion).
>> reduceByKey is actually a function on something called
>> PairDStreamFunctions, and the implicit conversion above provides it for you
>> only if your DStream has key-value pairs. See
>> http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html for how
>> this works.
>>
>>
>> > // Count each word in each batch
>> > scala> val pairs = words.map(word => (word, 1))
>> > scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here: no reduceByKey()
>> >
>> > // Print a few of the counts to the console
>> > scala> wordCount.print()   // ... and even if the above did work,
>> > // 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
>> > // as written.
>>
>> Also looks like a bug in the docs.
>>
>> >
>> > Instead, I got the following to run:
>> > scala> val wordCounts = words.countByValue()
>> > scala> wordCounts.print()
>> > scala> ssc.start()             // Start the computation
>> > scala> ssc.awaitTermination()
>> >
>> > This worked if I ran 'nc -lk 1234' in another terminal and typed some
>> > words into it, but the 'wordCounts.print()' statement would only emit
>> > things to stdout if I sent a ^D into the netcat stream. It seems to print
>> > the output for all 2-second windows all at once after the ^D in the network
>> > stream. Is this expected? I don't understand the semantics of
>> > ssc.start / awaitTermination well enough to know how they interact with the
>> > print statement on wordCounts (which I think is a DStream of RDDs?)
>>
>> It might also be that netcat didn't flush the stream right away as you
>> typed input. Not 100% sure about that, though. You could try listening to it
>> with netcat on a different port and see whether it does.
>>
>> >
>> > I set spark.cleaner.ttl to a relatively high value (I'm not sure what
>> > units those are: seconds or millis) because a lower value flooded stderr
>> > and made my terminal unreadable. Is that part of my issue?
>> > The Spark REPL said I had to set it, so I just picked a number.
>>
>> This shouldn't matter for this problem.
>>
>> > 5) Now that Spark is a TLP, are my references to the
>> > incubator-spark.git and the http://spark.incubator.apache.org docs
>> > woefully out of date, making this entire exercise a goof? :)
>>
>> If you find these, definitely feel free to fix them, though I believe
>> some recent pull requests fixed a few of them.
>>
>> Anyway, thanks for reporting this stuff!
>>
>> Matei
>>
>>
>
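With the missing import Matei points out and the 'wordCount'/'wordCounts' mismatch resolved, the guide's example might look like the following. This is a sketch, not the guide's official text: it is written as a standalone application rather than a REPL session, assumes the Spark 0.9-era streaming API used in this thread (spark-core and spark-streaming on the classpath), and reads from netcat on port 1234 as above. The object name `NetworkWordCount`, the app name, and the `local[2]` master are illustrative assumptions; a local master needs at least two threads so the socket receiver does not starve batch processing.

```scala
// Sketch only: assumes the Spark 0.9-era streaming API discussed in this
// thread, with spark-core and spark-streaming on the classpath.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// The import missing from the docs: its implicit conversion adds pair
// operations such as reduceByKey to DStreams of (key, value) tuples.
import org.apache.spark.streaming.StreamingContext._

// Hypothetical application name; any object with a main method works.
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // "local[2]" (assumed): one thread for the socket receiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Same source as in the thread: text lines from netcat on port 1234.
    val lines = ssc.socketTextStream("127.0.0.1", 1234)
    val words = lines.flatMap(_.split(" "))

    // Count each word in each 2-second batch.
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print a few of the counts for each batch (wordCounts, not wordCount).
    wordCounts.print()

    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block until the context is stopped
  }
}
```

To try it, run `nc -lk 1234` in another terminal and type some words; per-batch counts should appear on stdout once netcat actually delivers the lines.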

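Matei's explanation of why `reduceByKey` was missing can be reproduced without Spark. The sketch below uses hypothetical toy types (`MiniDStream`, `MiniPairFunctions`, `MiniStreamingContext`, `WordCountDemo`), which are not Spark classes, to show the same pattern: pair-only methods live in a separate wrapper class, and an implicit conversion defined in a separate object supplies them only for streams of tuples, and only once that object is imported. In the Spark version discussed in this thread, the corresponding import is `org.apache.spark.streaming.StreamingContext._` and the wrapper is `PairDStreamFunctions`.

```scala
import scala.language.implicitConversions

// Hypothetical toy "stream": just one in-memory batch of elements (not a Spark class).
final case class MiniDStream[T](batch: Seq[T]) {
  def flatMap[U](f: T => Iterable[U]): MiniDStream[U] = MiniDStream(batch.flatMap(f))
  def map[U](f: T => U): MiniDStream[U] = MiniDStream(batch.map(f))
}

// Pair-only operations live in a separate wrapper, so they exist
// only for streams whose elements are (key, value) tuples.
final class MiniPairFunctions[K, V](self: MiniDStream[(K, V)]) {
  def reduceByKey(f: (V, V) => V): MiniDStream[(K, V)] =
    MiniDStream(
      self.batch
        .groupBy(_._1)
        .map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
        .toSeq
    )
}

// The implicit conversion sits in an object that must be imported explicitly.
object MiniStreamingContext {
  implicit def toPairFunctions[K, V](s: MiniDStream[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(s)
}

object WordCountDemo {
  // Remove this import and `pairs.reduceByKey` no longer compiles,
  // which mirrors the error reported in the thread.
  import MiniStreamingContext._

  def wordCounts(lines: Seq[String]): Map[String, Int] = {
    val words = MiniDStream(lines).flatMap(line => line.split(" ").toSeq)
    val pairs = words.map(word => (word, 1))
    pairs.reduceByKey(_ + _).batch.toMap
  }

  def main(args: Array[String]): Unit =
    println(wordCounts(Seq("a b a", "b c")))
}
```

Keeping pair operations behind an import means the base stream type stays generic, while tuple-specific methods appear only where the element type makes them meaningful.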