spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: DStream demultiplexer based on a key
Date Sun, 14 Dec 2014 19:34:13 GMT
I've done little besides performance tuning on Spark Streaming for the
past few weeks. rdd.cache() makes a huge difference. It is a must in this
case, where you want to iterate over the same RDD several times.

Intuitively, I also thought that all the data was already in memory, so
caching wouldn't make a difference, and I was very surprised to see stage
times drop from seconds to milliseconds when cache() was present.

Our batch intervals are 10-12 seconds long. I haven't tried minute-long
batches yet. Window functions are probably the best way to get there,
although a batch interval in the 1-5 minute range should be doable as well.
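
A minimal sketch of the windowed variant, assuming a DStream of (date, record) pairs. The helper name saveByKey, the basePath parameter, the output path layout and the 5-minute window are illustrative assumptions, not something tested in production:

```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: `keyed` pairs each record with its date string and
// `basePath` is an assumed output root (for example an S3 prefix).
def saveByKey[T: ClassTag](keyed: DStream[(String, T)], basePath: String): Unit = {
  // Window length == slide interval, so windows do not overlap and each
  // record is written once. Both must be multiples of the batch interval.
  val windowed = keyed.window(Minutes(5), Minutes(5))

  windowed.foreachRDD { (rdd, time) =>
    rdd.cache() // the RDD is scanned once for the keys and once per key
    val keys = rdd.map(_._1).distinct().collect()
    keys.foreach { k =>
      rdd.filter { case (key, _) => key == k }
        .map(_._2)
        // One directory per key and window, so later windows do not
        // collide with output that already exists.
        .saveAsObjectFile(s"$basePath/$k/${time.milliseconds}")
    }
    rdd.unpersist()
  }
}
```

With a 10-second batch interval this would write one set of files per key every 5 minutes instead of every 10 seconds, at the cost of keeping a full window of data around.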

-kr, Gerard.




On Sun, Dec 14, 2014 at 8:25 PM, Jean-Pascal Billaud <jp@tellapart.com>
wrote:
>
> Ah! That sounds very much like what I need. A very basic question (most
> likely): why is "rdd.cache()" critical? Isn't it already true that in Spark
> Streaming, DStreams are cached in memory anyway?
>
> Also, any experience with minutes-long batch intervals?
>
> Thanks for the quick answer!
>
> On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas <gerard.maas@gmail.com>
> wrote:
>>
>> Hi Jean-Pascal,
>>
>> At Virdata we do a similar thing to 'bucketize' our data to different
>> keyspaces in Cassandra.
>>
>> The basic construction would be to filter the DStream (or the underlying
>> RDD) for each key and then apply the usual storage operations on that new
>> data set.
>> Given that, in your case, you need the data within the stream to apply
>> the filter, you will need first to collect those keys in order to create
>> the buckets.
>>
>> Something like this:
>>
>> val kafkaStream =  ???
>> kafkaStream.foreachRDD { rdd =>
>>     rdd.cache() // very important: the RDD is scanned once per key below
>>     // key(...) is a function to get the desired key from each record
>>     val keys = rdd.map(elem => key(elem)).distinct.collect
>>     keys.foreach { k => // named k so it does not shadow key(...)
>>         rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
>>     }
>>     rdd.unpersist()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud <jp@tellapart.com>
>> wrote:
>>>
>>> Hey,
>>>
>>> I am doing an experiment with Spark Streaming consisting of moving data
>>> from Kafka to S3 locations while partitioning by date. I have already
>>> looked into LinkedIn Camus and Pinterest Secor and while both are workable
>>> solutions, it just feels that Spark Streaming should be able to be on par
>>> with those without having to manage yet another application in our stack
>>> since we already have a Spark Streaming cluster in production.
>>>
>>> So what I am trying to do is very simple really. Each message in Kafka
>>> is thrift serialized, and the corresponding thrift objects have a timestamp
>>> field. What I'd like to do is something like this:
>>>
>>> JavaPairDStream stream = KafkaUtils.createRawStream(...);
>>> stream = stream.mapToPair(new PairFunction<Tuple2<Void, Log>, String, Log>() {
>>>   public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
>>>     return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>>   }
>>> });
>>>
>>> At this point, I'd like to partition the resulting DStream into multiple
>>> DStreams, each with a single common date string. So for instance, one
>>> DStream would have all the entries from 12/01 and another the entries
>>> from 12/02. Once I have this list of DStreams, I would basically call
>>> saveAsObjectFiles() on each of them. Unfortunately, I did not find a way
>>> to demultiplex a DStream based on a key. Obviously the reduce family of
>>> operations does some of that, but the result is still a single DStream.
>>>
>>> An alternative approach would be to call foreachRDD() on the DStream and
>>> demultiplex the entries into multiple new RDDs based on the timestamp to
>>> bucketize the entries with the same day date in the same RDD and finally
>>> call saveAsObjectFiles(). I am not sure if I can use parallelize() to
>>> create those RDDs?
>>>
>>> Another thing that I am gonna be experimenting with is a much longer
>>> batch interval. I am talking minutes, because I don't want to have a
>>> bunch of tiny files. I might simply use a bigger Duration or use one of
>>> the window operations. Not sure if anybody has tried running Spark
>>> Streaming that way.
>>>
>>> Any thoughts on that would be much appreciated,
>>>
>>> Thanks!
>>>
>>
