spark-dev mailing list archives

From Eduardo Costa Alfaia <e.costaalf...@unibs.it>
Subject Re: Source code JavaNetworkWordcount
Date Wed, 05 Feb 2014 22:33:01 GMT
So I could use reduceByKeyAndWindow like this?

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
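A rough way to picture what that call computes: with the 1-second batch interval used in NetworkWordCount, `reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))` produces, every 10 seconds, the per-key sum over the last 30 seconds of batches. The sketch below is a plain-Java model of that arithmetic for a single key, not Spark code. The class and method names are hypothetical, and the exact alignment of window boundaries in Spark may differ from this simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Spark code) of a sliding-window reduction for one key.
// Assumption: batch interval = 1 s, so window and slide are given in batches.
final class SlidingWindowModel {

    // Returns one windowed sum per slide: at each batch index t where
    // (t + 1) is a multiple of slideBatches, sum the values of the last
    // windowBatches batches (fewer at the start of the stream).
    static List<Long> windowedSums(long[] perBatch, int windowBatches, int slideBatches) {
        List<Long> results = new ArrayList<>();
        for (int t = 0; t < perBatch.length; t++) {
            if ((t + 1) % slideBatches != 0) {
                continue; // no output between slides
            }
            long sum = 0;
            int start = Math.max(0, t - windowBatches + 1);
            for (int i = start; i <= t; i++) {
                sum += perBatch[i];
            }
            results.add(sum);
        }
        return results;
    }

    public static void main(String[] args) {
        long[] counts = new long[40];
        java.util.Arrays.fill(counts, 1L); // the key appears once per batch
        // 30 s window, 10 s slide, 1 s batches
        System.out.println(windowedSums(counts, 30, 10)); // [10, 20, 30, 30]
    }
}
```

Until the stream has run for a full window, the window is only partly filled, which is why the first two sums are smaller.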


> The reduceByKeyAndWindow and other *ByKey operations work only on
> DStreams of key-value pairs. "words" is a DStream[String], so it does not
> contain key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)],
> which does contain key-value pairs, so you can call reduceByKeyAndWindow on it.
> 
> TD
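To illustrate TD's point outside Spark: the `map(x => (x, 1))` step is what turns a stream of plain words into key-value pairs, and only then does a by-key reduction have keys to group on. This plain-Java sketch models a single batch; the class and method names are hypothetical and it does not use any Spark API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Spark code) of words.map(x => (x, 1)).reduceByKey(_ + _)
// applied to one batch of words.
final class PairCountModel {

    // Step 1: turn each word into a (word, 1) pair, like words.map(x => (x, 1)).
    static List<Map.Entry<String, Integer>> toPairs(List<String> words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : words) {
            pairs.add(new SimpleEntry<>(w, 1));
        }
        return pairs;
    }

    // Step 2: combine values that share a key with "+", like reduceByKey(_ + _).
    // This step is only possible because the elements are now key-value pairs.
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(reduceByKey(toPairs(words))); // {be=2, not=1, or=1, to=2}
    }
}
```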
> 
> 
> On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> 
>> Hi Tathagata,
>> I am playing with NetworkWordCount.scala and have made some changes like
>> these (in red):
>> 
>> // Create the context with a 1 second batch size
>> 67     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>> 68       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>> 69     ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
>> 70     // Create a socket text stream on target ip:port and count the
>> 71     // words in the input stream of \n delimited text (eg. generated by 'nc')
>> 72     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>> 73     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>> 74     val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>> 75     val union2 = lines1.union(lines2)
>> 76     val union3 = union2.union(lines3)
>> 77
>> 78     //val words = lines.flatMap(_.split(" "))
>> 79     val words = union3.flatMap(_.split(" "))
>> 80 //    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>> 81     val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>> 
>> However, I got the error below:
>> 
>> [error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
>> [error]         val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>> [error]                                ^
>> [error] one error found
>> [error] (examples/compile:compile) Compilation failed
>> [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
>> 
>> 
>> The required classes are imported in the code:
>> 
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.StreamingContext._
>> import org.apache.spark.storage.StorageLevel
>> 
>> 
>> Thanks
>> 
>> On Feb 5, 2014, at 5:22, Tathagata Das <tathagata.das1565@gmail.com> wrote:
>> 
>>> Seems good to me. By the way, it's fine to use MEMORY_ONLY (i.e., without
>>> replication) for testing, but you should turn on replication if you want
>>> fault tolerance.
>>> 
>>> TD
>>> 
>>> 
>>> On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
>>> 
>>>> Hi Tathagata,
>>>>
>>>> You were right when you told me to use Scala instead of Java; Scala is
>>>> very easy. I have implemented the code you gave me (in bold), and I have
>>>> also added a union (in red) because I am testing with 2 stream sources.
>>>> My idea is to use 3 or more stream sources and take their union.
>>>> 
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> import org.apache.spark.streaming.StreamingContext._
>>>> import org.apache.spark.storage.StorageLevel
>>>>
>>>> object NetworkWordCount {
>>>>   def main(args: Array[String]) {
>>>>     if (args.length < 1) {
>>>>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>>>>         "In local mode, <master> should be 'local[n]' with n > 1")
>>>>       System.exit(1)
>>>>     }
>>>>
>>>>     StreamingExamples.setStreamingLogLevels()
>>>>
>>>>     // Create the context with a 1 second batch size
>>>>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>>>>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>>>>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>>>>
>>>>     // Create a socket text stream on target ip:port and count the
>>>>     // words in the input stream of \n delimited text (eg. generated by 'nc')
>>>>     // Two socket sources, merged into a single stream with union
>>>>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>>>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>>>     val union2 = lines1.union(lines2)
>>>>     //val words = lines.flatMap(_.split(" "))
>>>>     val words = union2.flatMap(_.split(" "))
>>>>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>>>>
>>>>     // Print the total number of words received in each batch
>>>>     words.count().foreachRDD(rdd => {
>>>>       val totalCount = rdd.first()
>>>>
>>>>       // print to screen
>>>>       println(totalCount)
>>>>
>>>>       // append count to file
>>>>       //  ...
>>>>     })
>>>>     //wordCounts.print()
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>> }
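Since the plan is to add three or more sources, the union can be folded over a list of sources instead of chaining `union2`, `union3`, and so on by hand (in the Scala API, `StreamingContext.union` accepts a sequence of DStreams for this purpose). The plain-Java model below, with hypothetical names and no Spark dependency, treats one batch from each source as a list of lines and checks that counting words after the union gives the same total as adding up the per-source counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Spark code): one batch from each of several sources,
// merged the way DStream.union merges per-batch data.
final class UnionModel {

    // Fold any number of sources into one batch, like lines1.union(lines2).union(lines3)...
    static List<String> unionAll(List<List<String>> sources) {
        List<String> merged = new ArrayList<>();
        for (List<String> batch : sources) {
            merged.addAll(batch);
        }
        return merged;
    }

    // Count the space-separated words in a batch of lines,
    // like flatMap(_.split(" ")).count() on one batch.
    static long countWords(List<String> lines) {
        long n = 0;
        for (String line : lines) {
            n += line.split(" ").length;
        }
        return n;
    }

    public static void main(String[] args) {
        List<String> s1 = Arrays.asList("in a village", "of la mancha");
        List<String> s2 = Arrays.asList("whose name");
        List<String> s3 = Arrays.asList("i do not care to remember");
        List<List<String>> sources = Arrays.asList(s1, s2, s3);
        long merged = countWords(unionAll(sources));
        long separate = countWords(s1) + countWords(s2) + countWords(s3);
        System.out.println(merged + " " + separate); // 14 14
    }
}
```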
>>>> 
>>>> What do you think? Is my code right?
>>>> 
>>>> I obtained the following result:
>>>> 
>>>> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>>>> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 0
>>>> 90715
>>>> 1375825
>>>> 882490
>>>> 941226
>>>> 811032
>>>> 734399
>>>> 804453
>>>> 718688
>>>> 1058695
>>>> 854417
>>>> 813263
>>>> 798885
>>>> 785455
>>>> 952804
>>>> 780140
>>>> 697533
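Each number above is the word count of one 1-second batch, which is what `words.count()` yields per batch; the leading zeros presumably correspond to batches before the sender started producing data. To get a total for the whole file, the per-batch counts have to be accumulated across batches, for example in a driver-side variable updated inside `foreachRDD` (whose function runs on the driver), or with a stateful operation such as `updateStateByKey`. The plain-Java sketch below models only the driver-side accumulation. The names are hypothetical, and the input counts are illustrative, not taken from the run above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Spark code) of keeping a running total in the driver,
// e.g. inside words.count().foreachRDD(rdd => ...), which is called once per batch.
final class RunningTotalModel {

    private long total = 0; // survives across batches because it lives in the driver

    // Called once per batch with that batch's count (rdd.first() in the Scala snippet).
    long addBatch(long batchCount) {
        total += batchCount;
        return total;
    }

    // Convenience: running totals for a whole sequence of per-batch counts.
    static List<Long> runningTotals(long[] perBatchCounts) {
        RunningTotalModel model = new RunningTotalModel();
        List<Long> out = new ArrayList<>();
        for (long c : perBatchCounts) {
            out.add(model.addBatch(c));
        }
        return out;
    }

    public static void main(String[] args) {
        // Illustrative counts only: two idle batches, then three batches with data.
        System.out.println(runningTotals(new long[]{0, 0, 5, 7, 3})); // [0, 0, 5, 12, 15]
    }
}
```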
>>>> 
>>>> 
>>>> Thanks, Tathagata.
>>>>
>>>> Regards,
>>>> 
>>>> 
>>>> 2014-01-30 Eduardo Costa Alfaia <e.costaalfaia@unibs.it>:
>>>> 
>>>>> Hi Tathagata,
>>>>>
>>>>> Thank you for your explanations; they will help me understand how this
>>>>> piece of code works so that we can do what we want. We have written a
>>>>> program in C that sends a text file (for example, Don Quixote) as a
>>>>> stream over the network, so we changed the Java code from
>>>>> JavaNetworkWordcount to connect to each source defined in the code.
>>>>> Below is what we inserted: three stream sources.
>>>>> 
>>>>>     JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>>     JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>>     JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>>     JavaDStream<String> union2 = lines1.union(lines2);
>>>>>     JavaDStream<String> union3 = union2.union(lines3);
>>>>>     JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
>>>>> 
>>>>> So I think the second option you gave me is the better one.
>>>>> Sorry for my insistence on this, Tathagata, and thank you for your
>>>>> patience.
>>>>> 
>>>>> Best Regards
>>>>> 
>>>>> 
>>>>> 2014-01-30 Tathagata Das <tathagata.das1565@gmail.com>
>>>>> 
>>>>>> Let me first ask for a few clarifications.
>>>>>>
>>>>>> 1. If you just want to count the words in a single text file like Don
>>>>>> Quixote (that is, not a stream of data), you should use only Spark,
>>>>>> not Spark Streaming. If you are not super-comfortable with Java, then I
>>>>>> strongly recommend using the Scala API or PySpark. Scala may be a little
>>>>>> trickier to learn if you have absolutely no experience with it, but it
>>>>>> is worth it. In Scala, the program to count the frequency of words in a
>>>>>> text file would look like this.
>>>>>> 
>>>>>> import org.apache.spark.SparkContext
>>>>>> import org.apache.spark.SparkContext._  // brings reduceByKey into scope for pair RDDs
>>>>>>
>>>>>> val sc = new SparkContext(...)
>>>>>> val linesInFile = sc.textFile("path_to_file")
>>>>>> val words = linesInFile.flatMap(line => line.split(" "))
>>>>>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
>>>>>> // collect is costly if the file is large
>>>>>> println("Word frequencies = " + frequencies.collect().mkString(", "))
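For a file small enough to fit on one machine, the same pipeline (split lines into words, pair each word with 1, sum per word) can be written with plain Java streams, which may help when comparing the Scala version with its Java equivalent. This is a stand-alone sketch, not Spark code; `WordFrequency` and the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java (no Spark) version of
//   linesInFile.flatMap(_.split(" ")).map(w => (w, 1L)).reduceByKey(_ + _)
final class WordFrequency {

    static Map<String, Long> frequencies(List<String> lines) {
        return lines.stream()
                // like flatMap(line => line.split(" "))
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // like map(word => (word, 1L)).reduceByKey(_ + _): count per word
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // For a real file: java.nio.file.Files.readAllLines(java.nio.file.Paths.get("path_to_file"))
        List<String> lines = Arrays.asList("the knight", "the squire and the knight");
        System.out.println(frequencies(lines)); // {and=1, knight=2, squire=1, the=3}
    }
}
```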
>>>>>> 
>>>>>> 
>>>>>> 2. Let me assume that you want to read a stream of text over the
>>>>>> network and then write the total number of words to a file. Note that
>>>>>> it is the "total number of words" and not the "frequency of each word".
>>>>>> The Java version would be something like this.
>>>>>> 
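>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>> import org.apache.spark.api.java.function.Function2;
>>>>>> import org.apache.spark.streaming.Time;
>>>>>> import org.apache.spark.streaming.api.java.JavaDStream;
>>>>>>
>>>>>> // count() yields one Long per batch: the number of words in that batch
>>>>>> JavaDStream<Long> totalCounts = words.count();
>>>>>>
>>>>>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
>>>>>>   @Override
>>>>>>   public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
>>>>>>     Long totalCount = rdd.first();  // the single count for this batch
>>>>>>
>>>>>>     // print to screen
>>>>>>     System.out.println(totalCount);
>>>>>>
>>>>>>     // append count to file
>>>>>>     // ...
>>>>>>     return null;
>>>>>>   }
>>>>>> });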
>>>>>> 
>>>>>> This counts how many words have been received in each batch. The Scala
>>>>>> version is much simpler to read.
>>>>>> 
>>>>>> words.count().foreachRDD(rdd => {
>>>>>>   val totalCount = rdd.first()
>>>>>> 
>>>>>>   // print to screen
>>>>>>   println(totalCount)
>>>>>> 
>>>>>>   // append count to file
>>>>>>   // ...
>>>>>> })
>>>>>> 
>>>>>> Hope this helps! I apologize if the code doesn't compile; I didn't test
>>>>>> it for syntax and such.
>>>>>> 
>>>>>> TD
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
>>>>>> 
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> I'm not a very good Java programmer, so could anybody help me with
>>>>>>> this piece of code from JavaNetworkWordcount:
>>>>>>> 
>>>>>>> // Per-batch word counts: (word, 1) pairs summed by key
>>>>>>> JavaPairDStream<String, Integer> wordCounts = words.map(
>>>>>>>   new PairFunction<String, String, Integer>() {
>>>>>>>     @Override
>>>>>>>     public Tuple2<String, Integer> call(String s) throws Exception {
>>>>>>>       return new Tuple2<String, Integer>(s, 1);
>>>>>>>     }
>>>>>>>   }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>>>>>     @Override
>>>>>>>     public Integer call(Integer i1, Integer i2) throws Exception {
>>>>>>>       return i1 + i2;
>>>>>>>     }
>>>>>>>   });
>>>>>>>
>>>>>>> // Counts over a sliding window
>>>>>>> JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
>>>>>>>   // reduce function: add counts entering the window
>>>>>>>   new Function2<Integer, Integer, Integer>() {
>>>>>>>     public Integer call(Integer i1, Integer i2) { return i1 + i2; }
>>>>>>>   },
>>>>>>>   // inverse reduce function: subtract counts leaving the window
>>>>>>>   new Function2<Integer, Integer, Integer>() {
>>>>>>>     public Integer call(Integer i1, Integer i2) { return i1 - i2; }
>>>>>>>   },
>>>>>>>   new Duration(60 * 5 * 1000),  // window length: 5 minutes
>>>>>>>   new Duration(1 * 1000)        // slide interval: 1 second
>>>>>>> );
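The second `Function2` (the subtraction) is an inverse reduce function. With a 5-minute window sliding every second, Spark can update the previous window's counts by adding the batch that enters the window and subtracting the batch that leaves it, rather than re-adding all 300 one-second batches each second; this form of `reduceByKeyAndWindow` requires checkpointing to be enabled. The plain-Java model below (hypothetical names, a single key, window measured in batches, slide of one batch) checks that the incremental update matches recomputing each window from scratch.

```java
// Plain-Java model (not Spark code) of a windowed sum maintained with an
// inverse function, for one key, sliding by one batch at a time.
final class InverseWindowModel {

    // Recompute each window from scratch: O(window) work per batch.
    static long[] naive(long[] perBatch, int window) {
        long[] out = new long[perBatch.length];
        for (int t = 0; t < perBatch.length; t++) {
            long sum = 0;
            for (int i = Math.max(0, t - window + 1); i <= t; i++) {
                sum += perBatch[i];
            }
            out[t] = sum;
        }
        return out;
    }

    // Incremental update: previous window + entering batch - leaving batch.
    // "+" plays the role of the reduce function (i1 + i2),
    // "-" plays the role of the inverse reduce function (i1 - i2).
    static long[] incremental(long[] perBatch, int window) {
        long[] out = new long[perBatch.length];
        long current = 0;
        for (int t = 0; t < perBatch.length; t++) {
            current += perBatch[t];              // batch entering the window
            if (t - window >= 0) {
                current -= perBatch[t - window]; // batch leaving the window
            }
            out[t] = current;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] counts = {3, 1, 4, 1, 5, 9, 2, 6};
        System.out.println(java.util.Arrays.toString(incremental(counts, 3))); // [3, 4, 8, 6, 10, 15, 16, 17]
        System.out.println(java.util.Arrays.equals(naive(counts, 3), incremental(counts, 3))); // true
    }
}
```

The incremental version does constant work per batch regardless of window length, which is the motivation for supplying the inverse function.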
>>>>>>> 
>>>>>>> I would like to find a way of counting the words and then summing the
>>>>>>> counts to get the total number of words in a single file, for example
>>>>>>> a .txt copy of the book Don Quixote. The counts function gives me the
>>>>>>> count for each word found, not the total number of words in the file.
>>>>>>> Tathagata has sent me a piece of Scala code; thank you, Tathagata, for
>>>>>>> your attention to my posts, I am very thankful:
>>>>>>> 
>>>>>>> yourDStream.foreachRDD(rdd => {
>>>>>>>   // Get and print first n elements
>>>>>>>   val firstN = rdd.take(n)
>>>>>>>   println("First N elements = " + firstN.mkString(", "))
>>>>>>>
>>>>>>>   // Count the number of elements in each batch
>>>>>>>   println("RDD has " + rdd.count() + " elements")
>>>>>>> })
>>>>>>>
>>>>>>> yourDStream.count().print()
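On the difference between per-word counts and a single total: summing the values of the per-word counts gives the same number as counting the words directly, which is why `count()` on the word stream is the shorter route to a total. Plain-Java sketch with hypothetical names, not Spark code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Spark code): per-word counts vs. the total number of words.
final class TotalVsFrequency {

    // Per-word counts, like words.map(x => (x, 1)).reduceByKey(_ + _)
    static Map<String, Integer> perWord(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    // Total obtained by summing the per-word counts.
    static long totalFromCounts(Map<String, Integer> counts) {
        long total = 0;
        for (int c : counts.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("en", "un", "lugar", "de", "la", "mancha", "de");
        Map<String, Integer> counts = perWord(words);
        System.out.println(counts.size());           // 6 distinct words
        System.out.println(totalFromCounts(counts)); // 7, same as words.size()
    }
}
```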
>>>>>>> 
>>>>>>> Could anybody help me?
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks Guys
>>>>>>> 
>>>>>>> --
>>>>>>> 
>>>>>>> NOTICE ON THE PROCESSING OF PERSONAL DATA
>>>>>>>
>>>>>>> The data used to send this message are processed by the Università
>>>>>>> degli Studi di Brescia exclusively for institutional purposes. More
>>>>>>> detailed information, including on the rights of the data subject, is
>>>>>>> available in the general privacy notice and in the information published
>>>>>>> in the "Privacy" section of the University's website.
>>>>>>>
>>>>>>> The content of this message is intended solely for the person(s) to
>>>>>>> whom it is addressed and may contain information whose confidentiality
>>>>>>> is protected by law. Its reproduction, distribution, and use without the
>>>>>>> recipient's authorization are prohibited. If you received this message
>>>>>>> in error, please delete it.
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> MSc Eduardo Costa Alfaia
>>>> PhD Student
>>>> Università degli Studi di Brescia
>>>> 
>>>> 
>> 
>> 
>> 


