spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jiusheng Chen <chenjiush...@gmail.com>
Subject Re: Is there any way to control the parallelism in LogisticRegression
Date Thu, 04 Sep 2014 02:18:06 GMT
Hi Xiangrui,

A side-by question about MLLib.
It looks current LBFGS in MLLib (version 1.0.2 and even v1.1) only support
L2 regurization, the doc explains it: "The L1 regularization by using
L1Updater
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
will not work since the soft-thresholding logic in L1Updater is designed
for gradient descent."

Since the algorithm comes from Breeze and I noticed Breeze actually
supports L1 (OWLQN
<http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>). Wondering if
there is some special considerations that current MLLib didn't support
OWLQN? And any plan to add it?

Thanks for your time!



On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong128@gmail.com> wrote:

> Update.
>
> I just find a magic parameter *blanceSlack* in *CoalescedRDD*, which
> sounds could control the locality. The default value is 0.1 (smaller value
> means lower locality). I change it to 1.0 (full locality) and use #3
> approach, then find a lot improvement (20%~40%). Although the Web UI still
> shows the type of task as 'ANY' and the input is from shuffle read, but the
> real performance is much better before change this parameter.
> [image: Inline image 1]
>
> I think the benefit includes:
>
> 1. This approach keep the physical partition size small, but make each
> task handle multiple partitions. So the memory requested for
> deserialization is reduced, which also reduce the GC time. That is exactly
> what we observed in our job.
>
> 2. This approach will not hit the 2G limitation, because it not change the
> partition size.
>
> And I also think that, Spark may change this default value, or at least
> expose this parameter to users (*CoalescedRDD *is a private class, and
> *RDD*.*coalesce* also don't have a parameter to control that).
>
> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <mengxr@gmail.com> wrote:
>
>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a
>> bigger numPartitions to avoid hitting integer bound or 2G limitation,
>> at the cost of increased shuffle size per iteration. If you use a
>> CombineInputFormat and then cache, it will try to give you roughly the
>> same size per partition. There will be some remote fetches from HDFS
>> but still cheaper than calling RDD.repartition().
>>
>> For coalesce without shuffle, I don't know how to set the right number
>> of partitions either ...
>>
>> -Xiangrui
>>
>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong128@gmail.com>
>> wrote:
>> > Hi Xiangrui,
>> >
>> > Thanks for your reply!
>> >
>> > Yes, our data is very sparse, but RDD.repartition invoke
>> > RDD.coalesce(numPartitions, shuffle = true) internally, so I think it
>> has
>> > the same effect with #2, right?
>> >
>> > For CombineInputFormat, although I haven't tried it, but it sounds that
>> it
>> > will combine multiple partitions into a large partition if I cache it,
>> so
>> > same issues as #1?
>> >
>> > For coalesce, could you share some best practice how to set the right
>> number
>> > of partitions to avoid locality problem?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <mengxr@gmail.com>
>> wrote:
>> >>
>> >> Assuming that your data is very sparse, I would recommend
>> >> RDD.repartition. But if it is not the case and you don't want to
>> >> shuffle the data, you can try a CombineInputFormat and then parse the
>> >> lines into labeled points. Coalesce may cause locality problems if you
>> >> didn't use the right number of partitions. -Xiangrui
>> >>
>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong128@gmail.com>
>> >> wrote:
>> >> > I think this has the same effect and issue with #1, right?
>> >> >
>> >> >
>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <
>> chenjiusheng@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> How about increase HDFS file extent size? like current value is
>> 128M,
>> >> >> we
>> >> >> make it 512M or bigger.
>> >> >>
>> >> >>
>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong128@gmail.com
>> >
>> >> >> wrote:
>> >> >>>
>> >> >>> Hi all,
>> >> >>>
>> >> >>> We are trying to use Spark MLlib to train super large data
(100M
>> >> >>> features
>> >> >>> and 5B rows). The input data in HDFS has ~26K partitions. By
>> default,
>> >> >>> MLlib
>> >> >>> will create a task for every partition at each iteration. But
>> because
>> >> >>> our
>> >> >>> dimensions are also very high, such large number of tasks will
>> >> >>> increase
>> >> >>> large network overhead to transfer the weight vector. So we
want to
>> >> >>> reduce
>> >> >>> the number of tasks, we tried below ways:
>> >> >>>
>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>> >> >>>
>> >> >>> data.coalesce(numPartitions).cache()
>> >> >>>
>> >> >>> This works fine for relative small data, but when data is
>> increasing
>> >> >>> and
>> >> >>> numPartitions is fixed, the size of one partition will be large.
>> This
>> >> >>> introduces two issues: the first is, the larger partition will
need
>> >> >>> larger
>> >> >>> object and more memory at runtime, and trigger GC more frequently;
>> the
>> >> >>> second is, we meet the issue 'size exceeds integer.max_value'
>> error,
>> >> >>> which
>> >> >>> seems be caused by the size of one partition larger than 2G
>> >> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>> >> >>>
>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>> >> >>>
>> >> >>> data.coalesce(numPartitions, true).cache()
>> >> >>>
>> >> >>> It could mitigate the second issue in #1 at some degree, but
fist
>> >> >>> issue
>> >> >>> is still there, and it also will introduce large amount of
>> shullfling.
>> >> >>>
>> >> >>> 3. Cache data first, and coalesce partitions.
>> >> >>>
>> >> >>> data.cache().coalesce(numPartitions)
>> >> >>>
>> >> >>> In this way, the number of cached partitions is not change,
but
>> each
>> >> >>> task
>> >> >>> read the data from multiple partitions. However, I find the
task
>> will
>> >> >>> loss
>> >> >>> locality by this way. I find a lot of 'ANY' tasks, that means
that
>> >> >>> tasks
>> >> >>> read data from other nodes, and become slower than that read
data
>> from
>> >> >>> local
>> >> >>> memory.
>> >> >>>
>> >> >>> I think the best way should like #3, but leverage locality
as more
>> as
>> >> >>> possible. Is there any way to do that? Any suggestions?
>> >> >>>
>> >> >>> Thanks!
>> >> >>>
>> >> >>> --
>> >> >>> ZHENG, Xu-dong
>> >> >>>
>> >> >>
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > 郑旭东
>> >> > ZHENG, Xu-dong
>> >> >
>> >
>> >
>> >
>> >
>> > --
>> > 郑旭东
>> > ZHENG, Xu-dong
>> >
>>
>
>
>
> --
> 郑旭东
> ZHENG, Xu-dong
>
>

Mime
View raw message