spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Tranforming flume events using Spark transformation functions
Date Wed, 23 Jul 2014 05:20:15 GMT
This is because RDDs are evaluated lazily! Unless you force a
transformed (mapped/filtered/etc.) RDD to give you back some data (like
RDD.count()) or to output the data (like RDD.saveAsTextFile()), Spark will
not do anything.

So after the eventData.map(...), if you do take(10) and then print the
result, you should see 10 items from each batch printed.

Also, you can do the same map operation on the DStream itself. FYI:

    inputDStream.map(...).foreachRDD(...)
is equivalent to
    inputDStream.foreachRDD( /* call rdd.map(...) inside */ )

Either way, you have to call some RDD "action" (count, collect, take,
saveAsHadoopFile, etc.) that asks the system to do something concrete with
the data.
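
For illustration, the same lazy behavior can be reproduced without a
cluster. The sketch below is an analogy built on java.util.stream, not Spark
code: Stream.map() plays the role of an RDD transformation, and
limit(n).collect() plays the role of an action such as take(n). The class
name LazyEvaluationDemo, the mapCalls counter, and the sample records are
made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyEvaluationDemo {
    // Hypothetical counter: how many times the mapping function has actually run.
    public static final AtomicInteger mapCalls = new AtomicInteger();

    // Builds a mapped pipeline without running it (analogous to rdd.map(...)).
    public static Stream<String> buildPipeline(List<String> records) {
        return records.stream().map(record -> {
            mapCalls.incrementAndGet();
            return record.toUpperCase();
        });
    }

    public static void main(String[] args) {
        // Made-up sample records, loosely shaped like the ticket data in the question.
        List<String> records = Arrays.asList("xxxx aa", "yyyyy delta", "ttttt aa");

        // Only the pipeline is defined here; the mapping function has not run yet.
        Stream<String> mapped = buildPipeline(records);
        System.out.println("map calls before terminal operation: " + mapCalls.get());

        // The terminal operation (analogous to an action like take(2)) triggers the work.
        List<String> firstTwo = mapped.limit(2).collect(Collectors.toList());
        System.out.println("map calls after terminal operation: " + mapCalls.get());
        System.out.println(firstTwo);
    }
}
```

In the Spark code from the original message, the corresponding change would
be to call an action such as records.take(10) or records.count() on the
mapped RDD inside call() and print what it returns.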

TD




On Tue, Jul 22, 2014 at 1:55 PM, Sundaram, Muthu X. <
Muthu.X.Sundaram.ctr@sabre.com> wrote:

> I tried to map SparkFlumeEvents to an RDD of Strings like below. But that map
> and call are not executed at all. I might be doing this the wrong way. Any
> help would be appreciated.
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>               @Override
>               public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws
> Exception {
>                                 System.out.println("<<<<<<Inside for each...call>>>>");
>
>                                 JavaRDD<String> records = eventsData.map(
>             new Function<SparkFlumeEvent, String>() {
>                                 @Override
>                 public String call(SparkFlumeEvent flume) throws Exception
> {
>                     String logRecord = null;
>                 AvroFlumeEvent avroEvent = null;
>       ByteBuffer bytePayload = null;
>
>
>       System.out.println("<<<<<<Inside Map..call>>>>");
>                     /* List<SparkFlumeEvent> events = flume.collect();
>                      Iterator<SparkFlumeEvent> batchedEvents =
> events.iterator();
>
>                             SparkFlumeEvent flumeEvent =
> batchedEvents.next();*/
>                             avroEvent = flume.event();
>                             bytePayload = avroEvent.getBody();
>                             logRecord = new String(bytePayload.array());
>
>                                       System.out.println("<<<<Record is" + logRecord);
>
>                     return logRecord;
>                 }
>             });
>                                 return null;
> }
>
> -----Original Message-----
> From: Sundaram, Muthu X. [mailto:Muthu.X.Sundaram.ctr@sabre.com]
> Sent: Tuesday, July 22, 2014 10:24 AM
> To: user@spark.apache.org; dev@spark.incubator.apache.org
> Subject: Tranforming flume events using Spark transformation functions
>
> Hi All,
>   I am getting events from flume using following line.
>
>   JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc,
> host, port);
>
> Each event is a delimited record. I would like to use some of the
> transformation functions like map and reduce on this. Do I need to convert
> the JavaDStream<SparkFlumeEvent> to JavaDStream<String> or can I apply these
> functions directly on this?
>
> I need to do following kind of operations
>
> XXXX                     AA
> YYYYY                    Delta
> TTTTT                    AA
> CCCC                     Southwest
> XXXX                     AA
>
> Unique tickets are XXXX , YYYYY, TTTT, CCCC, XXXX.
> Count is XXXX 2, YYYY 1, TTTTT 1 and so on...
> AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket,
> Southwest - 1 ticket.
>
> I have to do transformations like this. Right now I am able to receive
> records. But I am struggling to transform them using Spark transformation
> functions since they are not of type JavaRDD<String>.
>
> Can I create new JavaRDD<String>? How do I create new JavaRDD?
>
> I loop through  the events like below
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>               @Override
>               public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws
> Exception {
>                      String logRecord = null;
>                      List<SparkFlumeEvent> events = eventsData.collect();
>                      Iterator<SparkFlumeEvent> batchedEvents =
> events.iterator();
>                      long t1 = System.currentTimeMillis();
>                      AvroFlumeEvent avroEvent = null;
>                      ByteBuffer bytePayload = null;
>                      // All the user level data is carried as payload in Flume Event
>                      while(batchedEvents.hasNext()) {
>                             SparkFlumeEvent flumeEvent =
> batchedEvents.next();
>                             avroEvent = flumeEvent.event();
>                             bytePayload = avroEvent.getBody();
>                             logRecord = new String(bytePayload.array());
>
>                             System.out.println(">>>>>>>>LOG RECORD = " + logRecord); }
>
> Where do I create the new JavaRDD<String>? Do I do it before this loop? How do
> I create this JavaRDD<String>?
> In the loop I am able to get every record and I am able to print them.
>
> I appreciate any help here.
>
> Thanks,
> Muthu
>
>
>
