spark-user mailing list archives

From Lucas Fernandes Brunialti <lbrunia...@igcorp.com.br>
Subject Re: Job duration
Date Mon, 28 Oct 2013 17:05:39 GMT
Hi,

Patrick, we cache the RDD before doing operations. We tried to cache the
casRdd (the flatMap input) but it doesn't help. A piece of code from one job
(we're using Java, so the code is longer) is at the end of this email.

Ewen, yes, we're re-reading from Cassandra. I found that this is better than
serializing the blocks and reading them back from disk, as my dataset doesn't
fit in memory...

We're sharing the events RDD across the other jobs, and the parameters for
the jobs are shown below. I also tried the KryoSerializer, with no success.
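As an aside on the Kryo attempt: in this Spark version the serializer is switched via system properties before the context is created. A sketch, assuming the 0.8-era property names (worth double-checking against your release); the registrator class in the comment is hypothetical:

```java
public class KryoConfig {
    public static void main(String[] args) {
        // Switch the serializer before the SparkContext is created
        // (0.8-era property name and class; verify against your Spark version).
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        // Optionally register application classes for more compact serialization:
        // System.setProperty("spark.kryo.registrator", "com.example.MyRegistrator"); // hypothetical
        System.out.println(System.getProperty("spark.serializer"));
    }
}
```

Kryo mainly helps when blocks are stored serialized (e.g. MEMORY_ONLY_SER); with plain MEMORY_ONLY the cached objects are deserialized, so the serializer matters less for caching.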

System.setProperty("spark.storage.memoryFraction", "0.7");
System.setProperty("spark.executor.memory", "12g");
System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
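For what it's worth, with these settings the cache arithmetic is tight: memoryFraction reserves 0.7 of the 12g heap (~8.4GB) per worker, while 160GB over 5 workers is ~32GB each, so only a fraction of the blocks can stay resident. (Also, as far as I can tell, "spark.storage.StorageLevel" is not a property Spark reads; the storage level is set per-RDD via persist(), as in the code below.) A quick sanity check on the numbers from this thread:

```java
public class CacheArithmetic {
    public static void main(String[] args) {
        double heapGb = 12.0;                           // spark.executor.memory
        double fraction = 0.7;                          // spark.storage.memoryFraction
        double cachePerWorkerGb = heapGb * fraction;    // ~8.4 GB usable for cached blocks
        double datasetGb = 160.0;                       // dataset size from the thread
        int workers = 5;
        double sharePerWorkerGb = datasetGb / workers;  // ~32 GB lands on each worker
        // Roughly a quarter of each worker's share fits, before accounting for
        // deserialized objects being larger in memory than on disk.
        System.out.printf("%.1f GB cache vs %.1f GB data -> %.0f%% fits%n",
                cachePerWorkerGb, sharePerWorkerGb,
                100 * cachePerWorkerGb / sharePerWorkerGb);
    }
}
```

That would be consistent with only ~1000 of 2500 blocks staying cached: the rest are evicted and re-read from Cassandra on each action.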



Thanks!

Lucas.



@SuppressWarnings("unchecked")
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
    context.newAPIHadoopRDD(job.getConfiguration(),
        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());


JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
    new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {
      @Override
      public Iterable<Tuple2<String, String>> call(
          Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
        ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
        // Iterate over the entries directly instead of looking each column up again by key.
        for (Map.Entry<ByteBuffer, IColumn> column : tuple._2().entrySet()) {
          String timestamp = ByteBufferUtil.string(column.getKey());
          String event = ByteBufferUtil.string(column.getValue().value());
          Calendar date = Calendar.getInstance();
          date.setTimeInMillis(Long.parseLong(timestamp));
          // Build a "day-month" key; note Calendar.MONTH is zero-based.
          String key = date.get(Calendar.DAY_OF_MONTH) + "-" + date.get(Calendar.MONTH);
          events.add(new Tuple2<String, String>(key, event));
        }
        return events;
      }
    });
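One subtlety in the key built by the flatMap above: Calendar.MONTH is zero-based, so January yields 0. A self-contained sketch of the same key derivation (pinned to UTC here for a deterministic result; the job itself uses the JVM default time zone):

```java
import java.util.Calendar;
import java.util.TimeZone;

public class DayMonthKey {
    // Same "DAY-MONTH" key as the flatMap builds; Calendar.MONTH is zero-based.
    static String key(long epochMillis) {
        Calendar date = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        date.setTimeInMillis(epochMillis);
        return date.get(Calendar.DAY_OF_MONTH) + "-" + date.get(Calendar.MONTH);
    }

    public static void main(String[] args) {
        System.out.println(key(0L)); // epoch start, 1 Jan 1970 UTC -> "1-0"
    }
}
```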


events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
  @Override
  public Boolean call(Tuple2<String, String> tuple) throws Exception {
    return tuple._2.contains("ARTICLE");
  }
}).persist(StorageLevel.MEMORY_ONLY());


//other operations


2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>

> Well, he did mention that not everything was staying in the cache, so even
> with an ongoing job they're probably re-reading from Cassandra. It
> sounds to me like the first issue to address is why things are being
> evicted.
>
> -Ewen
>
> -----
> Ewen Cheslack-Postava
> StraightUp | http://readstraightup.com
> ewencp@readstraightup.com
> (201) 286-7785
>
>
> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com>wrote:
>
>> Hey Lucas,
>>
>> Could you provide some rough pseudo-code for your job? One question is:
>> are you loading the data from Cassandra every time you perform an action,
>> or do you cache() the dataset first? If you have a dataset that's already
>> in an RDD, it's very hard for me to imagine that filters and aggregations
>> could possibly take 4 minutes... it should be more like seconds.
>>
>> - Patrick
>>
>>
>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>> lbrunialti@igcorp.com.br> wrote:
>>
>>> Hello,
>>>
>>> We're using Spark to run analytics and ML jobs against Cassandra. Our
>>> analytics jobs are simple (filters and counts) and we're trying to improve
>>> their performance: these jobs take around 4 minutes to query 160GB (the
>>> size of our dataset). We use 5 workers and 1 master, EC2 m1.xlarge
>>> instances with 8GB of JVM heap.
>>>
>>> We tried increasing the JVM heap to 12GB, but saw no gain in
>>> performance. We're using MEMORY_ONLY (after some tests we found it
>>> best), but it's not caching everything, just around 1000 of 2500 blocks.
>>> Maybe it's not the cache that limits performance, just the Cassandra IO (?)
>>>
>>> I saw that the folks at Ooyala can run analytics jobs in milliseconds (
>>> http://www.youtube.com/watch?v=6kHlArorzvs); any advice?
>>>
>>> Appreciate the help!
>>>
>>> Lucas.
>>>
>>> --
>>>
>>> Lucas Fernandes Brunialti
>>>
>>> *Dev/Ops Software Engineer*
>>>
>>> *+55 9 6512 4514*
>>>
>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>
>>
>>
>


-- 

Lucas Fernandes Brunialti

*Dev/Ops Software Engineer*

*+55 11 96512 4514*

*lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
