spark-user mailing list archives

From Jiusheng Chen <chenjiush...@gmail.com>
Subject Re: Is there any way to control the parallelism in LogisticRegression
Date Fri, 05 Sep 2014 03:06:03 GMT
Thanks DB.
Did you mean this?

spark.rdd.compress      true
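
For context, `spark.rdd.compress` only affects RDD partitions that are stored in serialized form, so it has no effect on a plain `cache()` (deserialized `MEMORY_ONLY`). The thread does not confirm that this is the setting DB had in mind. A minimal sketch of combining the two, assuming Spark is on the classpath; the object and method names (`CompressedCache`, `cacheCompressed`) are hypothetical:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CompressedCache {
  // Turn on compression of serialized RDD partitions.
  // `false` skips loading defaults from system properties.
  val conf: SparkConf = new SparkConf(false)
    .setAppName("compressed-cache-sketch") // hypothetical app name
    .set("spark.rdd.compress", "true")

  // Persist in serialized form so that the compression setting applies.
  // A plain rdd.cache() stores deserialized objects and is not compressed.
  def cacheCompressed[T](rdd: RDD[T]): RDD[T] =
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
}
```

The trade-off is extra CPU time to serialize and decompress partitions on every iteration, in exchange for a smaller memory footprint.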


On Thu, Sep 4, 2014 at 2:48 PM, DB Tsai <dbtsai@dbtsai.com> wrote:

> To save memory, I recommend compressing the cached RDD; it will be a
> couple of times smaller than the original data set.
>
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiusheng@gmail.com>
> wrote:
>
>> Thanks DB and Xiangrui. Glad to know you guys are actively working on it.
>>
>> Another thing: did we evaluate the loss from using Float to store values?
>> Currently it is Double. Using fewer bits has the benefit of reducing the
>> memory footprint. According to Google, they even use 16 bits (a special
>> encoding scheme called q2.13)
>> <http://jmlr.org/proceedings/papers/v28/golovin13.pdf> in their learner
>> without measurable loss, and can save 75% memory.
>>
>>
>> On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbtsai@dbtsai.com> wrote:
>>
>>> With David's help today, we were able to implement elastic net glm in
>>> Spark. It's surprisingly easy, and with just some modification to breeze's
>>> OWLQN code, it just works without further investigation.
>>>
>>> We ran a benchmark, and the coefficients are within 0.5% of those from
>>> R's glmnet package. I guess this is the first truly distributed
>>> glmnet implementation.
>>>
>>> It still requires some effort to get it into mllib; mostly api cleanup work.
>>>
>>> 1) I'll submit a PR to breeze which implements weighted regularization
>>> in OWLQN.
>>> 2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505
>>> for which we have an internal version that requires some cleanup before it
>>> can go into the open source project.
>>>
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> -------------------------------------------------------
>>> My Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>>
>>> On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>>
>>>> +DB & David (They implemented OWLQN on Spark today.)
>>>> On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiusheng@gmail.com> wrote:
>>>>
>>>>> Hi Xiangrui,
>>>>>
>>>>> A side question about MLlib.
>>>>> It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only
>>>>> supports L2 regularization. The doc explains it: "The L1 regularization by
>>>>> using L1Updater
>>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
>>>>> will not work since the soft-thresholding logic in L1Updater is designed
>>>>> for gradient descent."
>>>>>
>>>>> Since the algorithm comes from Breeze, and I noticed Breeze actually
>>>>> supports L1 (OWLQN
>>>>> <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I am wondering
>>>>> whether there are special considerations that kept MLlib from supporting
>>>>> OWLQN. And is there any plan to add it?
>>>>>
>>>>> Thanks for your time!
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Update.
>>>>>>
>>>>>> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which
>>>>>> sounds like it could control the locality. The default value is 0.1 (a
>>>>>> smaller value means lower locality). I changed it to 1.0 (full locality)
>>>>>> and used the #3 approach, and found a big improvement (20%~40%). Although
>>>>>> the Web UI still shows the task type as 'ANY' and the input as coming from
>>>>>> shuffle read, the real performance is much better than before changing
>>>>>> this parameter.
>>>>>> [image: Inline image 1]
>>>>>>
>>>>>> I think the benefit includes:
>>>>>>
>>>>>> 1. This approach keeps the physical partition size small, but makes
>>>>>> each task handle multiple partitions. So the memory requested for
>>>>>> deserialization is reduced, which also reduces the GC time. That is
>>>>>> exactly what we observed in our job.
>>>>>>
>>>>>> 2. This approach will not hit the 2G limitation, because it does not
>>>>>> change the partition size.
>>>>>>
>>>>>> I also think that Spark could change this default value, or at
>>>>>> least expose this parameter to users (*CoalescedRDD* is a private
>>>>>> class, and *RDD*.*coalesce* also doesn't have a parameter to control
>>>>>> that).
>>>>>>
>>>>>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <mengxr@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a
>>>>>>> bigger numPartitions to avoid hitting integer bound or 2G limitation,
>>>>>>> at the cost of increased shuffle size per iteration. If you use a
>>>>>>> CombineInputFormat and then cache, it will try to give you roughly the
>>>>>>> same size per partition. There will be some remote fetches from HDFS
>>>>>>> but still cheaper than calling RDD.repartition().
>>>>>>>
>>>>>>> For coalesce without shuffle, I don't know how to set the right number
>>>>>>> of partitions either ...
>>>>>>>
>>>>>>> -Xiangrui
>>>>>>>
>>>>>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>>> wrote:
>>>>>>> > Hi Xiangrui,
>>>>>>> >
>>>>>>> > Thanks for your reply!
>>>>>>> >
>>>>>>> > Yes, our data is very sparse, but RDD.repartition invokes
>>>>>>> > RDD.coalesce(numPartitions, shuffle = true) internally, so I think
>>>>>>> > it has the same effect as #2, right?
>>>>>>> >
>>>>>>> > For CombineInputFormat, although I haven't tried it, it sounds like
>>>>>>> > it will combine multiple partitions into a large partition if I cache
>>>>>>> > it, so it has the same issues as #1?
>>>>>>> >
>>>>>>> > For coalesce, could you share some best practices on how to set the
>>>>>>> > right number of partitions to avoid the locality problem?
>>>>>>> >
>>>>>>> > Thanks!
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <mengxr@gmail.com>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> Assuming that your data is very sparse, I would recommend
>>>>>>> >> RDD.repartition. But if it is not the case and you don't want to
>>>>>>> >> shuffle the data, you can try a CombineInputFormat and then parse the
>>>>>>> >> lines into labeled points. Coalesce may cause locality problems if you
>>>>>>> >> didn't use the right number of partitions. -Xiangrui
>>>>>>> >>
>>>>>>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>>> >> wrote:
>>>>>>> >> > I think this has the same effect and issues as #1, right?
>>>>>>> >> >
>>>>>>> >> >
>>>>>>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiusheng@gmail.com>
>>>>>>> >> > wrote:
>>>>>>> >> >>
>>>>>>> >> >> How about increasing the HDFS file extent size? For example, if the
>>>>>>> >> >> current value is 128M, we make it 512M or bigger.
>>>>>>> >> >>
>>>>>>> >> >>
>>>>>>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>>> >> >> wrote:
>>>>>>> >> >>>
>>>>>>> >> >>> Hi all,
>>>>>>> >> >>>
>>>>>>> >> >>> We are trying to use Spark MLlib to train on very large data (100M
>>>>>>> >> >>> features and 5B rows). The input data in HDFS has ~26K partitions. By
>>>>>>> >> >>> default, MLlib will create a task for every partition at each
>>>>>>> >> >>> iteration. But because our dimensionality is also very high, such a
>>>>>>> >> >>> large number of tasks adds a large network overhead to transfer the
>>>>>>> >> >>> weight vector. So we want to reduce the number of tasks. We tried the
>>>>>>> >> >>> following ways:
>>>>>>> >> >>>
>>>>>>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>>>>>>> >> >>>
>>>>>>> >> >>> data.coalesce(numPartitions).cache()
>>>>>>> >> >>>
>>>>>>> >> >>> This works fine for relatively small data, but when the data grows and
>>>>>>> >> >>> numPartitions is fixed, the size of one partition becomes large. This
>>>>>>> >> >>> introduces two issues: first, a larger partition needs larger objects
>>>>>>> >> >>> and more memory at runtime, and triggers GC more frequently; second, we
>>>>>>> >> >>> hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused
>>>>>>> >> >>> by the size of one partition being larger than 2G
>>>>>>> >> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>>> >> >>>
>>>>>>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>>>>>>> >> >>>
>>>>>>> >> >>> data.coalesce(numPartitions, true).cache()
>>>>>>> >> >>>
>>>>>>> >> >>> It could mitigate the second issue in #1 to some degree, but the first
>>>>>>> >> >>> issue is still there, and it also introduces a large amount of
>>>>>>> >> >>> shuffling.
>>>>>>> >> >>>
>>>>>>> >> >>> 3. Cache data first, and coalesce partitions.
>>>>>>> >> >>>
>>>>>>> >> >>> data.cache().coalesce(numPartitions)
>>>>>>> >> >>>
>>>>>>> >> >>> In this way, the number of cached partitions does not change, but each
>>>>>>> >> >>> task reads the data from multiple partitions. However, I find the tasks
>>>>>>> >> >>> lose locality this way. I see a lot of 'ANY' tasks, which means that
>>>>>>> >> >>> tasks read data from other nodes and become slower than tasks that read
>>>>>>> >> >>> data from local memory.
>>>>>>> >> >>>
>>>>>>> >> >>> I think the best way should be like #3, but leveraging locality as much
>>>>>>> >> >>> as possible. Is there any way to do that? Any suggestions?
>>>>>>> >> >>>
>>>>>>> >> >>> Thanks!
>>>>>>> >> >>>
>>>>>>> >> >>> --
>>>>>>> >> >>> ZHENG, Xu-dong
>>>>>>> >> >>>
>>>>>>> >> >>
>>>>>>> >> >
>>>>>>> >> >
>>>>>>> >> >
>>>>>>> >> > --
>>>>>>> >> > 郑旭东
>>>>>>> >> > ZHENG, Xu-dong
>>>>>>> >> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> > 郑旭东
>>>>>>> > ZHENG, Xu-dong
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> 郑旭东
>>>>>> ZHENG, Xu-dong
>>>>>>
>>>>>>
>>>>>
>>>
>>
>
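
The q2.13 encoding mentioned in the quoted thread can be illustrated with a small fixed-point sketch. It assumes q2.13 means one sign bit, two integer bits, and 13 fractional bits, and it uses plain round-to-nearest rather than any scheme from the linked paper. This is not MLlib or Breeze code; the names `Q213`, `encode`, `decode`, and `memorySaving` are hypothetical.

```scala
object Q213 {
  // 13 fractional bits: one unit in the last place is 2^-13.
  private val Scale: Double = (1 << 13).toDouble // 8192.0

  /** Encode a Double into 16 bits; values outside roughly [-4, 4) are clamped. */
  def encode(x: Double): Short = {
    val scaled: Long = math.round(x * Scale) // deterministic round-to-nearest
    math.max(Short.MinValue.toLong, math.min(Short.MaxValue.toLong, scaled)).toShort
  }

  /** Decode the 16-bit fixed-point value back into a Double. */
  def decode(q: Short): Double = q / Scale

  /** Fraction of memory saved when going from bitsOld to bitsNew per value. */
  def memorySaving(bitsNew: Int, bitsOld: Int): Double =
    1.0 - bitsNew.toDouble / bitsOld
}
```

Going from a 64-bit Double to 16 bits gives `Q213.memorySaving(16, 64) == 0.75`, which matches the 75% figure in the thread; a 32-bit Float would give 0.5. For values inside the representable range the round-trip error is at most 2^-14, and values outside it are clamped, so this only suits coefficients known to stay within about [-4, 4).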
