spark-user mailing list archives

From Lucas Fernandes Brunialti <lbrunia...@igcorp.com.br>
Subject Re: Job duration
Date Mon, 28 Oct 2013 20:21:45 GMT
Hello,

I count events per date/time after that code runs, as in the snippet below:

JavaPairRDD<String, Integer> eventPerDate = events.map(
        new PairFunction<Tuple2<String, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._1, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
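
A side note on this pattern: reduceByKey combines values map-side, so the
shuffle for this count should be small; most of the time goes into scanning
the `events` partitions. A minimal, hypothetical sketch of materializing the
result (illustrative only, not our exact code):

List<Tuple2<String, Integer>> counts = eventPerDate.collect();
for (Tuple2<String, Integer> c : counts) {
    // note: Calendar.MONTH is zero-based, so 28 Oct prints as "28-9"
    System.out.println(c._1 + " -> " + c._2);
}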


Also, there are other jobs that run against the cached RDD `events`. The
jobs taking about 4 minutes are the ones run against the cached (or
partially cached) dataset; the 'aggregator' job that caches the RDD takes
about 7 minutes. Most of my log looks like the excerpt below, so the actions
taking the time are the subsequent ones:


13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal (PROCESS_LOCAL)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2062 as 2621 bytes in 0 ms
13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2243 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free: 1371.7 MB)
13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2259 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2 MB)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818 in 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 2123)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal (PROCESS_LOCAL)
13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2157 as 2619 bytes in 1 ms
13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782 in 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress: 2221/2300)
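
(Back-of-envelope reading of the log above, assuming 4 cores per m1.xlarge
worker: the stage has 2300 tasks and individual tasks finish in roughly 2.5
to 8 seconds. With 5 workers x 4 cores = 20 concurrent tasks and ~3 s per
task, 2300 * 3 / 20 = 345 s, i.e. around 6 minutes, which is in line with
the durations we see.)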


Thank you very much!


Lucas.


2013/10/28 Patrick Wendell <pwendell@gmail.com>

> Hey Lucas,
>
> This code still needs to read the entire initial dataset from Cassandra,
> so that's probably what's taking most of the time. Also, it doesn't show
> the operations you are actually performing here.
>
> What happens when you look in the Spark web UI or the logs? Can you tell
> which stages are taking the most time? Is it the initial load or is it the
> subsequent actions?
>
> - Patrick
>
>
> On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <
> lbrunialti@igcorp.com.br> wrote:
>
>> Hi,
>>
>> Patrick, we cache the RDD before doing operations. We tried to cache the
>> casRdd (the flatMap input) but it doesn't help. A piece of code from one
>> job (we're using Java, so the code is longer) is at the end of the email.
>>
>> Ewen, yes, we're re-reading from Cassandra. I found that it is better
>> than serializing the blocks and reading them back from disk, as my
>> dataset doesn't fit in memory...
>>
>> I also tried the Kryo serializer, also with no success. We're sharing the
>> events RDD across the other jobs. The parameters for the jobs are:
>>
>> System.setProperty("spark.storage.memoryFraction", "0.7");
>>
>> System.setProperty("spark.executor.memory", "12g");
>>
>> System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
>>
>>
>>
>> Thanks!
>>
>> Lucas.
>>
>>
>>
>> @SuppressWarnings("unchecked")
>> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
>>         context.newAPIHadoopRDD(
>>                 job.getConfiguration(),
>>                 ColumnFamilyInputFormat.class
>>                         .asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
>>                 ByteBuffer.class,
>>                 SortedMap.class
>>         ).persist(StorageLevel.MEMORY_ONLY());
>>
>> JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
>>         new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>,
>>                 Tuple2<String, String>>() {
>>             @Override
>>             public Iterable<Tuple2<String, String>> call(
>>                     Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple)
>>                     throws Exception {
>>                 ArrayList<Tuple2<String, String>> events =
>>                         new ArrayList<Tuple2<String, String>>();
>>                 for (ByteBuffer columnName : tuple._2().keySet()) {
>>                     String timestamp = ByteBufferUtil.string(columnName);
>>                     String event = ByteBufferUtil.string(
>>                             tuple._2().get(ByteBufferUtil.bytes(timestamp)).value());
>>                     Calendar date = Calendar.getInstance();
>>                     date.setTimeInMillis(Long.parseLong(timestamp));
>>                     String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH))
>>                             + "-" + String.valueOf(date.get(Calendar.MONTH));
>>                     events.add(new Tuple2<String, String>(key, event));
>>                 }
>>                 return events;
>>             }
>>         });
>>
>>
>> events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
>>             @Override
>>             public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>                 return tuple._2.contains("ARTICLE");
>>             }
>>         }).persist(StorageLevel.MEMORY_ONLY());
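>>
>> (Note on the snippet above: both casRdd and the filtered events are
>> persisted MEMORY_ONLY, so the raw Cassandra rows compete with the filtered
>> tuples for the same cache space. One option, assuming unpersist() is
>> available in this Spark version, is to persist only events and drop the
>> parent with casRdd.unpersist() once events has been materialized; that
>> should reduce the evictions Ewen mentions.)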
>>
>>
>> //other operations
>>
>>
>> 2013/10/28 Ewen Cheslack-Postava <me@ewencp.org>
>>
>>> Well, he did mention that not everything was staying in the cache, so
>>> even with an ongoing job they're probably re-reading from Cassandra. It
>>> sounds to me like the first issue to address is why things are being
>>> evicted.
>>>
>>> -Ewen
>>>
>>> -----
>>> Ewen Cheslack-Postava
>>> StraightUp | http://readstraightup.com
>>> ewencp@readstraightup.com
>>> (201) 286-7785
>>>
>>>
>>> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <pwendell@gmail.com>wrote:
>>>
>>>> Hey Lucas,
>>>>
>>>> Could you provide some rough pseudo-code for your job? One question:
>>>> are you loading the data from Cassandra every time you perform an action,
>>>> or do you cache() the dataset first? If you have a dataset that's already
>>>> in an RDD, it's very hard for me to imagine that filters and aggregations
>>>> could possibly take 4 minutes... it should be more like seconds.
>>>>
>>>> - Patrick
>>>>
>>>>
>>>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>>>> lbrunialti@igcorp.com.br> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We're using Spark to run analytics and ML jobs against Cassandra. Our
>>>>> analytics jobs are simple (filters and counts) and we're trying to
>>>>> improve their performance; these jobs take around 4 minutes querying
>>>>> 160 GB (the size of our dataset). We use 5 workers and 1 master, EC2
>>>>> m1.xlarge, with 8 GB of JVM heap.
>>>>>
>>>>> We tried to increase the JVM heap to 12 GB, but we had no gain in
>>>>> performance. We're using CACHE_ONLY (after some tests we found it works
>>>>> better), but it's not caching everything, just around 1000 of 2500
>>>>> blocks. Maybe the cache is not what's limiting performance, just the
>>>>> Cassandra I/O (?)
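>>>>>
>>>>> (Rough arithmetic, assuming spark.storage.memoryFraction is near its
>>>>> default of ~0.6: 5 workers x 8 GB x 0.6 is about 24 GB of aggregate
>>>>> cache, far below the 160 GB dataset, and deserialized Java objects
>>>>> typically take more space than the raw data, so it is not surprising
>>>>> that only ~1000 of 2500 blocks stay resident.)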
>>>>>
>>>>> I saw that people from Ooyala can do analytics jobs in milliseconds (
>>>>> http://www.youtube.com/watch?v=6kHlArorzvs). Any advice?
>>>>>
>>>>> Appreciate the help!
>>>>>
>>>>> Lucas.
>>>>>
>>>>> --
>>>>>
>>>>> Lucas Fernandes Brunialti
>>>>>
>>>>> *Dev/Ops Software Engineer*
>>>>>
>>>>> *+55 9 6512 4514*
>>>>>
>>>>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>>
>> Lucas Fernandes Brunialti
>>
>> *Dev/Ops Software Engineer*
>>
>> *+55 11 96512 4514*
>>
>> *lbrunialti@igcorp.com.br* <lbrunialti@igcorp.com.br>
>>
>
>
