spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sundaram, Muthu X." <>
Subject Tranforming flume events using Spark transformation functions
Date Tue, 22 Jul 2014 15:24:11 GMT
Hi All,
  I am getting events from flume using following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I like to use some of the transformation functions like
map and reduce on this. Do I need to convert the JavaDStream<SparkFlumeEvent> to JavaDStream<String>
or can I apply these function directly on this?

I need to do following kind of operations

XXXX                     AA
YYYYY                    Delta
TTTTT                    AA
CCCC                     Southwest
XXXX                     AA

Unique tickets are XXXX , YYYYY, TTTT, CCCC, XXXX.
Count is XXXX 2, YYYY 1, TTTTT 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 ticket.

I have to do transformations like this. Right now I am able to receives records. But I am
struggling to transform them using spark transformation functions since they are not of type

Can I create new JavaRDD<String>? How do I create new JavaRDD?

I loop through  the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
              public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception
                     String logRecord = null;
                     List<SparkFlumeEvent> events = eventsData.collect();
                     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
                     long t1 = System.currentTimeMillis();
                     AvroFlumeEvent avroEvent = null;
                     ByteBuffer bytePayload = null;
                     // All the user level data is carried as payload in Flume Event
                     while(batchedEvents.hasNext()) {
                            SparkFlumeEvent flumeEvent =;
                            avroEvent = flumeEvent.event();
                            bytePayload = avroEvent.getBody();
                            logRecord = new String(bytePayload.array());

                            System.out.println(">>>>>>>>LOG RECORD
= " + logRecord);

Where do I create new JavaRDD<String>? DO I do it before this loop? How do I create
this JavaRDD<String>?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.


View raw message