Hey Lucas,

This code still needs to read the entire initial dataset from Cassandra, so that's probably what's taking most of the time. Also, it doesn't show here the operations you are actually doing.

What happens when you look in the Spark web UI or the logs? Can you tell which stages are taking the most time? Is it the initial load or is it the subsequent actions?

- Patrick

On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:

Patrick, we cache the RDD before doing operations. We tried to cache the cassRdd (flatMap) but doesn't help. The piece of code of one job (we're using java, so the code is bigger) is in the end of the email.

Ewen, yes, we're re-reading from Cassandra, I found that it is better than serialize and read the blocks in/from disk, as my dataset doesn't fit in memory...

I also tried the kryoSerializer, also with no success. We're sharing the events (RDD) for the other jobs. The parameters for the jobs are:







JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd = context.newAPIHadoopRDD(job.getConfiguration(),


ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());

JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {


public Iterable<Tuple2<String, String>> call(Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {

ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();

for (ByteBuffer columnName : tuple._2().keySet()) {

String timestamp = ByteBufferUtil.string(columnName);

String event = ByteBufferUtil.string(tuple._2().get((ByteBufferUtil.bytes(timestamp))).value());

Calendar date = Calendar.getInstance();


String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-" + String.valueOf(date.get(Calendar.MONTH));

events.add(new Tuple2<String, String>(key, event));


return events;



events = events.filter(new Function<Tuple2<String,String>, Boolean>() {


public Boolean call(Tuple2<String, String> tuple) throws Exception {

return tuple._2.contains("ARTICLE");



//other operations

2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
Well, he did mention that not everything was staying in the cache, so even with an ongoing job they're probably be re-reading from Cassandra. It sounds to me like the first issue to address is why things are being evicted.


On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com> wrote:
Hey Lucas,

Could you provide some rough psuedo-code for your job? One question is: are you loading the data from cassandra every time you perform an action, or do you cache() the dataset first? If you have a dataset that's already in an RDD, it's very hard for me to imaging that filters and aggregations could possibly take 4 minutes... should be more like seconds.

- Patrick

On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:

We're using Spark to run analytics and ML jobs against Cassandra. Our analytics jobs are simple (filters and counts) and we're trying to improve the performance, these jobs takes around 4 minutes querying 160Gb (size of our dataset). Also, we use 5 workers and 1 master, EC2 m1.xlarge with 8gb in jvm heap.

We tried to increase the jvm heap to 12gb, but we had no gain in performance. We're using CACHE_ONLY (after some tests we've found it better), also it's not caching everything, just around 1000 of 2500 blocks. Maybe the cache is not impacting on performance, just the cassandra IO (?)

I saw that people from ooyala can do analytics jobs in milliseconds (http://www.youtube.com/watch?v=6kHlArorzvs), any advices?

Appreciate the help!



Lucas Fernandes Brunialti

Dev/Ops Software Engineer

+55 9 6512 4514



Lucas Fernandes Brunialti

Dev/Ops Software Engineer

+55 11 96512 4514