Hey Lucas,

Would you mind showing a screenshot of the memory dashboard as well? Also, could you show the distribution of task times within one of the stages (you can get this screen by just clicking on one of the stages - it shows the distribution at the top)?

- Patrick


On Mon, Oct 28, 2013 at 3:01 PM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:
Hello Patrick,

It's 182 unique keys (after the aggregations); before the aggregations I would say there are thousands of keys. Most of the time I see 38GB out of 40GB used, and the events RDD is using 35GB (around 90% cached). Most of the time is spent in reduceByKey (see screenshot).
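(A side note, hedged since it's not from the thread itself: when reduceByKey dominates, one knob that sometimes helps is passing an explicit partition count so the reduce side is spread across more tasks. A minimal sketch, where `pairs` stands in for the (key, 1) RDD from the job further down the thread and the count of 200 is purely illustrative:)

JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2; // same associative sum as in the job below
            }
        }, 200); // illustrative partition count; tune to the cluster size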

Thanks a lot Patrick!

Lucas. 


2013/10/28 Patrick Wendell <pwendell@gmail.com>
Hey Lucas,

How many unique keys do you have when you do these aggregations? Also, when you look in the web UI, can you tell how much in-memory storage is being used overall by the events RDD and the casRDD?

- Patrick


On Mon, Oct 28, 2013 at 1:21 PM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:
Hello,

I count events per date/time after that code, like the code below:

// Count events per key: map each (key, event) pair to (key, 1), then sum per key.
JavaPairRDD<String, Integer> eventPerDate = events.map(
        new PairFunction<Tuple2<String, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
                // Keep the day-month key, replace the event payload with a count of 1.
                return new Tuple2<String, Integer>(tuple._1, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                // Sum the partial counts for each key.
                return i1 + i2;
            }
        });
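(For completeness, a hedged usage note: nothing runs until an action is invoked on eventPerDate. One illustrative way to materialize the counts on the driver is collectAsMap(), which is reasonable here only because there are a few hundred keys:)

java.util.Map<String, Integer> counts = eventPerDate.collectAsMap(); // triggers the whole job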


Also, there are other jobs that run against that cached RDD `events`. So the jobs that take about 4 minutes are the ones run against cached (or partially cached) datasets; the 'aggregator' job that caches the RDD takes about 7 minutes. Most of my log looks like the lines below, so the actions that are taking time are the subsequent ones:


13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal (PROCESS_LOCAL)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2062 as 2621 bytes in 0 ms
13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2243 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free: 1371.7 MB)
13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2259 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2 MB)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818 in 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 2123)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal (PROCESS_LOCAL)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2157 as 2619 bytes in 1 ms
13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782 in 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress: 2221/2300)


Thank you very much!


Lucas.



2013/10/28 Patrick Wendell <pwendell@gmail.com>
Hey Lucas,

This code still needs to read the entire initial dataset from Cassandra, so that's probably what's taking most of the time. Also, it doesn't show the operations you are actually doing.

What happens when you look in the Spark web UI or the logs? Can you tell which stages are taking the most time? Is it the initial load or is it the subsequent actions?

- Patrick


On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:
Hi,

Patrick, we cache the RDD before doing operations. We tried to cache the casRdd (the flatMap input) as well, but it doesn't help. A piece of code from one job (we're using Java, so the code is longer) is at the end of this email.

Ewen, yes, we're re-reading from Cassandra. I found that better than serializing the blocks and reading them back from disk, as my dataset doesn't fit in memory...

I also tried the KryoSerializer, also with no success. We're sharing the events RDD across the other jobs. The parameters for the jobs are:

System.setProperty("spark.storage.memoryFraction""0.7");

System.setProperty("spark.executor.memory""12g");

System.setProperty("spark.storage.StorageLevel""MEMORY_ONLY");



Thanks!

Lucas.



// Load the column family from Cassandra and keep it in memory.
@SuppressWarnings("unchecked")
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd = context.newAPIHadoopRDD(
        job.getConfiguration(),
        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());

// Flatten each Cassandra row into (day-month, event) pairs.
JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
        new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {
            @Override
            public Iterable<Tuple2<String, String>> call(Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
                ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
                for (ByteBuffer columnName : tuple._2().keySet()) {
                    // The column name is a timestamp; the column value is the event payload.
                    String timestamp = ByteBufferUtil.string(columnName);
                    String event = ByteBufferUtil.string(tuple._2().get(ByteBufferUtil.bytes(timestamp)).value());
                    Calendar date = Calendar.getInstance();
                    date.setTimeInMillis(Long.parseLong(timestamp));
                    String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-" + String.valueOf(date.get(Calendar.MONTH));
                    events.add(new Tuple2<String, String>(key, event));
                }
                return events;
            }
        });


// Keep only ARTICLE events and cache the filtered set for the downstream jobs.
events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, String> tuple) throws Exception {
        return tuple._2.contains("ARTICLE");
    }
}).persist(StorageLevel.MEMORY_ONLY());


//other operations
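(A hedged follow-up sketch, not from the original thread: since the later jobs all reuse this cached RDD, forcing materialization once up front keeps the caching cost out of the first real query. count() is just one convenient action for that:)

long cachedEvents = events.count(); // materializes the filtered RDD into the cache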



2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
Well, he did mention that not everything was staying in the cache, so even with an ongoing job they're probably re-reading from Cassandra. It sounds to me like the first issue to address is why things are being evicted.

-Ewen



On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com> wrote:
Hey Lucas,

Could you provide some rough pseudo-code for your job? One question is: are you loading the data from Cassandra every time you perform an action, or do you cache() the dataset first? If you have a dataset that's already in an RDD, it's very hard for me to imagine that filters and aggregations could possibly take 4 minutes... it should be more like seconds.
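(A minimal sketch of the pattern Patrick is describing, with illustrative names and an illustrative placeholder path; the point is that cache() marks the RDD so that the first action materializes it and later actions reuse the in-memory copy:)

JavaRDD<String> lines = context.textFile("hdfs://...").cache(); // "hdfs://..." is a placeholder
long total = lines.count(); // first action: reads the source and fills the cache
long articles = lines.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
        return s.contains("ARTICLE");
    }
}).count(); // later actions run against the cache, so they should take seconds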

- Patrick


On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <lbrunialti@igcorp.com.br> wrote:
Hello,

We're using Spark to run analytics and ML jobs against Cassandra. Our analytics jobs are simple (filters and counts) and we're trying to improve their performance; these jobs take around 4 minutes querying 160GB (the size of our dataset). We use 5 workers and 1 master, EC2 m1.xlarge instances with 8GB of JVM heap.

We tried increasing the JVM heap to 12GB, but saw no gain in performance. We're using MEMORY_ONLY (after some tests we found it best), but it's not caching everything, just around 1000 of 2500 blocks. Maybe the cache is not what's limiting performance, just the Cassandra I/O(?)
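(One option sometimes tried when a dataset doesn't fully fit in memory, sketched here as an assumption rather than a recommendation from the thread: persisting with MEMORY_AND_DISK, so evicted blocks spill to local disk instead of being recomputed from Cassandra. Lucas mentions above that re-reading from Cassandra beat disk in his tests, so this would only be worth measuring:)

events.persist(StorageLevel.MEMORY_AND_DISK()); // blocks that don't fit in memory spill to local disk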

I saw that the people at Ooyala can run analytics jobs in milliseconds (http://www.youtube.com/watch?v=6kHlArorzvs), any advice?

Appreciate the help!

Lucas.


--
Lucas Fernandes Brunialti
Dev/Ops Software Engineer
+55 11 96512 4514
lbrunialti@igcorp.com.br