spark-user mailing list archives

From Xiangrui Meng <men...@gmail.com>
Subject Re: Is there any way to control the parallelism in LogisticRegression
Date Thu, 04 Sep 2014 02:34:13 GMT
+DB & David (They implemented OWLQN on Spark today.)
On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiusheng@gmail.com> wrote:

> Hi Xiangrui,
>
> A side question about MLlib.
> It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only
> supports L2 regularization. The doc explains it: "The L1 regularization by using
> L1Updater
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
> will not work since the soft-thresholding logic in L1Updater is designed
> for gradient descent."
>
> The algorithm comes from Breeze, and I noticed that Breeze actually
> supports L1 (OWLQN
> <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>). I am wondering
> whether there are special considerations that kept MLlib from supporting
> OWLQN. Is there any plan to add it?
>
> Thanks for your time!
>
>
>
> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong128@gmail.com>
> wrote:
>
>> Update.
>>
>> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which
>> appears to control locality. The default value is 0.1 (a smaller value
>> means lower locality). I changed it to 1.0 (full locality) and used approach
>> #3, and saw a large improvement (20%~40%). The Web UI still shows the task
>> type as 'ANY' and the input as shuffle read, but the real performance is
>> much better than before changing this parameter.
>> [image: Inline image 1]
>>
>> I think the benefit includes:
>>
>> 1. This approach keeps the physical partition size small, but makes each
>> task handle multiple partitions. So the memory required for
>> deserialization is reduced, which also reduces GC time. That is exactly
>> what we observed in our job.
>>
>> 2. This approach will not hit the 2G limitation, because it does not change
>> the partition size.
>>
>> I also think Spark could change this default value, or at least
>> expose this parameter to users (*CoalescedRDD* is a private class, and
>> *RDD*.*coalesce* also doesn't have a parameter to control that).
>>
>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>
>>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a
>>> bigger numPartitions to avoid hitting integer bound or 2G limitation,
>>> at the cost of increased shuffle size per iteration. If you use a
>>> CombineInputFormat and then cache, it will try to give you roughly the
>>> same size per partition. There will be some remote fetches from HDFS
>>> but still cheaper than calling RDD.repartition().
>>>
>>> For coalesce without shuffle, I don't know how to set the right number
>>> of partitions either ...
>>>
>>> -Xiangrui
>>>
>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>> wrote:
>>> > Hi Xiangrui,
>>> >
>>> > Thanks for your reply!
>>> >
>>> > Yes, our data is very sparse, but RDD.repartition invokes
>>> > RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
>>> > the same effect as #2, right?
>>> >
>>> > As for CombineInputFormat, I haven't tried it, but it sounds like it
>>> > will combine multiple partitions into one large partition if I cache it,
>>> > so it has the same issues as #1?
>>> >
>>> > For coalesce, could you share some best practices on how to set the right
>>> > number of partitions to avoid the locality problem?
>>> >
>>> > Thanks!
>>> >
>>> >
>>> >
>>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>> >>
>>> >> Assuming that your data is very sparse, I would recommend
>>> >> RDD.repartition. But if it is not the case and you don't want to
>>> >> shuffle the data, you can try a CombineInputFormat and then parse the
>>> >> lines into labeled points. Coalesce may cause locality problems if you
>>> >> didn't use the right number of partitions. -Xiangrui
>>> >>
>>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong128@gmail.com>
>>> >> wrote:
>>> >> > I think this has the same effect and issue as #1, right?
>>> >> >
>>> >> >
>>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiusheng@gmail.com>
>>> >> > wrote:
>>> >> >>
>>> >> >> How about increasing the HDFS file extent size? For example, if the
>>> >> >> current value is 128M, we make it 512M or bigger.
>>> >> >>
>>> >> >>
>>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>> >> >> wrote:
>>> >> >>>
>>> >> >>> Hi all,
>>> >> >>>
>>> >> >>> We are trying to use Spark MLlib to train on a very large data set
>>> >> >>> (100M features and 5B rows). The input data in HDFS has ~26K
>>> >> >>> partitions. By default, MLlib will create a task for every partition
>>> >> >>> at each iteration. But because our dimensionality is also very high,
>>> >> >>> such a large number of tasks adds a large network overhead to
>>> >> >>> transfer the weight vector. So we want to reduce the number of tasks.
>>> >> >>> We tried the following approaches:
>>> >> >>>
>>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>>> >> >>>
>>> >> >>> data.coalesce(numPartitions).cache()
>>> >> >>>
>>> >> >>> This works fine for relatively small data, but when the data grows
>>> >> >>> and numPartitions is fixed, each partition becomes large. This
>>> >> >>> introduces two issues: first, a larger partition needs larger
>>> >> >>> objects and more memory at runtime, and triggers GC more frequently;
>>> >> >>> second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which
>>> >> >>> seems to be caused by a single partition being larger than 2G
>>> >> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>> >> >>>
>>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>>> >> >>>
>>> >> >>> data.coalesce(numPartitions, true).cache()
>>> >> >>>
>>> >> >>> This mitigates the second issue in #1 to some degree, but the first
>>> >> >>> issue is still there, and it also introduces a large amount of
>>> >> >>> shuffling.
>>> >> >>>
>>> >> >>> 3. Cache data first, and coalesce partitions.
>>> >> >>>
>>> >> >>> data.cache().coalesce(numPartitions)
>>> >> >>>
>>> >> >>> This way, the number of cached partitions does not change, but each
>>> >> >>> task reads data from multiple partitions. However, I find that tasks
>>> >> >>> lose locality this way. I see a lot of 'ANY' tasks, which means that
>>> >> >>> tasks read data from other nodes and are slower than tasks that read
>>> >> >>> data from local memory.
>>> >> >>>
>>> >> >>> I think the best way would be like #3, but leveraging locality as
>>> >> >>> much as possible. Is there any way to do that? Any suggestions?
>>> >> >>>
>>> >> >>> Thanks!
>>> >> >>>
>>> >> >>> --
>>> >> >>> ZHENG, Xu-dong
>>> >> >>>
>>> >> >>
>>> >> >
>>> >> >
>>> >> >
>>> >> > --
>>> >> > 郑旭东
>>> >> > ZHENG, Xu-dong
>>> >> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > 郑旭东
>>> > ZHENG, Xu-dong
>>> >
>>>
>>
>>
>>
>> --
>> 郑旭东
>> ZHENG, Xu-dong
>>
>>
>
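
For readers following the thread, the trade-off between "fewer tasks" and "smaller partitions" can be put into rough numbers. The sketch below is only a back-of-envelope calculation, not anything from Spark or from the posters' job. It assumes a dense weight vector of 8-byte doubles, one copy of that vector shipped per task per iteration, and a hypothetical total cached data size of 10 TB (the thread does not state the real size). The object and function names are made up for illustration.

```scala
// Back-of-envelope sketch; every name here is hypothetical, none is a Spark API.
object CoalesceTradeoff {
  val BytesPerDouble: Long = 8L
  // A single block larger than Integer.MAX_VALUE bytes triggers the
  // "Size exceeds Integer.MAX_VALUE" error discussed in SPARK-1391.
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  /** Weight-vector bytes shipped per iteration, ASSUMING one dense copy per task. */
  def weightTrafficBytes(numFeatures: Long, numTasks: Long): Long =
    numFeatures * BytesPerDouble * numTasks

  /** Average partition size, ASSUMING the data is spread evenly. */
  def avgPartitionBytes(totalBytes: Long, numPartitions: Long): Long =
    totalBytes / numPartitions

  /** Smallest partition count that keeps the average partition within the block limit. */
  def minPartitions(totalBytes: Long): Long =
    (totalBytes + MaxBlockBytes - 1) / MaxBlockBytes

  def main(args: Array[String]): Unit = {
    val numFeatures = 100000000L                       // 100M features, from the thread
    val totalBytes  = 10L * 1000 * 1000 * 1000 * 1000  // ASSUMPTION: 10 TB of cached data
    for (tasks <- Seq(26000L, 5000L, 1000L)) {
      val trafficTB   = weightTrafficBytes(numFeatures, tasks) / 1e12
      val partitionGB = avgPartitionBytes(totalBytes, tasks) / 1e9
      val overLimit   = avgPartitionBytes(totalBytes, tasks) > MaxBlockBytes
      println(s"tasks=$tasks weightTraffic=${trafficTB}TB avgPartition=${partitionGB}GB overLimit=$overLimit")
    }
    println(s"minPartitions=${minPartitions(totalBytes)}")
  }
}
```

Under these assumptions, approaches #1 and #2 tie the task count to the partition count, so cutting the weight-vector traffic pushes the average partition size toward the 2G limit. With approach #3 the two are decoupled, so only weightTrafficBytes depends on the coalesced task count. The averages ignore skew, so individual partitions could still exceed the limit.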
