spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From DB Tsai <dbt...@dbtsai.com>
Subject Re: Is there any way to control the parallelism in LogisticRegression
Date Sat, 06 Sep 2014 19:30:36 GMT
Yes. But you need to store RDD as *serialized* Java objects. See the
session of storage level
http://spark.apache.org/docs/latest/programming-guide.html

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, Sep 4, 2014 at 8:06 PM, Jiusheng Chen <chenjiusheng@gmail.com>
wrote:

> Thanks DB.
> Did you mean this?
>
> spark.rdd.compress      true
>
>
> On Thu, Sep 4, 2014 at 2:48 PM, DB Tsai <dbtsai@dbtsai.com> wrote:
>
>> For saving the memory, I recommend you compress the cached RDD, and it
>> will be couple times smaller than original data sets.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiusheng@gmail.com>
>> wrote:
>>
>>> Thanks DB and Xiangrui. Glad to know you guys are actively working on it.
>>>
>>> Another thing, did we evaluate the loss of using Float to store values?
>>> currently it is Double. Use fewer bits has the benifit of memory footprint
>>> reduction. According to Google, they even uses 16 bits (a special
>>> encoding scheme called q2.13)
>>> <http://jmlr.org/proceedings/papers/v28/golovin13.pdf> in their learner
>>> without measurable loss, and can save 75% memory.
>>>
>>>
>>> On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbtsai@dbtsai.com> wrote:
>>>
>>>> With David's help today, we were able to implement elastic net glm in
>>>> Spark. It's surprising easy, and with just some modification in breeze's
>>>> OWLQN code, it just works without further investigation.
>>>>
>>>> We did benchmark, and the coefficients are within 0.5% differences
>>>> compared with R's glmnet package. I guess this is first truly distributed
>>>> glmnet implementation.
>>>>
>>>> Still require some effort to have it in mllib; mostly api cleanup work.
>>>>
>>>> 1) I'll submit a PR to breeze which implements weighted regularization
>>>> in OWLQN.
>>>> 2) This also depends on
>>>> https://issues.apache.org/jira/browse/SPARK-2505 which we have
>>>> internal version requiring some cleanup for open source project.
>>>>
>>>>
>>>> Sincerely,
>>>>
>>>> DB Tsai
>>>> -------------------------------------------------------
>>>> My Blog: https://www.dbtsai.com
>>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>>
>>>>
>>>> On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>>>
>>>>> +DB & David (They implemented QWLQN on Spark today.)
>>>>> On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiusheng@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Xiangrui,
>>>>>>
>>>>>> A side-by question about MLLib.
>>>>>> It looks current LBFGS in MLLib (version 1.0.2 and even v1.1) only
>>>>>> support L2 regurization, the doc explains it: "The L1 regularization
by
>>>>>> using L1Updater
>>>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
>>>>>> will not work since the soft-thresholding logic in L1Updater is designed
>>>>>> for gradient descent."
>>>>>>
>>>>>> Since the algorithm comes from Breeze and I noticed Breeze actually
>>>>>> supports L1 (OWLQN
>>>>>> <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>).
Wondering
>>>>>> if there is some special considerations that current MLLib didn't
support
>>>>>> OWLQN? And any plan to add it?
>>>>>>
>>>>>> Thanks for your time!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Update.
>>>>>>>
>>>>>>> I just find a magic parameter *blanceSlack* in *CoalescedRDD*,
>>>>>>> which sounds could control the locality. The default value is
0.1 (smaller
>>>>>>> value means lower locality). I change it to 1.0 (full locality)
and use #3
>>>>>>> approach, then find a lot improvement (20%~40%). Although the
Web UI still
>>>>>>> shows the type of task as 'ANY' and the input is from shuffle
read, but the
>>>>>>> real performance is much better before change this parameter.
>>>>>>> [image: Inline image 1]
>>>>>>>
>>>>>>> I think the benefit includes:
>>>>>>>
>>>>>>> 1. This approach keep the physical partition size small, but
make
>>>>>>> each task handle multiple partitions. So the memory requested
for
>>>>>>> deserialization is reduced, which also reduce the GC time. That
is exactly
>>>>>>> what we observed in our job.
>>>>>>>
>>>>>>> 2. This approach will not hit the 2G limitation, because it not
>>>>>>> change the partition size.
>>>>>>>
>>>>>>> And I also think that, Spark may change this default value, or
at
>>>>>>> least expose this parameter to users (*CoalescedRDD *is a private
>>>>>>> class, and *RDD*.*coalesce* also don't have a parameter to control
>>>>>>> that).
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <mengxr@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sorry, I missed #2. My suggestion is the same as #2. You
need to
>>>>>>>> set a
>>>>>>>> bigger numPartitions to avoid hitting integer bound or 2G
>>>>>>>> limitation,
>>>>>>>> at the cost of increased shuffle size per iteration. If you
use a
>>>>>>>> CombineInputFormat and then cache, it will try to give you
roughly
>>>>>>>> the
>>>>>>>> same size per partition. There will be some remote fetches
from HDFS
>>>>>>>> but still cheaper than calling RDD.repartition().
>>>>>>>>
>>>>>>>> For coalesce without shuffle, I don't know how to set the
right
>>>>>>>> number
>>>>>>>> of partitions either ...
>>>>>>>>
>>>>>>>> -Xiangrui
>>>>>>>>
>>>>>>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong128@gmail.com>
>>>>>>>> wrote:
>>>>>>>> > Hi Xiangrui,
>>>>>>>> >
>>>>>>>> > Thanks for your reply!
>>>>>>>> >
>>>>>>>> > Yes, our data is very sparse, but RDD.repartition invoke
>>>>>>>> > RDD.coalesce(numPartitions, shuffle = true) internally,
so I
>>>>>>>> think it has
>>>>>>>> > the same effect with #2, right?
>>>>>>>> >
>>>>>>>> > For CombineInputFormat, although I haven't tried it,
but it
>>>>>>>> sounds that it
>>>>>>>> > will combine multiple partitions into a large partition
if I
>>>>>>>> cache it, so
>>>>>>>> > same issues as #1?
>>>>>>>> >
>>>>>>>> > For coalesce, could you share some best practice how
to set the
>>>>>>>> right number
>>>>>>>> > of partitions to avoid locality problem?
>>>>>>>> >
>>>>>>>> > Thanks!
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <mengxr@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> Assuming that your data is very sparse, I would
recommend
>>>>>>>> >> RDD.repartition. But if it is not the case and you
don't want to
>>>>>>>> >> shuffle the data, you can try a CombineInputFormat
and then
>>>>>>>> parse the
>>>>>>>> >> lines into labeled points. Coalesce may cause locality
problems
>>>>>>>> if you
>>>>>>>> >> didn't use the right number of partitions. -Xiangrui
>>>>>>>> >>
>>>>>>>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong
<
>>>>>>>> dong128@gmail.com>
>>>>>>>> >> wrote:
>>>>>>>> >> > I think this has the same effect and issue
with #1, right?
>>>>>>>> >> >
>>>>>>>> >> >
>>>>>>>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen
<
>>>>>>>> chenjiusheng@gmail.com>
>>>>>>>> >> > wrote:
>>>>>>>> >> >>
>>>>>>>> >> >> How about increase HDFS file extent size?
like current value
>>>>>>>> is 128M,
>>>>>>>> >> >> we
>>>>>>>> >> >> make it 512M or bigger.
>>>>>>>> >> >>
>>>>>>>> >> >>
>>>>>>>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG,
Xu-dong <
>>>>>>>> dong128@gmail.com>
>>>>>>>> >> >> wrote:
>>>>>>>> >> >>>
>>>>>>>> >> >>> Hi all,
>>>>>>>> >> >>>
>>>>>>>> >> >>> We are trying to use Spark MLlib to
train super large data
>>>>>>>> (100M
>>>>>>>> >> >>> features
>>>>>>>> >> >>> and 5B rows). The input data in HDFS
has ~26K partitions. By
>>>>>>>> default,
>>>>>>>> >> >>> MLlib
>>>>>>>> >> >>> will create a task for every partition
at each iteration.
>>>>>>>> But because
>>>>>>>> >> >>> our
>>>>>>>> >> >>> dimensions are also very high, such
large number of tasks
>>>>>>>> will
>>>>>>>> >> >>> increase
>>>>>>>> >> >>> large network overhead to transfer
the weight vector. So we
>>>>>>>> want to
>>>>>>>> >> >>> reduce
>>>>>>>> >> >>> the number of tasks, we tried below
ways:
>>>>>>>> >> >>>
>>>>>>>> >> >>> 1. Coalesce partitions without shuffling,
then cache.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.coalesce(numPartitions).cache()
>>>>>>>> >> >>>
>>>>>>>> >> >>> This works fine for relative small
data, but when data is
>>>>>>>> increasing
>>>>>>>> >> >>> and
>>>>>>>> >> >>> numPartitions is fixed, the size of
one partition will be
>>>>>>>> large. This
>>>>>>>> >> >>> introduces two issues: the first is,
the larger partition
>>>>>>>> will need
>>>>>>>> >> >>> larger
>>>>>>>> >> >>> object and more memory at runtime,
and trigger GC more
>>>>>>>> frequently; the
>>>>>>>> >> >>> second is, we meet the issue 'size
exceeds
>>>>>>>> integer.max_value' error,
>>>>>>>> >> >>> which
>>>>>>>> >> >>> seems be caused by the size of one
partition larger than 2G
>>>>>>>> >> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>>>> >> >>>
>>>>>>>> >> >>> 2. Coalesce partitions with shuffling,
then cache.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.coalesce(numPartitions, true).cache()
>>>>>>>> >> >>>
>>>>>>>> >> >>> It could mitigate the second issue
in #1 at some degree, but
>>>>>>>> fist
>>>>>>>> >> >>> issue
>>>>>>>> >> >>> is still there, and it also will introduce
large amount of
>>>>>>>> shullfling.
>>>>>>>> >> >>>
>>>>>>>> >> >>> 3. Cache data first, and coalesce partitions.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.cache().coalesce(numPartitions)
>>>>>>>> >> >>>
>>>>>>>> >> >>> In this way, the number of cached partitions
is not change,
>>>>>>>> but each
>>>>>>>> >> >>> task
>>>>>>>> >> >>> read the data from multiple partitions.
However, I find the
>>>>>>>> task will
>>>>>>>> >> >>> loss
>>>>>>>> >> >>> locality by this way. I find a lot
of 'ANY' tasks, that
>>>>>>>> means that
>>>>>>>> >> >>> tasks
>>>>>>>> >> >>> read data from other nodes, and become
slower than that read
>>>>>>>> data from
>>>>>>>> >> >>> local
>>>>>>>> >> >>> memory.
>>>>>>>> >> >>>
>>>>>>>> >> >>> I think the best way should like #3,
but leverage locality
>>>>>>>> as more as
>>>>>>>> >> >>> possible. Is there any way to do that?
Any suggestions?
>>>>>>>> >> >>>
>>>>>>>> >> >>> Thanks!
>>>>>>>> >> >>>
>>>>>>>> >> >>> --
>>>>>>>> >> >>> ZHENG, Xu-dong
>>>>>>>> >> >>>
>>>>>>>> >> >>
>>>>>>>> >> >
>>>>>>>> >> >
>>>>>>>> >> >
>>>>>>>> >> > --
>>>>>>>> >> > 郑旭东
>>>>>>>> >> > ZHENG, Xu-dong
>>>>>>>> >> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > --
>>>>>>>> > 郑旭东
>>>>>>>> > ZHENG, Xu-dong
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> 郑旭东
>>>>>>> ZHENG, Xu-dong
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>

Mime
View raw message