spark-user mailing list archives

From Lucas Fernandes Brunialti <lbrunia...@igcorp.com.br>
Subject Re: Job duration
Date Mon, 28 Oct 2013 22:01:43 GMT
Hello Patrick,

It's 182 unique keys (after the aggregations); before the aggregations I
would say there are thousands of keys. Most of the time I see 38GB out of
40GB used, and the events RDD is using 35GB (around 90% cached). Most of
the time is spent in reduceByKey (see screenshot).
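
One follow-up we may still try on the reduceByKey side: passing an explicit
partition count. With only 182 keys after aggregation, the default parallelism
(inherited from the ~2300 input splits, if I read the defaults right) leaves a
lot of near-empty reduce tasks. A minimal sketch, not our production code; the
variable name eventPairs and the 32 partitions are assumptions to tune:

JavaPairRDD<String, Integer> counts = eventPairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        }, 32); // 32 reduce partitions is a guess; tune to the cluster size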

Thanks a lot Patrick!

Lucas.


2013/10/28 Patrick Wendell <pwendell@gmail.com>

> Hey Lucas,
>
> How many unique keys do you have when you do these aggregations? Also,
> when you look in the web UI, can you tell how much in-memory storage is
> being used overall by the events RDD and the casRDD?
>
> - Patrick
>
>
> On Mon, Oct 28, 2013 at 1:21 PM, Lucas Fernandes Brunialti <
> lbrunialti@igcorp.com.br> wrote:
>
>> Hello,
>>
>> I count events per date/time after that code, like the code below:
>>
>> JavaPairRDD<String, Integer> eventPerDate = events.map(
>>         new PairFunction<Tuple2<String, String>, String, Integer>() {
>>             @Override
>>             public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
>>                 return new Tuple2<String, Integer>(tuple._1, 1);
>>             }
>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>             @Override
>>             public Integer call(Integer i1, Integer i2) throws Exception {
>>                 return i1 + i2;
>>             }
>>         });
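>>
>> As an aside: since all we need is a count per key, I believe the same
>> result could come from countByKey(), which collects the counts to the
>> driver. A sketch, assuming the pair RDD from the map step above were bound
>> to a variable named eventPairs (name made up), and assuming countByKey()
>> in this Java API returns java.util.Map<String, Long>:
>>
>> // sketch only: collects every per-key count to the driver, so it is only
>> // appropriate when the number of distinct keys is small (~182 here)
>> java.util.Map<String, Long> counts = eventPairs.countByKey();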
>>
>>
>> Also, there are other jobs that run against that cached RDD `events`. The
>> jobs that take about 4 minutes are the ones run against cached (or
>> partially cached) datasets; the 'aggregator' job that caches the RDD
>> takes about 7 minutes. Most of my log looks like the lines below, so the
>> actions taking the time are the subsequent ones:
>>
>>
>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
>> 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal
>> (PROCESS_LOCAL)
>>
>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
>> 3.0:2062 as 2621 bytes in 0 ms
>>
>> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>> Added rdd_2_2243 in memory on
>> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free:
>> 1371.7 MB)
>>
>> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>> Added rdd_2_2259 in memory on
>> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2
>> MB)
>>
>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818
>> in 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
>>
>> 13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed
>> ShuffleMapTask(3, 2123)
>>
>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
>> 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal
>> (PROCESS_LOCAL)
>>
>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
>> 3.0:2157 as 2619 bytes in 1 ms
>>
>> 13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782
>> in 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress:
>> 2221/2300)
>>
>>
>> Thank you very much!
>>
>>
>> Lucas.
>>
>>
>> 2013/10/28 Patrick Wendell <pwendell@gmail.com>
>>
>>> Hey Lucas,
>>>
>>> This code still needs to read the entire initial dataset from Cassandra,
>>> so that's probably what's taking most of the time. Also, it doesn't show
>>> the operations you are actually doing.
>>>
>>> What happens when you look in the Spark web UI or the logs? Can you tell
>>> which stages are taking the most time? Is it the initial load or is it the
>>> subsequent actions?
>>>
>>> - Patrick
>>>
>>>
>>> On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <
>>> lbrunialti@igcorp.com.br> wrote:
>>>
>>>> Hi,
>>>>
>>>> Patrick, we cache the RDD before doing operations. We tried to cache
>>>> the cassRdd (flatMap) but it doesn't help. A piece of code from one job
>>>> (we're using Java, so the code is longer) is at the end of this email.
>>>>
>>>> Ewen, yes, we're re-reading from Cassandra. I found that it is better
>>>> than serializing the blocks and reading them back from disk, as my
>>>> dataset doesn't fit in memory...
>>>>
>>>> I also tried the Kryo serializer, also with no success. We're sharing
>>>> the events RDD with the other jobs. The parameters for the jobs are:
>>>>
>>>> System.setProperty("spark.storage.memoryFraction", "0.7");
>>>>
>>>> System.setProperty("spark.executor.memory", "12g");
>>>>
>>>> System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> Lucas.
>>>>
>>>>
>>>>
>>>> // load the raw column family from Cassandra and keep it in memory
>>>> @SuppressWarnings("unchecked")
>>>> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
>>>>         context.newAPIHadoopRDD(job.getConfiguration(),
>>>>                 ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
>>>>                 ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());
>>>>
>>>> // emit one (day-month, event) pair per column of every row
>>>> JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
>>>>         new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {
>>>>             @Override
>>>>             public Iterable<Tuple2<String, String>> call(
>>>>                     Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
>>>>                 ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
>>>>                 for (ByteBuffer columnName : tuple._2().keySet()) {
>>>>                     String timestamp = ByteBufferUtil.string(columnName);
>>>>                     String event = ByteBufferUtil.string(
>>>>                             tuple._2().get(ByteBufferUtil.bytes(timestamp)).value());
>>>>                     Calendar date = Calendar.getInstance();
>>>>                     date.setTimeInMillis(Long.parseLong(timestamp));
>>>>                     String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-"
>>>>                             + String.valueOf(date.get(Calendar.MONTH));
>>>>                     events.add(new Tuple2<String, String>(key, event));
>>>>                 }
>>>>                 return events;
>>>>             }
>>>>         });
>>>>
>>>> // keep only ARTICLE events and cache the result for the other jobs
>>>> events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
>>>>     @Override
>>>>     public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>>>         return tuple._2.contains("ARTICLE");
>>>>     }
>>>> }).persist(StorageLevel.MEMORY_ONLY());
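>>>>
>>>> Since the dataset doesn't fully fit in memory, one variant worth trying
>>>> at that last persist() is serialized caching; a sketch of the same
>>>> filter with only the storage level changed (untested on our cluster):
>>>>
>>>> events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
>>>>     @Override
>>>>     public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>>>         return tuple._2.contains("ARTICLE");
>>>>     }
>>>> // MEMORY_ONLY_SER stores partitions as serialized bytes: a smaller
>>>> // footprint (especially with Kryo) at the cost of CPU on each access
>>>> }).persist(StorageLevel.MEMORY_ONLY_SER());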
>>>>
>>>>
>>>> //other operations
>>>>
>>>>
>>>> 2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
>>>>
>>>>> Well, he did mention that not everything was staying in the cache, so
>>>>> even with an ongoing job they're probably re-reading from Cassandra. It
>>>>> sounds to me like the first issue to address is why things are being
>>>>> evicted.
>>>>>
>>>>> -Ewen
>>>>>
>>>>> -----
>>>>> Ewen Cheslack-Postava
>>>>> StraightUp | http://readstraightup.com
>>>>> ewencp@readstraightup.com
>>>>> (201) 286-7785
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>>>
>>>>>> Hey Lucas,
>>>>>>
>>>>>> Could you provide some rough pseudo-code for your job? One question
>>>>>> is: are you loading the data from Cassandra every time you perform an
>>>>>> action, or do you cache() the dataset first? If you have a dataset
>>>>>> that's already in an RDD, it's very hard for me to imagine that
>>>>>> filters and aggregations could possibly take 4 minutes... it should be
>>>>>> more like seconds.
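>>>>>>
>>>>>> Just to make the pattern concrete, a minimal sketch (the context sc
>>>>>> and the HDFS path are made up, and I'm using a plain text source
>>>>>> instead of Cassandra):
>>>>>>
>>>>>> JavaRDD<String> data = sc.textFile("hdfs://master:9000/events");
>>>>>> data.cache();          // marks the RDD for in-memory storage; lazy
>>>>>> long a = data.count(); // first action scans the source and fills the cache
>>>>>> long b = data.count(); // later actions read from memory and should be fast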
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>>>>>> lbrunialti@igcorp.com.br> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We're using Spark to run analytics and ML jobs against Cassandra.
>>>>>>> Our analytics jobs are simple (filters and counts) and we're trying
>>>>>>> to improve their performance; these jobs take around 4 minutes
>>>>>>> querying 160GB (the size of our dataset). We use 5 workers and 1
>>>>>>> master, EC2 m1.xlarge instances with 8GB of JVM heap.
>>>>>>>
>>>>>>> We tried to increase the JVM heap to 12GB, but saw no gain in
>>>>>>> performance. We're using MEMORY_ONLY (after some tests we found it
>>>>>>> best), but it's not caching everything, just around 1000 of 2500
>>>>>>> blocks. Maybe the cache is not what limits performance, just the
>>>>>>> Cassandra IO (?)
>>>>>>>
>>>>>>> I saw that the people from Ooyala can run analytics jobs in
>>>>>>> milliseconds (http://www.youtube.com/watch?v=6kHlArorzvs); any advice?
>>>>>>>
>>>>>>> Appreciate the help!
>>>>>>>
>>>>>>> Lucas.
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Lucas Fernandes Brunialti
>>>>>>>
>>>>>>> *Dev/Ops Software Engineer*
>>>>>>>
>>>>>>> *+55 9 6512 4514*
>>>>>>>
>>>>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Lucas Fernandes Brunialti
>>>>
>>>> *Dev/Ops Software Engineer*
>>>>
>>>> *+55 11 96512 4514*
>>>>
>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>
>>>
>>>
>>
>>
>>
>
