spark-user mailing list archives

From Charles Li <littlee1...@gmail.com>
Subject Re: Questions about disk IOs
Date Fri, 25 Jul 2014 16:07:09 GMT
Hi Xiangrui,

I have 16 * 40 CPU cores in total, but I am only using 200 partitions on the 200 executors.
I use coalesce without shuffle to reduce the RDD's default number of partitions.

The shuffle size from the WebUI is nearly 100m.
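
For reference, here is a rough sketch of what coalescing without a shuffle does to the partition layout, and how the resulting partition count compares with the core count. It is plain Java, not Spark code. The `groupSizes` helper and its contiguous-range grouping are assumptions for illustration only (Spark's own coalescer also takes data locality into account), and the 951 parent partitions are borrowed from the storage summary quoted further down the thread, which may not match the current job.

```java
import java.util.Arrays;

public class CoalesceSketch {
    // Hypothetical helper: assign each parent partition to one of `target`
    // output partitions using contiguous ranges, and return how many parent
    // partitions each output partition would read. No data is repartitioned
    // by key, which is the point of coalescing without a shuffle.
    static int[] groupSizes(int parents, int target) {
        if (parents <= 0 || target <= 0 || target > parents) {
            throw new IllegalArgumentException("need 0 < target <= parents");
        }
        int[] sizes = new int[target];
        for (int p = 0; p < parents; p++) {
            int group = (int) ((long) p * target / parents);
            sizes[group]++;
        }
        return sizes;
    }

    public static void main(String[] args) {
        int parents = 951;      // assumed: "Cached Partitions 951" from the earlier message
        int target = 200;       // partitions after coalesce, as stated above
        int cores = 16 * 40;    // total cores, as stated above

        int[] sizes = groupSizes(parents, target);
        int min = Arrays.stream(sizes).min().getAsInt();
        int max = Arrays.stream(sizes).max().getAsInt();

        System.out.println("output partitions: " + sizes.length);
        System.out.println("parent partitions per output partition: " + min + " to " + max);
        System.out.println("partitions per core: " + ((double) target / cores));
    }
}
```

With these numbers the partition count is well below the core count, so the "partitions should not be much larger than cores" guideline is already satisfied.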

On Jul 25, 2014, at 23:51, Xiangrui Meng <mengxr@gmail.com> wrote:

> How many partitions did you use and how many CPU cores in total? The
> former shouldn't be much larger than the latter. Could you also check
> the shuffle size from the WebUI? -Xiangrui
> 
> On Fri, Jul 25, 2014 at 4:10 AM, Charles Li <littlee1032@gmail.com> wrote:
>> Hi Xiangrui,
>> 
>> Thanks for your treeAggregate patch. It is very helpful.
>> After applying your patch in my local repo, the new Spark can handle more partitions than before.
>> But after some iterations (mapPartitions + reduceByKey), the reducer seems to become slower and slower and finally hangs.
>> 
>> The logs show there is always 1 message pending in the outbox, and we are waiting for it. Are you aware of this kind of issue?
>> How can I know which message is pending? Where is it supposed to go?
>> 
>> Log:
>> 
>> 14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 752
>> 14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to driver
>> 14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
>> 14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from [*********/**********]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to [********/************]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Connected to [********/********], 1 messages pending
>> 14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for shuffle 0
>> 14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 742
>> 14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
>> 14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
>> 14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
>> 14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 752
>> 14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to driver
>> 14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
>> <-- I have shut down the app
>> 14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
>> 
>> On Jul 2, 2014, at 0:08, Xiangrui Meng <mengxr@gmail.com> wrote:
>> 
>>> Try to reduce number of partitions to match the number of cores. We
>>> will add treeAggregate to reduce the communication cost.
>>> 
>>> PR: https://github.com/apache/spark/pull/1110
>>> 
>>> -Xiangrui
>>> 
>>> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li <littlee1032@gmail.com> wrote:
>>>> Hi Spark,
>>>> 
>>>> I am running LBFGS on our user data. The data size with Kryo serialisation is about 210G. The weight size is around 1,300,000. I am quite confused that the performance is very close whether the data is cached or not.
>>>> 
>>>> The program is simple:
>>>> points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
>>>> points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment this out to run without caching
>>>> gradient = new LogisticGradient();
>>>> updater = new SquaredL2Updater();
>>>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>>>> result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections, convergeTol, maxIter, regParam, initWeight);
>>>> 
>>>> I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in its cluster mode. Below are some arguments I am using:
>>>> --executor-memory 10G
>>>> --num-executors 50
>>>> --executor-cores 2
>>>> 
>>>> Storage usage:
>>>> When caching:
>>>> Cached Partitions 951
>>>> Fraction Cached 100%
>>>> Size in Memory 215.7GB
>>>> Size in Tachyon 0.0B
>>>> Size on Disk 1029.7MB
>>>> 
>>>> Each aggregate takes around 5 minutes with the cache enabled. Lots of disk I/O can be seen on the Hadoop node. I get the same result with the cache disabled.
>>>> 
>>>> Should caching the data points improve performance? Should caching decrease disk I/O?
>>>> 
>>>> Thanks in advance.
>> 
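
The treeAggregate suggestion quoted earlier in the thread aims to cut communication cost by combining partial results in intermediate rounds instead of sending every partition's result straight to the driver. Below is a minimal plain-Java sketch of that general idea. It is not Spark's implementation: the class, the `reduceLevels` helper, and the fan-in of 15 are all assumptions chosen for illustration, and the two-element vectors stand in for per-partition gradient sums.

```java
import java.util.ArrayList;
import java.util.List;

public class TreeAggregateSketch {
    // Element-wise sum of two equally sized vectors; returns a new array.
    static double[] add(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    // Hypothetical helper: repeatedly combine partial results in groups of
    // `fanIn` until at most `fanIn` partials remain. The returned list is
    // what the final combiner (the driver, in Spark terms) would merge.
    static List<double[]> reduceLevels(List<double[]> partials, int fanIn) {
        if (fanIn < 2) {
            throw new IllegalArgumentException("fanIn must be at least 2");
        }
        List<double[]> current = partials;
        while (current.size() > fanIn) {
            List<double[]> next = new ArrayList<>();
            for (int start = 0; start < current.size(); start += fanIn) {
                int end = Math.min(start + fanIn, current.size());
                double[] acc = current.get(start);
                for (int i = start + 1; i < end; i++) {
                    acc = add(acc, current.get(i));
                }
                next.add(acc);
            }
            current = next;
        }
        return current;
    }

    // Final merge of the remaining partials into one result.
    static double[] finalMerge(List<double[]> partials) {
        double[] acc = partials.get(0);
        for (int i = 1; i < partials.size(); i++) {
            acc = add(acc, partials.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        // One partial result per partition; 200 matches the partition count above.
        List<double[]> partials = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            partials.add(new double[] {1.0, 2.0});
        }
        List<double[]> atDriver = reduceLevels(partials, 15); // fan-in is an assumed value
        double[] total = finalMerge(atDriver);
        System.out.println("partials merged at the final step: " + atDriver.size());
        System.out.println("total: [" + total[0] + ", " + total[1] + "]");
    }
}
```

The trade-off is an extra round of intermediate combining in exchange for far fewer large vectors arriving at a single node, which matters when each partial is a weight-sized vector of around 1,300,000 entries.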

