spark-dev mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Source code JavaNetworkWordcount
Date Wed, 05 Feb 2014 18:07:56 GMT
The reduceByKeyAndWindow and other *ByKey* operations work only on
DStreams of key-value pairs. "words" is a DStream[String], so it does not
contain key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)]
that does contain key-value pairs, so on that you can call reduceByKeyAndWindow.
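
For example, a minimal sketch of the fix against your snippet (assuming
union3 is the unioned DStream[String] from your code; untested, so treat it
as a sketch):

    // Pair each word with a count of 1, then reduce over a 30s window sliding every 10s
    val words = union3.flatMap(_.split(" "))
    val pairs = words.map(x => (x, 1))     // DStream[(String, Int)]: key-value pairs
    val wordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
    wordCounts.print()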

TD


On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:

> Hi Tathagata,
> I am playing with NetworkWordCount.scala and made the changes shown below:
>
>         // Create the context with a 1 second batch size
>  67     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>  68       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>  69     ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
>  70     // Create a socket text stream on target ip:port and count the
>  71     // words in the input stream of \n delimited text (eg. generated by 'nc')
>  72     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>  73     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>  74     val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>  75     val union2 = lines1.union(lines2)
>  76     val union3 = union2.union(lines3)
>  77
>  78     //val words = lines.flatMap(_.split(" "))
>  79     val words = union3.flatMap(_.split(" "))
>  80     //val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>  81     val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>
> However, I got the error below:
>
> [error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81:
> value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
> [error]         val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> [error]                                ^
> [error] one error found
> [error] (examples/compile:compile) Compilation failed
> [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
>
>
> These classes are imported in the code:
>
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
>
>
> Thanks
>
> On Feb 5, 2014, at 5:22, Tathagata Das <tathagata.das1565@gmail.com>
> wrote:
>
> > Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without
> > replication) for testing, but you should turn on replication if you want
> > fault-tolerance.
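> >
> > For example, a rough sketch: the "_2"-suffixed storage levels keep two
> > replicas of each block, so for fault-tolerant receiving you could use
> >
> >     val lines = ssc.socketTextStream("localhost", 12345, StorageLevel.MEMORY_ONLY_SER_2)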
> >
> > TD
> >
> >
> > On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >
> >> Hi Tathagata,
> >>
> >> You were right when you suggested that I use Scala instead of Java;
> >> Scala is very easy. I have implemented the count code you gave me, and
> >> I have also added a union, because I am testing with 2 stream sources;
> >> my idea is to put in 3 or more stream sources and union them.
> >>
> >> object NetworkWordCount {
> >>   def main(args: Array[String]) {
> >>     if (args.length < 1) {
> >>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
> >>         "In local mode, <master> should be 'local[n]' with n > 1")
> >>       System.exit(1)
> >>     }
> >>
> >>     StreamingExamples.setStreamingLogLevels()
> >>
> >>     // Create the context with a 1 second batch size
> >>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
> >>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
> >>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
> >>     // Create a socket text stream on target ip:port and count the
> >>     // words in the input stream of \n delimited text (eg. generated by 'nc')
> >>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
> >>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
> >>     val union2 = lines1.union(lines2)
> >>     //val words = lines.flatMap(_.split(" "))
> >>     val words = union2.flatMap(_.split(" "))
> >>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >>
> >>     words.count().foreachRDD(rdd => {
> >>       val totalCount = rdd.first()
> >>
> >>       // print to screen
> >>       println(totalCount)
> >>
> >>       // append count to file
> >>       // ...
> >>     })
> >>     //wordCounts.print()
> >>     ssc.start()
> >>     ssc.awaitTermination()
> >>   }
> >> }
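> >>
> >> As an aside: with three or more sources the union can be written more
> >> compactly via StreamingContext.union (a sketch, assuming each source
> >> listens on its own port):
> >>
> >>     val ports = Seq(12345, 12346, 12347)
> >>     val streams = ports.map(p => ssc.socketTextStream("localhost", p, StorageLevel.MEMORY_ONLY_SER))
> >>     val union = ssc.union(streams)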
> >>
> >> What do you think? Is my code right?
> >>
> >> I have obtained the following result:
> >>
> >> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example
> >> org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
> >> SLF4J: Class path contains multiple SLF4J bindings.
> >> SLF4J: Found binding in
> >> [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> SLF4J: Found binding in
> >> [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> >> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> >> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> >> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 0
> >> 90715
> >> 1375825
> >> 882490
> >> 941226
> >> 811032
> >> 734399
> >> 804453
> >> 718688
> >> 1058695
> >> 854417
> >> 813263
> >> 798885
> >> 785455
> >> 952804
> >> 780140
> >> 697533
> >>
> >>
> >> Thanks Tathagata.
> >>
> >> Regards,
> >>
> >>
> >> 2014-01-30 Eduardo Costa Alfaia <e.costaalfaia@unibs.it>:
> >>
> >>> Hi Tathagata,
> >>>
> >>> Thank you for your explanations; they will be useful for understanding
> >>> how this piece of code works so we can do what we want. We have written
> >>> a program in C which sends a txt file, for example Don Quixote, as a
> >>> stream over the network, so we have changed the Java code from
> >>> JavaNetworkWordcount to connect to each source described in the source
> >>> code. Below is what we have inserted: three stream sources.
> >>>
> >>>      JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
> >>>          Integer.parseInt("12345"));
> >>>      JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
> >>>          Integer.parseInt("12345"));
> >>>      JavaDStream<String> lines3 = ssc1.socketTextStream("localhost",
> >>>          Integer.parseInt("12345"));
> >>>      JavaDStream<String> union2 = lines1.union(lines2);
> >>>      JavaDStream<String> union3 = union2.union(lines3);
> >>>      JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
> >>>        @Override
> >>>        public Iterable<String> call(String s) {
> >>>          return Arrays.asList(s.split(" "));  // split each line into words
> >>>        }
> >>>      });
> >>>
> >>> So I think the second option that you have given me is the better one.
> >>> Sorry, Tathagata, for my insistence on this case, and thank you for
> >>> your patience.
> >>>
> >>> Best Regards
> >>>
> >>>
> >>> 2014-01-30 Tathagata Das <tathagata.das1565@gmail.com>
> >>>
> >>>> Let me first ask for a few clarifications.
> >>>>
> >>>> 1. If you just want to count the words in a single text file like Don
> >>>> Quixote (that is, not for a stream of data), you should use plain
> >>>> Spark. If you are not super-comfortable with Java, then I strongly
> >>>> recommend using the Scala API or pyspark. Scala may be a little
> >>>> trickier to learn if you have no prior exposure, but it is worth it.
> >>>> In Scala, the frequency count would look like this:
> >>>>
> >>>> val sc = new SparkContext(...)
> >>>> val linesInFile = sc.textFile("path_to_file")
> >>>> val words = linesInFile.flatMap(line => line.split(" "))
> >>>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
> >>>> // collect is costly if the file is large
> >>>> println("Word frequencies = " + frequencies.collect().mkString(", "))
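> >>>>
> >>>> For a large file you would typically save the result instead of
> >>>> collecting it to the driver; for example, with a made-up output path:
> >>>>
> >>>>     frequencies.saveAsTextFile("path_to_output")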
> >>>>
> >>>>
> >>>> 2. Let me assume that you want to read a stream of text over the
> >>>> network and then print the count of the total number of words to a
> >>>> file. Note that it is "total number of words" and not "frequency of
> >>>> each word". The Java version would be something like this:
> >>>>
> >>>> JavaDStream<Long> totalCounts = words.count();
> >>>>
> >>>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
> >>>>   @Override
> >>>>   public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
> >>>>     // the count RDD has a single element: the total for this batch
> >>>>     Long totalCount = rdd.first();
> >>>>
> >>>>     // print to screen
> >>>>     System.out.println(totalCount);
> >>>>
> >>>>     // append count to file
> >>>>     ...
> >>>>
> >>>>     return null;
> >>>>   }
> >>>> });
> >>>>
> >>>> This counts how many words have been received in each batch. The
> >>>> Scala version is much simpler to read:
> >>>>
> >>>> words.count().foreachRDD(rdd => {
> >>>>    val totalCount = rdd.first()
> >>>>
> >>>>    // print to screen
> >>>>    println(totalCount)
> >>>>
> >>>>    // append count to file
> >>>>    ...
> >>>> })
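> >>>>
> >>>> One possible way to do the append (just a sketch, using plain
> >>>> java.io.FileWriter in append mode, with a made-up file name):
> >>>>
> >>>>     // inside the foreachRDD body, after computing totalCount
> >>>>     val fw = new java.io.FileWriter("counts.txt", true)  // true = append mode
> >>>>     fw.write(totalCount + "\n")
> >>>>     fw.close()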
> >>>>
> >>>> Hope this helps! I apologize if the code doesn't compile; I didn't
> >>>> test the syntax.
> >>>>
> >>>> TD
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >>>>
> >>>>> Hi Guys,
> >>>>>
> >>>>> I'm not a very good Java programmer, so could anybody help me with
> >>>>> this piece of code from JavaNetworkWordcount:
> >>>>>
> >>>>> JavaPairDStream<String, Integer> wordCounts = words.map(
> >>>>>     new PairFunction<String, String, Integer>() {
> >>>>>       @Override
> >>>>>       public Tuple2<String, Integer> call(String s) throws Exception {
> >>>>>         return new Tuple2<String, Integer>(s, 1);
> >>>>>       }
> >>>>>     }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>>>>       @Override
> >>>>>       public Integer call(Integer i1, Integer i2) throws Exception {
> >>>>>         return i1 + i2;
> >>>>>       }
> >>>>>     });
> >>>>>
> >>>>> JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
> >>>>>     new Function2<Integer, Integer, Integer>() {
> >>>>>       public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>>>>     },
> >>>>>     new Function2<Integer, Integer, Integer>() {
> >>>>>       public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>>>>     },
> >>>>>     new Duration(60 * 5 * 1000),
> >>>>>     new Duration(1 * 1000)
> >>>>> );
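> >>>>>
> >>>>> (Note: the reduceByKeyAndWindow variant with an inverse reduce
> >>>>> function, as above, only works with checkpointing enabled, e.g.
> >>>>> ssc1.checkpoint("checkpoint_dir") with a directory of your choice;
> >>>>> the first Duration is the window length and the second is the slide
> >>>>> interval.)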
> >>>>>
> >>>>> I would like to find a way of counting and then summing to get a
> >>>>> total of the words counted in a single file, for example a book in
> >>>>> txt format such as Don Quixote. The counts function gives me the
> >>>>> result for each word found, not a total of words in the file.
> >>>>> Tathagata has sent me a piece of Scala code. Thanks, Tathagata, for
> >>>>> your attention to my posts; I am very thankful.
> >>>>>
> >>>>> yourDStream.foreachRDD(rdd => {
> >>>>>
> >>>>>   // Get and print first n elements
> >>>>>   val firstN = rdd.take(n)
> >>>>>   println("First N elements = " + firstN)
> >>>>>
> >>>>>   // Count the number of elements in each batch
> >>>>>   println("RDD has " + rdd.count() + " elements")
> >>>>>
> >>>>> })
> >>>>>
> >>>>> yourDStream.count.print()
> >>>>>
> >>>>> Could anybody help me?
> >>>>>
> >>>>>
> >>>>> Thanks Guys
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >> MSc Eduardo Costa Alfaia
> >> PhD Student
> >> Università degli Studi di Brescia
> >>
>
>
