spark-user mailing list archives

From Patrick Wendell <pwend...@gmail.com>
Subject Re: Job duration
Date Mon, 28 Oct 2013 20:56:49 GMT
Hey Lucas,

How many unique keys do you have when you do these aggregations? Also, when
you look in the web UI, can you tell how much in-memory storage is being
used overall by the events RDD and the casRDD?
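
A minimal way to check the number of unique keys, sketched against the
`events` RDD from the code quoted below (0.8-era Java API; the variable name
is taken from the thread, everything else here is illustrative):

// Sketch: count the distinct day-month keys feeding the aggregation.
// (Function is org.apache.spark.api.java.function.Function, as elsewhere in the thread.)
long uniqueKeys = events.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple) throws Exception {
        return tuple._1;  // the "day-month" key
    }
}).distinct().count();
System.out.println("Unique keys: " + uniqueKeys);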

- Patrick


On Mon, Oct 28, 2013 at 1:21 PM, Lucas Fernandes Brunialti <
lbrunialti@igcorp.com.br> wrote:

> Hello,
>
> I count events per date/time after that code, like the code below:
>
> JavaPairRDD<String, Integer> eventPerDate = events.map(
>         new PairFunction<Tuple2<String, String>, String, Integer>() {
>             @Override
>             public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
>                 return new Tuple2<String, Integer>(tuple._1, 1);
>             }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>             @Override
>             public Integer call(Integer i1, Integer i2) throws Exception {
>                 return i1 + i2;
>             }
>         });
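>
> For reference, a minimal sketch of how these per-date counts could be pulled
> back to the driver once computed (collectAsMap() is standard on JavaPairRDD;
> this assumes the day-month keys are few enough to fit comfortably on the driver):
>
> // Sketch: trigger the aggregation and collect the small result map on the driver.
> java.util.Map<String, Integer> countsPerDate = eventPerDate.collectAsMap();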
>
>
> Also, there are other jobs that run against that cached RDD `events`. So
> the jobs that are taking about 4 minutes are the ones run against cached
> (or partially cached) datasets; the 'aggregator' job that caches the RDD
> takes about 7 minutes. Most of my log looks like the excerpt below, so the
> actions that are taking time are the subsequent ones:
>
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
> 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal
> (PROCESS_LOCAL)
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
> 3.0:2062 as 2621 bytes in 0 ms
>
> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Added rdd_2_2243 in memory on
> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free:
> 1371.7 MB)
>
> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Added rdd_2_2259 in memory on
> domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2
> MB)
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818 in
> 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
>
> 13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3,
> 2123)
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task
> 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal
> (PROCESS_LOCAL)
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task
> 3.0:2157 as 2619 bytes in 1 ms
>
> 13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782 in
> 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress: 2221/2300)
>
>
> Thank you very much!
>
>
> Lucas.
>
>
> 2013/10/28 Patrick Wendell <pwendell@gmail.com>
>
>> Hey Lucas,
>>
>> This code still needs to read the entire initial dataset from Cassandra,
>> so that's probably what's taking most of the time. Also, it doesn't show
>> the operations you are actually doing here.
>>
>> What happens when you look in the Spark web UI or the logs? Can you tell
>> which stages are taking the most time? Is it the initial load or is it the
>> subsequent actions?
>>
>> - Patrick
>>
>>
>> On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <
>> lbrunialti@igcorp.com.br> wrote:
>>
>>> Hi,
>>>
>>> Patrick, we cache the RDD before doing operations. We tried to cache the
>>> casRdd (before the flatMap), but it doesn't help. The piece of code of one
>>> job (we're using Java, so the code is longer) is at the end of the email.
>>>
>>> Ewen, yes, we're re-reading from Cassandra. I found that it is better than
>>> serializing and reading the blocks from disk, as my dataset doesn't fit in
>>> memory...
>>>
>>> I also tried the KryoSerializer, also with no success. We're sharing the
>>> events RDD across the other jobs. The parameters for the jobs are:
>>>
>>> System.setProperty("spark.storage.memoryFraction", "0.7");
>>>
>>> System.setProperty("spark.executor.memory", "12g");
>>>
>>> System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
>>>
>>>
>>>
>>> Thanks!
>>>
>>> Lucas.
>>>
>>>
>>>
>>> @SuppressWarnings("unchecked")
>>> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
>>>     context.newAPIHadoopRDD(job.getConfiguration(),
>>>         ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
>>>         ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());
>>>
>>> JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
>>>     new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {
>>>
>>>         @Override
>>>         public Iterable<Tuple2<String, String>> call(
>>>                 Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
>>>             ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
>>>             for (ByteBuffer columnName : tuple._2().keySet()) {
>>>                 String timestamp = ByteBufferUtil.string(columnName);
>>>                 String event = ByteBufferUtil.string(
>>>                         tuple._2().get(ByteBufferUtil.bytes(timestamp)).value());
>>>                 Calendar date = Calendar.getInstance();
>>>                 date.setTimeInMillis(Long.parseLong(timestamp));
>>>                 String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-"
>>>                         + String.valueOf(date.get(Calendar.MONTH));
>>>                 events.add(new Tuple2<String, String>(key, event));
>>>             }
>>>             return events;
>>>         }
>>>     });
>>>
>>>
>>> events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
>>>
>>>     @Override
>>>     public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>>         return tuple._2.contains("ARTICLE");
>>>     }
>>> }).persist(StorageLevel.MEMORY_ONLY());
>>>
>>>
>>> //other operations
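>>>
>>> For reference, a minimal sketch of the kind of action that would follow here
>>> and materialize the persisted RDD on its first run (later actions then reuse
>>> whatever blocks fit in memory):
>>>
>>> // Sketch: the first action computes and caches the filtered events;
>>> // subsequent actions are served from the cached blocks.
>>> long articleEventCount = events.count();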
>>>
>>>
>>> 2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
>>>
>>>> Well, he did mention that not everything was staying in the cache, so
>>>> even with an ongoing job they're probably re-reading from Cassandra. It
>>>> sounds to me like the first issue to address is why things are being
>>>> evicted.
>>>>
>>>> -Ewen
>>>>
>>>> -----
>>>> Ewen Cheslack-Postava
>>>> StraightUp | http://readstraightup.com
>>>> ewencp@readstraightup.com
>>>> (201) 286-7785
>>>>
>>>>
>>>> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com>wrote:
>>>>
>>>>> Hey Lucas,
>>>>>
>>>>> Could you provide some rough pseudo-code for your job? One question
>>>>> is: are you loading the data from Cassandra every time you perform an
>>>>> action, or do you cache() the dataset first? If you have a dataset that's
>>>>> already in an RDD, it's very hard for me to imagine that filters and
>>>>> aggregations could possibly take 4 minutes... it should be more like seconds.
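>>>>>
>>>>> As an illustration of the cache()-then-act pattern being asked about, a
>>>>> minimal sketch (the `sc` context and `lines` RDD here are hypothetical,
>>>>> not Lucas's actual job):
>>>>>
>>>>> // Sketch: cache the RDD, then run actions against it. The first action
>>>>> // materializes the cache; later actions reuse the in-memory copy.
>>>>> JavaRDD<String> lines = sc.textFile("hdfs://...").cache();
>>>>> long total = lines.count();   // first action: reads the source and caches
>>>>> long matches = lines.filter(new Function<String, Boolean>() {
>>>>>     @Override
>>>>>     public Boolean call(String s) throws Exception {
>>>>>         return s.contains("ARTICLE");
>>>>>     }
>>>>> }).count();                   // served from the cached blocks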
>>>>>
>>>>> - Patrick
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>>>>> lbrunialti@igcorp.com.br> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We're using Spark to run analytics and ML jobs against Cassandra. Our
>>>>>> analytics jobs are simple (filters and counts) and we're trying to improve
>>>>>> the performance; these jobs take around 4 minutes querying 160 GB (the size
>>>>>> of our dataset). Also, we use 5 workers and 1 master, EC2 m1.xlarge with
>>>>>> 8 GB of JVM heap.
>>>>>>
>>>>>> We tried to increase the JVM heap to 12 GB, but we had no gain in
>>>>>> performance. We're using MEMORY_ONLY (after some tests we've found it
>>>>>> better), but it's not caching everything, just around 1000 of 2500 blocks.
>>>>>> Maybe the cache is not impacting performance, just the Cassandra IO(?)
>>>>>>
>>>>>> I saw that people from Ooyala can do analytics jobs in milliseconds
>>>>>> (http://www.youtube.com/watch?v=6kHlArorzvs), any advice?
>>>>>>
>>>>>> Appreciate the help!
>>>>>>
>>>>>> Lucas.
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Lucas Fernandes Brunialti
>>>>>>
>>>>>> *Dev/Ops Software Engineer*
>>>>>>
>>>>>> *+55 9 6512 4514*
>>>>>>
>>>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Lucas Fernandes Brunialti
>>>
>>> *Dev/Ops Software Engineer*
>>>
>>> *+55 11 96512 4514*
>>>
>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>
>>
>>
>
>
>
