spark-dev mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Source code JavaNetworkWordcount
Date Wed, 05 Feb 2014 09:22:51 GMT
Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without replication)
for testing, but you should turn on replication if you want
fault-tolerance.

TD


On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it
> wrote:

> Hi Tathagata,
>
> You were right when you suggested using Scala instead of Java; Scala is
> very easy. I have implemented the code you gave me, and I have also added a
> union, because I am testing with 2 stream sources; my idea is to use 3 or
> more stream sources and union them.
>
> object NetworkWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>         "In local mode, <master> should be 'local[n]' with n > 1")
>       System.exit(1)
>     }
>
>     StreamingExamples.setStreamingLogLevels()
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>
>     // Create a socket text stream on target ip:port and count the
>     // words in the input stream of \n delimited text (eg. generated by 'nc')
>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val union2 = lines1.union(lines2)
>     //val words = lines.flatMap(_.split(" "))
>     val words = union2.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
>     words.count().foreachRDD(rdd => {
>       val totalCount = rdd.first()
>
>       // print to screen
>       println(totalCount)
>
>       // append count to file
>       // ...
>     })
>     //wordCounts.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> What do you think? Is my code right?
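For 3 or more sources, the pairwise unions fold into a single reduce. A minimal sketch, modeling each source as a plain Seq[String] as a stand-in for the socketTextStream DStreams (all names and sample lines here are invented); with real DStreams the same shape is sources.reduce(_ union _), or StreamingContext.union(sources) in one call:

```scala
object UnionSketch {
  // Fold any number of sources into one stream-like sequence.
  // With DStreams the equivalent is sources.reduce(_ union _).
  def unionAll(sources: Seq[Seq[String]]): Seq[String] =
    sources.reduce(_ ++ _)

  def main(args: Array[String]): Unit = {
    val lines1 = Seq("quixote tilts at windmills")
    val lines2 = Seq("sancho watches")
    val lines3 = Seq("rocinante gallops")
    val words = unionAll(Seq(lines1, lines2, lines3)).flatMap(_.split(" "))
    println(words.length)  // 8
  }
}
```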
>
> I have obtained the follow result:
>
> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example
> org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j
> profile: org/apache/spark/log4j-defaults.properties
> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for
> streaming example. To override add a custom log4j.properties to the
> classpath.
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 90715
> 1375825
> 882490
> 941226
> 811032
> 734399
> 804453
> 718688
> 1058695
> 854417
> 813263
> 798885
> 785455
> 952804
> 780140
> 697533
>
>
> Thanks Tathagata.
>
> Best regards
>
>
> 2014-01-30 Eduardo Costa Alfaia <e.costaalfaia@unibs.it>:
>
> > Hi Tathagata,
> >
> > Thank you for your explanations; they will help us understand how this
> > piece of code works so we can do what we want. We have written a C program
> > which sends a text file, for example Don Quixote, as a stream over the
> > network, so we have changed the Java code from JavaNetworkWordCount to
> > connect to each source described in the source code. Below is what we have
> > inserted: three stream sources.
> >
> >       JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> union2 = lines1.union(lines2);
> >       JavaDStream<String> union3 = union2.union(lines3);
> >       JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
> >
> > So I think the second option you gave me is the better one. Sorry,
> > Tathagata, for my insistence on this case, and thank you for your
> > patience.
> >
> > Best Regards
> >
> >
> > 2014-01-30 Tathagata Das <tathagata.das1565@gmail.com>
> >
> > Let me first ask for a few clarifications.
> >>
> >> 1. If you just want to count the words in a single text file like Don
> >> Quixote (that is, not for a stream of data), you should use only Spark.
> >> If you are not super-comfortable with Java, then I strongly recommend
> >> using the Scala API or pyspark. Scala may be a little trickier to learn
> >> if you have no prior experience, but it is worth it. In Scala, the
> >> frequency count would look like this.
> >>
> >> val sc = new SparkContext(...)
> >> val linesInFile = sc.textFile("path_to_file")
> >> val words = linesInFile.flatMap(line => line.split(" "))
> >> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
> >> // collect is costly if the file is large
> >> println("Word frequencies = " + frequencies.collect().mkString(", "))
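The same flatMap/map/reduceByKey chain can be sanity-checked without a cluster by running it on a plain Scala collection. A sketch, with groupBy plus sum standing in for reduceByKey and invented sample lines:

```scala
object FrequencySketch {
  // Word frequencies over a plain collection, mirroring the pipeline above:
  // flatMap splits lines into words, map pairs each word with 1L, and
  // groupBy + sum plays the role of reduceByKey(_ + _).
  def wordFrequencies(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1L))
      .groupBy(_._1)
      .map { case (word, ones) => (word, ones.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val freqs = wordFrequencies(Seq("to be or not to be"))
    println("Word frequencies = " + freqs.toSeq.sortBy(_._1).mkString(", "))
  }
}
```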
> >>
> >>
> >> 2. Let me assume that you want to read a stream of text over the
> >> network and then print the count of the total number of words to a file.
> >> Note that it is "total number of words" and not "frequency of each word".
> >> The Java version would be something like this.
> >>
> >> JavaDStream<Long> totalCounts = words.count();
> >>
> >> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
> >>     @Override
> >>     public Void call(JavaRDD<Long> countRDD, Time time) throws Exception {
> >>         Long totalCount = countRDD.first();
> >>
> >>         // print to screen
> >>         System.out.println(totalCount);
> >>
> >>         // append count to file
> >>         ...
> >>         return null;
> >>     }
> >> });
> >>
> >> This counts how many words have been received in each batch. The Scala
> >> version is much simpler to read.
> >>
> >> words.count().foreachRDD(rdd => {
> >>     val totalCount = rdd.first()
> >>
> >>     // print to screen
> >>     println(totalCount)
> >>
> >>     // append count to file
> >>     ...
> >> })
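The per-batch counting above can likewise be exercised on plain collections: each Seq below plays the role of one batch's RDD, and counting its words mirrors what words.count() followed by rdd.first() yields per batch (batch contents are invented):

```scala
object BatchCountSketch {
  // Total words in one batch, mirroring what words.count() produces
  // for each RDD of the DStream.
  def wordsInBatch(batch: Seq[String]): Long =
    batch.flatMap(_.split(" ")).length.toLong

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("to be or not"), Seq("that is the question"), Seq.empty[String])
    // One printed line per batch, like the foreachRDD loop above.
    batches.foreach(batch => println(wordsInBatch(batch)))
  }
}
```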
> >>
> >> Hope this helps! I apologize if the code doesn't compile; I didn't test
> >> the syntax.
> >>
> >> TD
> >>
> >>
> >>
> >> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <
> >> e.costaalfaia@unibs.it> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I'm not a very good Java programmer, so could anybody help me with this
> >>> piece of code from JavaNetworkWordCount:
> >>>
> >>> JavaPairDStream<String, Integer> wordCounts = words.map(
> >>>         new PairFunction<String, String, Integer>() {
> >>>           @Override
> >>>           public Tuple2<String, Integer> call(String s) throws Exception {
> >>>             return new Tuple2<String, Integer>(s, 1);
> >>>           }
> >>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>>           @Override
> >>>           public Integer call(Integer i1, Integer i2) throws Exception {
> >>>             return i1 + i2;
> >>>           }
> >>>         });
> >>>
> >>>       JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
> >>>         new Function2<Integer, Integer, Integer>() {
> >>>           public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>>         },
> >>>         new Function2<Integer, Integer, Integer>() {
> >>>           public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>>         },
> >>>         new Duration(60 * 5 * 1000),
> >>>         new Duration(1 * 1000)
> >>>       );
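The two functions passed above are the incremental sliding-window trick: as the window slides, the entering batch is combined in with the first function, and the batch that fell out of the window is removed with the second (its inverse), instead of re-reducing the whole window. A minimal sketch of the idea over plain per-batch totals (the batch values are invented):

```scala
object SlidingWindowSketch {
  // Incrementally maintained sliding-window sum: add the entering batch,
  // subtract the leaving one -- the same idea as reduceByKeyAndWindow
  // with a reduce function (i1 + i2) and an inverse function (i1 - i2).
  def windowSums(batches: Seq[Int], window: Int): Seq[Int] = {
    var current = 0
    batches.zipWithIndex.map { case (entering, i) =>
      current += entering                              // reduce: add new batch
      if (i >= window) current -= batches(i - window)  // inverse: drop old batch
      current
    }
  }

  def main(args: Array[String]): Unit = {
    // Window of 3 batches sliding by 1 batch.
    println(windowSums(Seq(5, 1, 4, 2, 3), 3))  // List(5, 6, 10, 7, 9)
  }
}
```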
> >>>
> >>> I would like to find a way of counting the words and then summing them
> >>> to get a total of the words counted in a single file, for example a book
> >>> in txt format such as Don Quixote. The count function gives me the
> >>> result for each word found, not the total number of words in the file.
> >>> Tathagata has sent me a piece of Scala code. Thanks, Tathagata, for your
> >>> attention to my posts; I am very thankful.
> >>>
> >>>   yourDStream.foreachRDD(rdd => {
> >>>
> >>>    // Get and print first n elements
> >>>    val firstN = rdd.take(n)
> >>>    println("First N elements = " + firstN)
> >>>
> >>>   // Count the number of elements in each batch
> >>>   println("RDD has " + rdd.count() + " elements")
> >>>
> >>> })
> >>>
> >>> yourDStream.count.print()
> >>>
> >>> Could anybody help me?
> >>>
> >>>
> >>> Thanks Guys
> >>>
> >>> --
> >>>
> >>> NOTICE ON THE PROCESSING OF PERSONAL DATA
> >>>
> >>> The data used to send this message are processed by the Università degli
> >>> Studi di Brescia exclusively for institutional purposes. More detailed
> >>> information, including the rights of the data subject, can be found in
> >>> the general privacy notice and in the notices published on the
> >>> University website in the "Privacy" section.
> >>>
> >>> The content of this message is intended solely for the person to whom it
> >>> is addressed and may contain information whose confidentiality is
> >>> protected by law. Its reproduction, distribution and use without the
> >>> recipient's authorization are forbidden. If this message has reached you
> >>> in error, please delete it.
> >>>
> >>
> >>
> >
>
>
> --
> MSc Eduardo Costa Alfaia
> PhD Student
> Università degli Studi di Brescia
>
