spark-user mailing list archives

From Patrick Wendell <pwend...@gmail.com>
Subject Re: Job duration
Date Tue, 29 Oct 2013 00:40:34 GMT
Hey Lucas,

Would you mind showing a screenshot of the memory dashboard as well? Also,
could you show the distribution of task times within one of the stages (you
can get this screen by just clicking on one of the stages - it shows the
distribution at the top)?

- Patrick


On Mon, Oct 28, 2013 at 3:01 PM, Lucas Fernandes Brunialti <
lbrunialti@igcorp.com.br> wrote:

> Hello Patrick,
>
> It's 182 unique keys (after the aggregations); before the aggregations I
> would say there are thousands of keys. Most of the time I see 38GB out of
> 40GB used, and the events RDD is using 35GB (around 90% cached). Most of
> the time is spent in reduceByKey (see screenshot).
>
> Thanks a lot Patrick!
>
> Lucas.
>
>
> 2013/10/28 Patrick Wendell <pwendell@gmail.com>
>
>> Hey Lucas,
>>
>> How many unique keys do you have when you do these aggregations? Also,
>> when you look in the web UI, can you tell how much in-memory storage is
>> being used overall by the events RDD and the casRDD?
>>
>> - Patrick
>>
>>
>> On Mon, Oct 28, 2013 at 1:21 PM, Lucas Fernandes Brunialti <
>> lbrunialti@igcorp.com.br> wrote:
>>
>>> Hello,
>>>
>>> I count events per date/time after that code, like the code below:
>>>
>>> JavaPairRDD<String, Integer> eventPerDate = events
>>>         .map(new PairFunction<Tuple2<String, String>, String, Integer>() {
>>>             @Override
>>>             public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
>>>                 return new Tuple2<String, Integer>(tuple._1, 1);
>>>             }
>>>         })
>>>         .reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>             @Override
>>>             public Integer call(Integer i1, Integer i2) throws Exception {
>>>                 return i1 + i2;
>>>             }
>>>         });
>>>
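>>> A variant of the same aggregation that passes an explicit partition count
>>> to reduceByKey (sketch only; the 8 is illustrative, since there are only
>>> ~182 distinct keys after aggregation):
>>>
>>> JavaPairRDD<String, Integer> eventPerDate = events
>>>         .map(new PairFunction<Tuple2<String, String>, String, Integer>() {
>>>             @Override
>>>             public Tuple2<String, Integer> call(Tuple2<String, String> t) throws Exception {
>>>                 return new Tuple2<String, Integer>(t._1, 1);
>>>             }
>>>         })
>>>         .reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>             @Override
>>>             public Integer call(Integer a, Integer b) throws Exception {
>>>                 return a + b;
>>>             }
>>>         }, 8); // explicit number of reduce partitions (illustrative)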
>>>
>>> Also, there are other jobs that run against that cached RDD `events`. So
>>> the jobs that are taking about 4 minutes are the ones run against cached
>>> (or partially cached) datasets; the 'aggregator' job that caches the RDD
>>> takes about 7 minutes. Most of my log looks like the lines below, so the
>>> actions that are taking time are the subsequent ones:
>>>
>>>
>>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
>>> 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal
>>> (PROCESS_LOCAL)
>>>
>>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
>>> 3.0:2062 as 2621 bytes in 0 ms
>>>
>>> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>>> Added rdd_2_2243 in memory on
>>> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free:
>>> 1371.7 MB)
>>>
>>> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>>> Added rdd_2_2259 in memory on
>>> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2
>>> MB)
>>>
>>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818
>>> in 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
>>>
>>> 13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed
>>> ShuffleMapTask(3, 2123)
>>>
>>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
>>> 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal
>>> (PROCESS_LOCAL)
>>>
>>> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
>>> 3.0:2157 as 2619 bytes in 1 ms
>>>
>>> 13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782
>>> in 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress:
>>> 2221/2300)
>>>
>>>
>>> Thank you very much!
>>>
>>>
>>> Lucas.
>>>
>>>
>>> 2013/10/28 Patrick Wendell <pwendell@gmail.com>
>>>
>>>> Hey Lucas,
>>>>
>>>> This code still needs to read the entire initial dataset from
>>>> Cassandra, so that's probably what's taking most of the time. Also, it
>>>> doesn't show the operations you are actually doing.
>>>>
>>>> What happens when you look in the Spark web UI or the logs? Can you
>>>> tell which stages are taking the most time? Is it the initial load or is
>>>> it the subsequent actions?
>>>>
>>>> - Patrick
>>>>
>>>>
>>>> On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <
>>>> lbrunialti@igcorp.com.br> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Patrick, we cache the RDD before doing operations. We tried to cache
>>>>> the casRdd (the flatMap input) but it doesn't help. A piece of the code
>>>>> of one job is at the end of the email (we're using Java, so the code is
>>>>> longer).
>>>>>
>>>>> Ewen, yes, we're re-reading from Cassandra. I found that it is better
>>>>> than serializing and reading the blocks to/from disk, as my dataset
>>>>> doesn't fit in memory...
>>>>>
>>>>> I also tried the KryoSerializer, with no success. We're sharing the
>>>>> events RDD across the other jobs. The parameters for the jobs are:
>>>>>
>>>>> System.setProperty("spark.storage.memoryFraction", "0.7");
>>>>>
>>>>> System.setProperty("spark.executor.memory", "12g");
>>>>>
>>>>> System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
>>>>>
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Lucas.
>>>>>
>>>>>
>>>>>
>>>>> @SuppressWarnings("unchecked")
>>>>>
>>>>>  JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
casRdd =
>>>>> context.newAPIHadoopRDD(job.getConfiguration(),
>>>>>
>>>>>    ColumnFamilyInputFormat.class
>>>>> .asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
>>>>>
>>>>>    ByteBuffer.class, SortedMap.class
>>>>> ).persist(StorageLevel.MEMORY_ONLY());
>>>>>
>>>>>
>>>>>  JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(new
FlatMapFunction<Tuple2<ByteBuffer,
>>>>> SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>()
{
>>>>>
>>>>> @Override
>>>>>
>>>>> public Iterable<Tuple2<String, String>> call(Tuple2<ByteBuffer,
>>>>> SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
>>>>>
>>>>> ArrayList<Tuple2<String, String>> events = newArrayList<Tuple2<String,
String>>();
>>>>>
>>>>> for (ByteBuffer columnName : tuple._2().keySet()) {
>>>>>
>>>>> String timestamp = ByteBufferUtil.string(columnName);
>>>>>
>>>>> String event =
>>>>> ByteBufferUtil.string(tuple._2().get((ByteBufferUtil.bytes(timestamp))).value());
>>>>>
>>>>> Calendar date = Calendar.getInstance();
>>>>>
>>>>> date.setTimeInMillis(Long.parseLong(timestamp));
>>>>>
>>>>> String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-" +
>>>>> String.valueOf(date.get(Calendar.MONTH));
>>>>>
>>>>> events.add(new Tuple2<String, String>(key, event));
>>>>>
>>>>> }
>>>>>
>>>>> return events;
>>>>>
>>>>> }
>>>>>
>>>>>  });
>>>>>
>>>>>
>>>>>    events = events.filter(new Function<Tuple2<String,String>,
>>>>> Boolean>() {
>>>>>
>>>>>   @Override
>>>>>
>>>>> public Boolean call(Tuple2<String, String> tuple) throws Exception
{
>>>>>
>>>>> return tuple._2.contains("ARTICLE");
>>>>>
>>>>> }
>>>>>
>>>>>     }).persist(StorageLevel.MEMORY_ONLY());
>>>>>
>>>>>
>>>>> //other operations
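>>>>>
>>>>> Since the dataset doesn't fully fit in memory, a variant of the
>>>>> filter-and-persist step above with a serialized storage level is shown
>>>>> below (sketch only; the articleEvents name is illustrative, and it
>>>>> assumes the same StorageLevel accessors as MEMORY_ONLY()):
>>>>>
>>>>> // Sketch: MEMORY_ONLY_SER stores each partition as a serialized byte
>>>>> // array, so more blocks tend to fit before eviction, at the cost of CPU
>>>>> // to deserialize on access; it pairs naturally with Kryo.
>>>>> JavaRDD<Tuple2<String, String>> articleEvents = events
>>>>>         .filter(new Function<Tuple2<String, String>, Boolean>() {
>>>>>             @Override
>>>>>             public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>>>>                 return tuple._2.contains("ARTICLE");
>>>>>             }
>>>>>         })
>>>>>         .persist(StorageLevel.MEMORY_ONLY_SER());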
>>>>>
>>>>>
>>>>> 2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
>>>>>
>>>>>> Well, he did mention that not everything was staying in the cache, so
>>>>>> even with an ongoing job they're probably re-reading from Cassandra.
>>>>>> It sounds to me like the first issue to address is why things are
>>>>>> being evicted.
>>>>>>
>>>>>> -Ewen
>>>>>>
>>>>>> -----
>>>>>> Ewen Cheslack-Postava
>>>>>> StraightUp | http://readstraightup.com
>>>>>> ewencp@readstraightup.com
>>>>>> (201) 286-7785
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Lucas,
>>>>>>>
>>>>>>> Could you provide some rough pseudo-code for your job? One question
>>>>>>> is: are you loading the data from Cassandra every time you perform an
>>>>>>> action, or do you cache() the dataset first? If you have a dataset
>>>>>>> that's already in an RDD, it's very hard for me to imagine that
>>>>>>> filters and aggregations could possibly take 4 minutes... it should be
>>>>>>> more like seconds.
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>>>>>>> lbrunialti@igcorp.com.br> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> We're using Spark to run analytics and ML jobs against Cassandra.
>>>>>>>> Our analytics jobs are simple (filters and counts) and we're trying
>>>>>>>> to improve their performance; these jobs take around 4 minutes
>>>>>>>> querying 160GB (the size of our dataset). We use 5 workers and 1
>>>>>>>> master, EC2 m1.xlarge, with 8GB of JVM heap.
>>>>>>>>
>>>>>>>> We tried increasing the JVM heap to 12GB, but we had no gain in
>>>>>>>> performance. We're using MEMORY_ONLY (after some tests we found it
>>>>>>>> better), but it's not caching everything, just around 1000 of 2500
>>>>>>>> blocks. Maybe the cache is not what impacts performance, just the
>>>>>>>> Cassandra IO (?)
>>>>>>>>
>>>>>>>> I saw that people from Ooyala can do analytics jobs in milliseconds
>>>>>>>> (http://www.youtube.com/watch?v=6kHlArorzvs), any advice?
>>>>>>>>
>>>>>>>> Appreciate the help!
>>>>>>>>
>>>>>>>> Lucas.
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Lucas Fernandes Brunialti
>>>>>>>>
>>>>>>>> *Dev/Ops Software Engineer*
>>>>>>>>
>>>>>>>> *+55 9 6512 4514*
>>>>>>>>
>>>>>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Lucas Fernandes Brunialti
>>>>>
>>>>> *Dev/Ops Software Engineer*
>>>>>
>>>>> *+55 11 96512 4514*
>>>>>
>>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
