spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: DStream demultiplexer based on a key
Date Sun, 14 Dec 2014 19:17:33 GMT
Hi Jean-Pascal,

At Virdata we do a similar thing to 'bucketize' our data to different
keyspaces in Cassandra.

The basic construction would be to filter the DStream (or the underlying
RDD) for each key and then apply the usual storage operations on that new
data set.
Given that, in your case, the keys come from the data in the stream itself,
you will first need to collect those keys in order to create the
buckets.

Something like this:

val kafkaStream = ???
kafkaStream.foreachRDD { rdd =>
    rdd.cache() // very important! the RDD is scanned once per key below
    // key(...) is a function to get the desired key from each record
    val keys = rdd.map(elem => key(elem)).distinct.collect
    keys.foreach { k => // named k so it does not shadow the key(...) function
        rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
    }
    rdd.unpersist()
}
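
To try the bucketing logic without a cluster, here is a minimal sketch of the
same collect-keys-then-filter idea on plain Scala collections. The Log case
class, keyOf and demultiplex are hypothetical names made up for illustration;
they stand in for your thrift record and for key(...) above, and a Seq stands
in for the RDD.

```scala
// Hypothetical stand-in for the thrift Log record; only the date matters here.
final case class Log(date: String, payload: String)

object Demux {
  // Hypothetical key extractor, playing the role of key(...) in the snippet above.
  def keyOf(log: Log): String = log.date

  // Collect the distinct keys first, then build one bucket per key by filtering.
  def demultiplex(batch: Seq[Log]): Map[String, Seq[Log]] = {
    val keys = batch.map(keyOf).distinct
    keys.map(k => k -> batch.filter(log => keyOf(log) == k)).toMap
  }
}
```

Each bucket costs one full pass over the batch, so the work grows with the
number of distinct keys. That is why caching the RDD matters in the Spark
version: without it, every filter would recompute the batch from the source.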

-kr, Gerard.




On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud <jp@tellapart.com>
wrote:
>
> Hey,
>
> I am doing an experiment with Spark Streaming consisting of moving data
> from Kafka to S3 locations while partitioning by date. I have already
> looked into LinkedIn Camus and Pinterest Secor and while both are workable
> solutions, it just feels that Spark Streaming should be able to be on par
> with those without having to manage yet another application in our stack
> since we already have a Spark Streaming cluster in production.
>
> So what I am trying to do is very simple really. Each message in Kafka is
> thrift serialized, and the corresponding thrift objects have a timestamp
> field. What I'd like to do is something like this:
>
> JavaPairDStream stream = KafkaUtils.createRawStream(...);
> stream = stream.mapToPair(new PairFunction<Tuple2<Void, Log>, String, Log>() {
>   public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
>     return new Tuple2<>(tuple._2().getDate(), tuple._2());
>   }
> });
>
> At this point, I'd like to do some partitioning on the resulting DStream
> to have multiple DStreams, each with a single common string Date... So for
> instance in one DStream I would have all the entries from 12/01 and on
> another the entries from 12/02. Once I have this list of DStreams, for each
> of them I would call saveAsObjectFiles() basically. I unfortunately did not
> find a way to demultiplex a DStream based on a key. Obviously the reduce
> family of operations does some of that, but the result is still a single
> DStream.
>
> An alternative approach would be to call foreachRDD() on the DStream and
> demultiplex the entries into multiple new RDDs based on the timestamp to
> bucketize the entries with the same day date in the same RDD and finally
> call saveAsObjectFiles(). I am not sure if I can use parallelize() to
> create those RDDs?
>
> Another thing that I am going to experiment with is using a much longer
> batching interval. I am talking in minutes because I don't want to have
> a bunch of tiny files. I might simply use a bigger Duration or use one of the
> window operations. Not sure if anybody has tried running Spark Streaming in
> that way.
>
> Any thoughts on that would be much appreciated,
>
> Thanks!
>
