spark-user mailing list archives

From DB Tsai <dbt...@dbtsai.com>
Subject Re: Is there any way to control the parallelism in LogisticRegression
Date Thu, 04 Sep 2014 06:48:33 GMT
To save memory, I recommend compressing the cached RDD; it will be a
couple of times smaller than the original data set.
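
A rough, stdlib-only illustration of why compressing serialized partitions can pay off for sparse data. This is not Spark code: in Spark itself, compression of cached data is normally switched on with the `spark.rdd.compress` setting, which applies to serialized partitions (for example `StorageLevel.MEMORY_ONLY_SER`). The row layout, sizes, and the helper name `serialize_sparse_rows` below are made up for the sketch, and the ratio you see on real data will differ.

```python
import random
import struct
import zlib


def serialize_sparse_rows(num_rows, num_features, nnz_per_row, seed=0):
    """Pack made-up sparse rows as (int32 index, float64 value) pairs."""
    rng = random.Random(seed)
    out = bytearray()
    for _ in range(num_rows):
        # Pick distinct feature indices for this row and store them in order.
        for idx in sorted(rng.sample(range(num_features), nnz_per_row)):
            out += struct.pack("<id", idx, 1.0)  # binary feature: value is always 1.0
    return bytes(out)


if __name__ == "__main__":
    raw = serialize_sparse_rows(num_rows=2000, num_features=100000, nnz_per_row=20)
    packed = zlib.compress(raw)
    # Print raw size, compressed size, and the ratio between them.
    print(len(raw), len(packed), len(raw) / len(packed))
```

The trade-off is extra CPU time to decompress each partition on every pass over the data.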


Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiusheng@gmail.com>
wrote:

> Thanks DB and Xiangrui. Glad to know you guys are actively working on it.
>
> Another thing: have we evaluated the loss from using Float to store values?
> Currently it is Double. Using fewer bits has the benefit of reducing the
> memory footprint. According to Google, they even use 16 bits (a special
> encoding scheme called q2.13)
> <http://jmlr.org/proceedings/papers/v28/golovin13.pdf> in their learner
> without measurable loss, and can save 75% memory.
>
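
To make the numbers above concrete, here is a minimal sketch of a q2.13 fixed-point encoding (1 sign bit, 2 integer bits, 13 fraction bits) together with the footprint of a dense weight vector for the 100M features mentioned in this thread. It assumes plain round-to-nearest and clamping; the linked paper's scheme may round differently. The function names are made up for illustration.

```python
import struct

Q_FRAC_BITS = 13                 # q2.13: 13 bits after the binary point
SCALE = 1 << Q_FRAC_BITS         # 8192 codes per unit
INT16_MIN, INT16_MAX = -(1 << 15), (1 << 15) - 1


def encode_q2_13(x):
    """Round x to the nearest q2.13 code, clamping to the signed 16-bit range."""
    code = int(round(x * SCALE))
    return max(INT16_MIN, min(INT16_MAX, code))


def decode_q2_13(code):
    """Map a q2.13 code back to a float."""
    return code / SCALE


def weight_vector_bytes(num_features, bytes_per_value):
    """Size of a dense weight vector, ignoring container overhead."""
    return num_features * bytes_per_value


if __name__ == "__main__":
    w = 0.123456789
    code = encode_q2_13(w)
    # Print the code, the decoded value, and the absolute rounding error.
    print(code, decode_q2_13(code), abs(decode_q2_13(code) - w))
    n = 100000000  # the 100M features mentioned in this thread
    for name, fmt in (("Double", "<d"), ("Float", "<f"), ("q2.13", "<h")):
        print(name, weight_vector_bytes(n, struct.calcsize(fmt)) / 1e6, "MB")
```

Going from 8 bytes to 2 bytes per value is where the 75% figure comes from; the price is a representable range of roughly [-4, 4) and a rounding error of up to 2^-14 per value.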
>
> On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbtsai@dbtsai.com> wrote:
>
>> With David's help today, we were able to implement elastic net GLM in
>> Spark. It's surprisingly easy, and with just some modifications to breeze's
>> OWLQN code, it just works without further investigation.
>>
>> We ran a benchmark, and the coefficients are within 0.5% of those from
>> R's glmnet package. I guess this is the first truly distributed
>> glmnet implementation.
>>
>> It still requires some effort to get it into mllib, mostly API cleanup work.
>>
>> 1) I'll submit a PR to breeze which implements weighted regularization in
>> OWLQN.
>> 2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505,
>> for which we have an internal version that requires some cleanup before it
>> can be open sourced.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>
>>> +DB & David (They implemented OWLQN on Spark today.)
>>> On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiusheng@gmail.com> wrote:
>>>
>>>> Hi Xiangrui,
>>>>
>>>> A side question about MLlib.
>>>> It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only
>>>> supports L2 regularization. The doc explains it: "The L1 regularization by
>>>> using L1Updater
>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
>>>> will not work since the soft-thresholding logic in L1Updater is designed
>>>> for gradient descent."
>>>>
>>>> The algorithm comes from Breeze, and I noticed Breeze actually
>>>> supports L1 (OWLQN
>>>> <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>). I am wondering
>>>> whether there is some special reason that the current MLlib doesn't support
>>>> OWLQN. And is there any plan to add it?
>>>>
>>>> Thanks for your time!
>>>>
>>>>
>>>>
>>>> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong128@gmail.com>
>>>> wrote:
>>>>
>>>>> Update.
>>>>>
>>>>> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which
>>>>> sounds like it could control the locality. The default value is 0.1 (a
>>>>> smaller value means lower locality). I changed it to 1.0 (full locality)
>>>>> and used approach #3, and found a big improvement (20%~40%). Although the
>>>>> Web UI still shows the task type as 'ANY' and the input as coming from
>>>>> shuffle read, the real performance is much better than before changing
>>>>> this parameter.
>>>>> [image: Inline image 1]
>>>>>
>>>>> I think the benefits include:
>>>>>
>>>>> 1. This approach keeps the physical partition size small, but makes each
>>>>> task handle multiple partitions. So the memory requested for
>>>>> deserialization is reduced, which also reduces the GC time. That is exactly
>>>>> what we observed in our job.
>>>>>
>>>>> 2. This approach will not hit the 2G limitation, because it does not change
>>>>> the partition size.
>>>>>
>>>>> I also think that Spark could change this default value, or at
>>>>> least expose this parameter to users (*CoalescedRDD* is a private
>>>>> class, and *RDD*.*coalesce* also doesn't have a parameter to control
>>>>> that).
>>>>>
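
The two benefits listed above come down to simple arithmetic, sketched below. The total data size (4 TiB) and the task count (1000) are hypothetical numbers chosen only for illustration; the ~26K partition count is the one from the original message, and the helper name `plan` is made up.

```python
INT_MAX_BYTES = 2**31 - 1  # Java's Integer.MAX_VALUE, the 2G limit discussed in this thread


def plan(total_bytes, cached_partitions, num_tasks):
    """Return (bytes per cached partition, cached partitions per task, under 2G limit?)."""
    bytes_per_partition = -(-total_bytes // cached_partitions)  # ceiling division
    partitions_per_task = -(-cached_partitions // num_tasks)    # ceiling division
    return bytes_per_partition, partitions_per_task, bytes_per_partition <= INT_MAX_BYTES


if __name__ == "__main__":
    total = 4 * 2**40  # hypothetical: 4 TiB of cached data
    # Approach #1: coalesce, then cache. One cached partition per task.
    print("coalesce then cache:", plan(total, cached_partitions=1000, num_tasks=1000))
    # Approach #3: cache, then coalesce. Cached partitions stay small.
    print("cache then coalesce:", plan(total, cached_partitions=26000, num_tasks=1000))
```

Under these assumed numbers, approach #1 produces cached partitions well above the 2G limit, while approach #3 keeps each cached partition at a fraction of it and has every task read 26 of them.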
>>>>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <mengxr@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a
>>>>>> bigger numPartitions to avoid hitting integer bound or 2G limitation,
>>>>>> at the cost of increased shuffle size per iteration. If you use a
>>>>>> CombineInputFormat and then cache, it will try to give you roughly the
>>>>>> same size per partition. There will be some remote fetches from HDFS
>>>>>> but still cheaper than calling RDD.repartition().
>>>>>>
>>>>>> For coalesce without shuffle, I don't know how to set the right number
>>>>>> of partitions either ...
>>>>>>
>>>>>> -Xiangrui
>>>>>>
>>>>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>> wrote:
>>>>>> > Hi Xiangrui,
>>>>>> >
>>>>>> > Thanks for your reply!
>>>>>> >
>>>>>> > Yes, our data is very sparse, but RDD.repartition invokes
>>>>>> > RDD.coalesce(numPartitions, shuffle = true) internally, so I think it
>>>>>> > has the same effect as #2, right?
>>>>>> >
>>>>>> > As for CombineInputFormat, I haven't tried it, but it sounds like it
>>>>>> > will combine multiple partitions into one large partition if I cache
>>>>>> > it, so it has the same issues as #1?
>>>>>> >
>>>>>> > For coalesce, could you share some best practices on how to set the
>>>>>> > right number of partitions to avoid the locality problem?
>>>>>> >
>>>>>> > Thanks!
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Assuming that your data is very sparse, I would recommend
>>>>>> >> RDD.repartition. But if that is not the case and you don't want to
>>>>>> >> shuffle the data, you can try a CombineInputFormat and then parse the
>>>>>> >> lines into labeled points. Coalesce may cause locality problems if you
>>>>>> >> don't use the right number of partitions. -Xiangrui
>>>>>> >>
>>>>>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong128@gmail.com> wrote:
>>>>>> >> > I think this has the same effect and the same issues as #1, right?
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiusheng@gmail.com> wrote:
>>>>>> >> >>
>>>>>> >> >> How about increasing the HDFS file extent size? For example, if the
>>>>>> >> >> current value is 128M, we make it 512M or bigger.
>>>>>> >> >>
>>>>>> >> >>
>>>>>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong128@gmail.com> wrote:
>>>>>> >> >>>
>>>>>> >> >>> Hi all,
>>>>>> >> >>>
>>>>>> >> >>> We are trying to use Spark MLlib to train on super large data (100M
>>>>>> >> >>> features and 5B rows). The input data in HDFS has ~26K partitions. By
>>>>>> >> >>> default, MLlib will create a task for every partition at each iteration.
>>>>>> >> >>> But because our dimensionality is also very high, such a large number of
>>>>>> >> >>> tasks adds a large network overhead for transferring the weight vector.
>>>>>> >> >>> So we want to reduce the number of tasks. We tried the ways below:
>>>>>> >> >>>
>>>>>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>>>>>> >> >>>
>>>>>> >> >>> data.coalesce(numPartitions).cache()
>>>>>> >> >>>
>>>>>> >> >>> This works fine for relatively small data, but when the data grows
>>>>>> >> >>> and numPartitions is fixed, the size of one partition becomes large. This
>>>>>> >> >>> introduces two issues: first, a larger partition needs larger
>>>>>> >> >>> objects and more memory at runtime, and triggers GC more frequently;
>>>>>> >> >>> second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which
>>>>>> >> >>> seems to be caused by the size of one partition being larger than 2G
>>>>>> >> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>> >> >>>
>>>>>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>>>>>> >> >>>
>>>>>> >> >>> data.coalesce(numPartitions, true).cache()
>>>>>> >> >>>
>>>>>> >> >>> This could mitigate the second issue in #1 to some degree, but the first
>>>>>> >> >>> issue is still there, and it also introduces a large amount of shuffling.
>>>>>> >> >>>
>>>>>> >> >>> 3. Cache data first, and coalesce partitions.
>>>>>> >> >>>
>>>>>> >> >>> data.cache().coalesce(numPartitions)
>>>>>> >> >>>
>>>>>> >> >>> In this way, the number of cached partitions does not change, but each
>>>>>> >> >>> task reads the data from multiple partitions. However, I find that tasks
>>>>>> >> >>> lose locality this way. I see a lot of 'ANY' tasks, which means that
>>>>>> >> >>> tasks read data from other nodes and become slower than tasks that read
>>>>>> >> >>> data from local memory.
>>>>>> >> >>>
>>>>>> >> >>> I think the best way would be like #3, but exploiting locality as much
>>>>>> >> >>> as possible. Is there any way to do that? Any suggestions?
>>>>>> >> >>>
>>>>>> >> >>> Thanks!
>>>>>> >> >>>
>>>>>> >> >>> --
>>>>>> >> >>> ZHENG, Xu-dong
>>>>>> >> >>>
>>>>>> >> >>
>>>>>> >> >
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > --
>>>>>> >> > 郑旭东
>>>>>> >> > ZHENG, Xu-dong
>>>>>> >> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > 郑旭东
>>>>>> > ZHENG, Xu-dong
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> 郑旭东
>>>>> ZHENG, Xu-dong
>>>>>
>>>>>
>>>>
>>
>
