spark-user mailing list archives

From Gerard Maas <>
Subject Re: DStream demultiplexer based on a key
Date Sun, 14 Dec 2014 19:34:13 GMT
I haven't done anything other than performance tuning on Spark Streaming for
the past few weeks, and rdd.cache() makes a huge difference. It's a must in this
case, where you want to iterate over the same RDD several times.

Intuitively, I also thought that all the data was in memory already, so caching
wouldn't make a difference, and I was very surprised to see stage times
drop from seconds to milliseconds when cache() was present.

Our intervals are 10-12 seconds long. I've not tried batches of minutes;
probably the best way would be to use window functions for that, although
something in the 1-5 minute range should be doable as well.
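To make the window suggestion concrete, here is a hedged sketch in plain Java (no Spark): it only illustrates the coalescing a 1-minute window would perform over 10-second batches, grouping records by a 60-second time bucket before they are written out. The class name, record shape, and sizes are illustrative, not part of any Spark API.

```java
import java.util.*;
import java.util.stream.*;

public class WindowSketch {
    // Group timestamps (ms) into 60-second buckets, as a 1-minute window would
    // coalesce several 10-second batches before writing.
    static Map<Long, List<Long>> minuteBuckets(List<Long> timestampsMs) {
        return -> t / 60_000L));
    }

    public static void main(String[] args) {
        // Seven batch start times at 10s spacing: six fall in minute 0, one in minute 1.
        List<Long> ts = LongStream.range(0, 7).map(i -> i * 10_000L)
        System.out.println(new TreeSet<>(minuteBuckets(ts).keySet())); // [0, 1]
    }
}
```

In real Spark Streaming code the same effect would come from a windowed DStream rather than a hand-rolled grouping; this sketch only shows why minute-scale output files are possible without a minute-long batch interval.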

-kr, Gerard.

On Sun, Dec 14, 2014 at 8:25 PM, Jean-Pascal Billaud <>
wrote:
> Ah! That sounds very much like what I need. A very basic question (most
> likely): why is "rdd.cache()" critical? Isn't it already true that in Spark
> Streaming DStreams are cached in memory anyway?
> Also any experience with minutes long batch interval?
> Thanks for the quick answer!
> On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas <>
> wrote:
>> Hi Jean-Pascal,
>> At Virdata we do a similar thing to 'bucketize' our data to different
>> keyspaces in Cassandra.
>> The basic construction would be to filter the DStream (or the underlying
>> RDD) for each key and then apply the usual storage operations on that new
>> data set.
>> Given that, in your case, the keys come from the data within the stream
>> itself, you will first need to collect those keys in order to create
>> the buckets.
>> Something like this:
>> val kafkaStream = ???
>> kafkaStream.foreachRDD { rdd =>
>>   rdd.cache() // very important: the RDD is traversed once per key below
>>   // key(...) is a function to get the desired key from each record
>>   val keys = => key(elem)).distinct.collect
>>   keys.foreach { k => // named k to avoid shadowing the key(...) function
>>     rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
>>   }
>>   rdd.unpersist()
>> }
>> -kr, Gerard.
>> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud <>
>> wrote:
>>> Hey,
>>> I am doing an experiment with Spark Streaming consisting of moving data
>>> from Kafka to S3 locations while partitioning by date. I have already
>>> looked into LinkedIn Camus and Pinterest Secor, and while both are workable
>>> solutions, it just feels that Spark Streaming should be able to be on par
>>> with them without having to manage yet another application in our stack,
>>> since we already have a Spark Streaming cluster in production.
>>> So what I am trying to do is really very simple. Each message in Kafka
>>> is thrift-serialized, and the corresponding thrift objects have a timestamp
>>> field. What I'd like to do is something like this:
>>> JavaPairDStream<Void, Log> stream = KafkaUtils.createRawStream(...);
>>> JavaPairDStream<String, Log> byDate =
>>>   stream.mapToPair(new PairFunction<Tuple2<Void, Log>, String, Log>() {
>>>     public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
>>>       return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>>     }
>>>   });
>>> At this point, I'd like to do some partitioning on the resulting DStream
>>> to get multiple DStreams, each with a single common date string. So for
>>> instance, one DStream would have all the entries from 12/01, and
>>> another the entries from 12/02. Once I have this list of DStreams, I would
>>> basically call saveAsObjectFiles() on each of them. I unfortunately did not
>>> find a way to demultiplex a DStream based on a key. Obviously the family of
>>> reduce operations does some of that, but the result is still a single
>>> DStream.
>>> An alternative approach would be to call foreachRDD() on the DStream and
>>> demultiplex the entries into multiple new RDDs based on the timestamp, so
>>> as to bucketize the entries sharing the same day date in the same RDD, and
>>> finally call saveAsObjectFiles(). I am not sure if I can use parallelize()
>>> to create those RDDs?
>>> Another thing that I am going to experiment with is using a much
>>> longer batch interval. I am talking minutes, because I don't want to
>>> end up with a bunch of tiny files. I might simply use a bigger Duration or
>>> use one of the window operations. Not sure if anybody has tried running
>>> Spark Streaming that way.
>>> Any thoughts on that would be much appreciated,
>>> Thanks!
