spark-dev mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Source code JavaNetworkWordcount
Date Thu, 06 Feb 2014 00:19:46 GMT
Yes. You should be able to.

Let's try to have future conversations on the
user@spark.incubator.apache.org mailing list :)


On Wed, Feb 5, 2014 at 2:33 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:

> So could I use reduceByKeyAndWindow like this?
>
> val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>
>
> > The reduceByKeyAndWindow and other *ByKey operations work only on
> > DStreams of key-value pairs. "words" is a DStream[String], so it does not
> > contain key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)],
> > which does, so you can call reduceByKeyAndWindow on it.
> >
> > TD
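The distinction TD draws can be illustrated without a cluster. The sketch below is a plain-Scala model, not the Spark API: each inner `Seq` stands in for one batch of a `DStream[String]`, and the hypothetical helpers `reduceByKey` and `reduceByKeyAndWindow` only mimic what the real operators compute, assuming the window and slide durations are whole multiples of the batch interval (as `Seconds(30)` and `Seconds(10)` are of `Seconds(1)`). The step that makes windowed reduction possible is `map(w => (w, 1))`, which turns bare words into key-value pairs.

```scala
// Plain-Scala model of windowed word counting; it does NOT use Spark.
// Each inner Seq[String] stands in for the words received in one batch.
object WindowedWordCountModel {

  // Hypothetical stand-in for reduceByKey(_ + _): sum the values of each key.
  def reduceByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }

  // Hypothetical stand-in for reduceByKeyAndWindow(_ + _, window, slide).
  // windowBatches and slideBatches are the durations divided by the batch
  // interval, e.g. Seconds(30) / Seconds(1) = 30 and Seconds(10) / Seconds(1) = 10.
  def reduceByKeyAndWindow(batches: Seq[Seq[String]],
                           windowBatches: Int,
                           slideBatches: Int): Seq[Map[String, Int]] =
    (slideBatches to batches.length by slideBatches).map { stop =>
      // The batches inside the window that ends after batch number `stop`
      val window = batches.slice(math.max(0, stop - windowBatches), stop)
      // map(w => (w, 1)) is what turns plain words into key-value pairs
      reduceByKey(window.flatten.map(w => (w, 1)))
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a", "b"), Seq("a"), Seq("b", "b"), Seq("a"))
    // A window of 3 batches that slides every 2 batches
    reduceByKeyAndWindow(batches, windowBatches = 3, slideBatches = 2).foreach(println)
  }
}
```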
> >
> >
> > On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >
> >> Hi Tathagata,
> >> I am playing with NetworkWordCount.scala and made the following changes:
> >>
> >> // Create the context with a 1 second batch size
> >> val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
> >>   System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
> >> ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
> >> // Create a socket text stream on target ip:port and count the
> >> // words in the input stream of \n delimited text (eg. generated by 'nc')
> >> val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> val union2 = lines1.union(lines2)
> >> val union3 = union2.union(lines3)
> >>
> >> //val words = lines.flatMap(_.split(" "))
> >> val words = union3.flatMap(_.split(" "))
> >> //val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >> val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> >>
> >> However, I got the error below:
> >>
> >> [error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
> >> [error]         val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> >> [error]                                ^
> >> [error] one error found
> >> [error] (examples/compile:compile) Compilation failed
> >> [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
> >>
> >>
> >> The classes are imported in the code:
> >>
> >> import org.apache.spark.streaming.{Seconds, StreamingContext}
> >> import org.apache.spark.streaming.StreamingContext._
> >> import org.apache.spark.storage.StorageLevel
> >>
> >>
> >> Thanks
> >>
> >> On Feb 5, 2014, at 5:22, Tathagata Das <tathagata.das1565@gmail.com> wrote:
> >>
> >>> Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without
> >>> replication) for testing, but you should turn on replication if you want
> >>> fault tolerance.
> >>>
> >>> TD
> >>>
> >>>
> >>> On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >>>
> >>>> Hi Tathagata,
> >>>>
> >>>> You were right when you told me to use Scala instead of Java; Scala is
> >>>> very easy. I have implemented the code you gave me, and I have also added
> >>>> a union, because I am testing with 2 stream sources. My idea is to use 3
> >>>> or more stream sources and take their union.
> >>>>
> >>>> object NetworkWordCount {
> >>>>   def main(args: Array[String]) {
> >>>>     if (args.length < 1) {
> >>>>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
> >>>>         "In local mode, <master> should be 'local[n]' with n > 1")
> >>>>       System.exit(1)
> >>>>     }
> >>>>
> >>>>     StreamingExamples.setStreamingLogLevels()
> >>>>
> >>>>     // Create the context with a 1 second batch size
> >>>>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
> >>>>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
> >>>>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
> >>>>     // Create a socket text stream on target ip:port and count the
> >>>>     // words in the input stream of \n delimited text (eg. generated by 'nc')
> >>>>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
> >>>>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
> >>>>     val union2 = lines1.union(lines2)
> >>>>     //val words = lines.flatMap(_.split(" "))
> >>>>     val words = union2.flatMap(_.split(" "))
> >>>>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >>>>
> >>>>     words.count().foreachRDD(rdd => {
> >>>>       val totalCount = rdd.first()
> >>>>
> >>>>       // print to screen
> >>>>       println(totalCount)
> >>>>
> >>>>       // append count to file
> >>>>       // ...
> >>>>     })
> >>>>     //wordCounts.print()
> >>>>     ssc.start()
> >>>>     ssc.awaitTermination()
> >>>>   }
> >>>> }
> >>>>
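The union-then-count pipeline above can be modeled per batch in plain Scala (no Spark): `union` merges, for each batch interval, whatever every source delivered in that interval, and `words.count()` then produces one number per batch. The helper names `unionBatches` and `countPerBatch` are hypothetical, and the model assumes every source produces the same number of batches. An interval in which no source sent any data yields 0.

```scala
// Plain-Scala model (no Spark) of union() followed by count(), batch by batch.
// A stream is a Seq of batches; a batch is the Seq of lines received in one interval.
object UnionCountModel {

  // Hypothetical stand-in for lines1.union(lines2): concatenate the batches that
  // share a batch index. transpose throws if the streams have different numbers of batches.
  def unionBatches(streams: Seq[Seq[Seq[String]]]): Seq[Seq[String]] =
    streams.transpose.map(_.flatten)

  // Hypothetical stand-in for flatMap(_.split(" ")).count(): one word total per
  // batch, counting every token that split(" ") produces.
  def countPerBatch(batches: Seq[Seq[String]]): Seq[Long] =
    batches.map(lines => lines.flatMap(_.split(" ")).size.toLong)

  def main(args: Array[String]): Unit = {
    // Two sources; neither sends anything in the first interval.
    val source1 = Seq(Seq.empty[String], Seq("in a village", "of La Mancha"))
    val source2 = Seq(Seq.empty[String], Seq("the name of which"))
    countPerBatch(unionBatches(Seq(source1, source2))).foreach(println) // prints 0, then 10
  }
}
```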
> >>>> What do you think? Is my code right?
> >>>>
> >>>> I got the following result:
> >>>>
> >>>> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
> >>>> SLF4J: Class path contains multiple SLF4J bindings.
> >>>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> >>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> >>>> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> >>>> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 0
> >>>> 90715
> >>>> 1375825
> >>>> 882490
> >>>> 941226
> >>>> 811032
> >>>> 734399
> >>>> 804453
> >>>> 718688
> >>>> 1058695
> >>>> 854417
> >>>> 813263
> >>>> 798885
> >>>> 785455
> >>>> 952804
> >>>> 780140
> >>>> 697533
> >>>>
> >>>>
> >>>> Thanks Tathagata.
> >>>>
> >>>> Regards
> >>>>
> >>>>
> >>>> 2014-01-30 Eduardo Costa Alfaia <e.costaalfaia@unibs.it>:
> >>>>
> >>>>> Hi Tathagata,
> >>>>>
> >>>>> Thank you for your explanations; they will help me understand how this
> >>>>> piece of code works so that we can do what we want. We have written a
> >>>>> program in C that sends a text file, for example Don Quixote, as a
> >>>>> stream over the network, so we changed the Java code from
> >>>>> JavaNetworkWordcount to connect to each source described in the source
> >>>>> code. Below is what we inserted: three stream sources.
> >>>>>
> >>>>>     JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >>>>>     JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >>>>>     JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >>>>>     JavaDStream<String> union2 = lines1.union(lines2);
> >>>>>     JavaDStream<String> union3 = union2.union(lines3);
> >>>>>     JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
> >>>>>
> >>>>> So I think the second option you gave me is the better one.
> >>>>> Sorry, Tathagata, for my insistence on this, and thank you for your
> >>>>> patience.
> >>>>>
> >>>>> Best Regards
> >>>>>
> >>>>>
> >>>>> 2014-01-30 Tathagata Das <tathagata.das1565@gmail.com>
> >>>>>
> >>>>>> Let me first ask for a few clarifications.
> >>>>>>
> >>>>>> 1. If you just want to count the words in a single text file like Don
> >>>>>> Quixote (that is, not a stream of data), you should use only Spark, not
> >>>>>> Spark Streaming. You could write the word-frequency program in Java, but
> >>>>>> if you are not super-comfortable with Java, I strongly recommend using
> >>>>>> the Scala API or PySpark. Scala may be a little tricky to learn if you
> >>>>>> have absolutely no idea, but it is worth it. In Scala, the frequency
> >>>>>> count would look like this:
> >>>>>>
> >>>>>> import org.apache.spark.SparkContext
> >>>>>> import org.apache.spark.SparkContext._  // implicits that add reduceByKey to pair RDDs
> >>>>>>
> >>>>>> val sc = new SparkContext(...)
> >>>>>> val linesInFile = sc.textFile("path_to_file")
> >>>>>> val words = linesInFile.flatMap(line => line.split(" "))
> >>>>>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
> >>>>>> // collect() brings every (word, count) pair to the driver: costly if the file is large
> >>>>>> println("Word frequencies = " + frequencies.collect().mkString(", "))
> >>>>>>
> >>>>>>
> >>>>>> 2. Let me assume that you want to read a stream of text over the
> >>>>>> network and then print the total number of words into a file. Note
> >>>>>> that it is the "total number of words" and not the "frequency of each
> >>>>>> word". The Java version would be something like this.
> >>>>>>
> >>>>>> JavaDStream<Long> totalCounts = words.count();
> >>>>>>
> >>>>>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
> >>>>>>   @Override public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
> >>>>>>     // count() leaves a single element in each batch's RDD
> >>>>>>     Long totalCount = rdd.first();
> >>>>>>
> >>>>>>     // print to screen
> >>>>>>     System.out.println(totalCount);
> >>>>>>
> >>>>>>     // append count to file
> >>>>>>     // ...
> >>>>>>     return null;
> >>>>>>   }
> >>>>>> });
> >>>>>>
> >>>>>> This counts how many words have been received in each batch. The Scala
> >>>>>> version is much simpler to read.
> >>>>>>
> >>>>>> words.count().foreachRDD(rdd => {
> >>>>>>   val totalCount = rdd.first()
> >>>>>>
> >>>>>>   // print to screen
> >>>>>>   println(totalCount)
> >>>>>>
> >>>>>>   // append count to file
> >>>>>>   // ...
> >>>>>> })
> >>>>>>
> >>>>>> Hope this helps! I apologize if the code doesn't compile; I didn't test
> >>>>>> it for syntax and such.
> >>>>>>
> >>>>>> TD
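TD's two cases differ only in the last step: frequencies reduce `(word, 1)` pairs by key, while the total is one count over all words. The plain-Scala sketch below works on a local collection rather than an RDD or DStream, splits on a single space as the snippets above do, and uses the hypothetical names `wordFrequencies` and `totalWords`.

```scala
// Plain-Scala contrast (no Spark) between "frequency of each word"
// and "total number of words" for the same lines of text.
object FrequencyVsTotal {

  // Like words.map(word => (word, 1L)).reduceByKey(_ + _): one count per distinct word.
  def wordFrequencies(lines: Seq[String]): Map[String, Long] =
    lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  // Like words.count(): a single number covering all the words.
  def totalWords(lines: Seq[String]): Long =
    lines.flatMap(_.split(" ")).size.toLong

  def main(args: Array[String]): Unit = {
    val lines = Seq("to be or not", "to be")
    println(wordFrequencies(lines)) // one entry per distinct word
    println(totalWords(lines))      // 6
  }
}
```

Summing the values of `wordFrequencies` always gives `totalWords`, but counting directly avoids building a per-word map when only the total is needed.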
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >>>>>>
> >>>>>>> Hi Guys,
> >>>>>>>
> >>>>>>> I'm not a very good Java programmer, so could anybody help me with
> >>>>>>> this piece of code from JavaNetworkWordcount:
> >>>>>>>
> >>>>>>> JavaPairDStream<String, Integer> wordCounts = words.map(
> >>>>>>>   new PairFunction<String, String, Integer>() {
> >>>>>>>     @Override
> >>>>>>>     public Tuple2<String, Integer> call(String s) throws Exception {
> >>>>>>>       return new Tuple2<String, Integer>(s, 1);
> >>>>>>>     }
> >>>>>>>   }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>>>>>>     @Override
> >>>>>>>     public Integer call(Integer i1, Integer i2) throws Exception {
> >>>>>>>       return i1 + i2;
> >>>>>>>     }
> >>>>>>>   });
> >>>>>>>
> >>>>>>> JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
> >>>>>>>   // reduce function: adds counts from data entering the window
> >>>>>>>   new Function2<Integer, Integer, Integer>() {
> >>>>>>>     public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>>>>>>   },
> >>>>>>>   // inverse reduce function: subtracts counts from data leaving the window
> >>>>>>>   new Function2<Integer, Integer, Integer>() {
> >>>>>>>     public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>>>>>>   },
> >>>>>>>   new Duration(60 * 5 * 1000),  // window length: 5 minutes
> >>>>>>>   new Duration(1 * 1000)        // slide interval: 1 second
> >>>>>>> );
> >>>>>>>
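The second `Function2` in the `reduceByKeyAndWindow` call above (`i1 - i2`) is the inverse reduce function. It lets Spark update the 5-minute window incrementally at each 1-second slide: counts from the data entering the window are added and counts from the data leaving it are subtracted, instead of re-reducing the whole window (Spark requires checkpointing to be enabled for this form). The plain-Scala model below uses no Spark, hypothetical names, and a slide of exactly one batch; it checks that the incremental update gives the same counts as recomputing each window from scratch.

```scala
// Plain-Scala model (no Spark) of a sliding-window count kept up to date with an
// inverse reduce function: newWindow = oldWindow + enteringBatch - leavingBatch.
object IncrementalWindowModel {

  type Counts = Map[String, Int]

  // Word counts for a single batch
  def countBatch(words: Seq[String]): Counts =
    words.groupBy(identity).map { case (w, ws) => w -> ws.size }

  // Combine two count maps key by key with f
  // (_ + _ plays the reduce function, _ - _ the inverse reduce function).
  def merge(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0))).toMap

  // Window of `windowBatches` batches, sliding by one batch at a time.
  // Keys whose count drops to 0 are kept in this model.
  def incremental(batches: Seq[Seq[String]], windowBatches: Int): Seq[Counts] = {
    val perBatch = batches.map(countBatch)
    perBatch.indices.scanLeft(Map.empty[String, Int]) { (window, i) =>
      val added = merge(window, perBatch(i))(_ + _)
      // Once the window is full, the batch that just fell out is subtracted
      if (i >= windowBatches) merge(added, perBatch(i - windowBatches))(_ - _) else added
    }.tail
  }

  // Reference result: re-count every word inside each window from scratch.
  def fromScratch(batches: Seq[Seq[String]], windowBatches: Int): Seq[Counts] =
    batches.indices.map { i =>
      countBatch(batches.slice(math.max(0, i - windowBatches + 1), i + 1).flatten)
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a", "b"), Seq("a"), Seq("b", "b"), Seq("c"))
    // Drop zero counts before comparing with the from-scratch result
    val inc = incremental(batches, windowBatches = 2).map(_.filter(_._2 != 0))
    println(inc == fromScratch(batches, windowBatches = 2)) // prints true
  }
}
```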
> >>>>>>> I would like to find a way of counting the words and then summing them
> >>>>>>> to get the total number of words in a single file, for example the book
> >>>>>>> Don Quixote as a .txt file. The counts function gives me the result for
> >>>>>>> each word found, not the total number of words in the file.
> >>>>>>> Tathagata has sent me a piece of Scala code. Thanks, Tathagata, for your
> >>>>>>> attention to my posts; I am very thankful.
> >>>>>>>
> >>>>>>> yourDStream.foreachRDD(rdd => {
> >>>>>>>   // Get and print first n elements
> >>>>>>>   val firstN = rdd.take(n)
> >>>>>>>   println("First N elements = " + firstN.mkString(", "))
> >>>>>>>
> >>>>>>>   // Count the number of elements in each batch
> >>>>>>>   println("RDD has " + rdd.count() + " elements")
> >>>>>>> })
> >>>>>>>
> >>>>>>> yourDStream.count.print()
> >>>>>>>
> >>>>>>> Could anybody help me?
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks Guys
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>> NOTICE ON THE PROCESSING OF PERSONAL DATA
> >>>>>>>
> >>>>>>> The data used to send this message are processed by the
> >>>>>>> Università degli Studi di Brescia exclusively for institutional
> >>>>>>> purposes. More detailed information, including on the rights of the
> >>>>>>> data subject, is available in the general notice and in the information
> >>>>>>> published in the "Privacy" section of the University's website.
> >>>>>>>
> >>>>>>> The content of this message is intended solely for the person(s) to
> >>>>>>> whom it is addressed and may contain information whose confidentiality
> >>>>>>> is protected by law. Its reproduction, distribution, and use without
> >>>>>>> the recipient's authorization are prohibited. If you have received
> >>>>>>> this message in error, please delete it.
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> MSc Eduardo Costa Alfaia
> >>>> PhD Student
> >>>> Università degli Studi di Brescia
> >>>>
> >>>>
> >>
> >>
> >>
>
>
>
