spark-dev mailing list archives

From Guillaume Pitel <guillaume.pi...@exensa.com>
Subject Re: Directly broadcasting (sort of) RDDs
Date Mon, 23 Mar 2015 12:00:36 GMT
Not far, but not exactly. The RDD could be too big to fit in memory on each executor.

The idea is more like a worker-side rdd.lookup() with a local cache.
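
Roughly, something along these lines for the cache part (a very rough, untested sketch; the object name is made up and the task-side lookup in the comment is hypothetical, nothing like it exists in Spark today):

import scala.collection.concurrent.TrieMap

// A singleton object is instantiated once per executor JVM, so it naturally
// gives a worker-local cache shared by all tasks running on that executor.
object WorkerLocalBlockCache {
  private val blocks = TrieMap.empty[Int, Array[Float]]

  // Return the cached block, or fetch it once and keep it for later tasks.
  def getOrFetch(blockId: Int)(fetch: Int => Array[Float]): Array[Float] =
    blocks.getOrElseUpdate(blockId, fetch(blockId))
}

// Hypothetical use from inside a task, if such a task-side lookup existed:
//   val denseBlock = WorkerLocalBlockCache.getOrFetch(i) { id =>
//     sharedDense.lookup(id)  // would go through the BlockManager of whichever worker holds it
//   }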

Guillaume
> In a sentence, is this the idea of collecting an RDD to memory on each
> executor directly?
>
> On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.ryza@cloudera.com> wrote:
>> Hi Guillaume,
>>
>> I've long thought something like this would be useful - i.e. the ability to
>> broadcast RDDs directly without first pulling data through the driver.  If I
>> understand correctly, your requirement to "block" a matrix up and only fetch
>> the needed parts could be implemented on top of this by splitting an RDD
>> into a set of smaller RDDs and then broadcasting each one on its own.
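
For illustration, an untested sketch of that splitting idea (numGroups, the modulo grouping, and all names are made up, not from any existing API):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Broadcast the dense matrix in numGroups separate pieces, so that no single
// broadcast (and no single collect on the driver) has to hold the whole matrix.
def broadcastInPieces(
    sc: SparkContext,
    denseBlocks: RDD[(Int, Array[Float])],
    numGroups: Int): IndexedSeq[Broadcast[scala.collection.Map[Int, Array[Float]]]] =
  (0 until numGroups).map { g =>
    // Collect only the g-th slice on the driver and broadcast it.
    sc.broadcast(denseBlocks.filter { case (i, _) => i % numGroups == g }.collectAsMap())
  }

// A task that needs dense block i then reads it from the result:
// pieces(i % numGroups).value(i).
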
>>
>> Unfortunately nobody is working on this currently (and I couldn't promise to
>> have bandwidth to review it at the moment either), but I suspect we'll
>> eventually need to add something like this for map joins in Hive on Spark
>> and Spark SQL.
>>
>> -Sandy
>>
>>
>>
>> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
>> <guillaume.pitel@exensa.com> wrote:
>>> Hi,
>>>
>>> Thanks for your answer. This is precisely the use case I'm interested in,
>>> but I already knew about it; I should have mentioned it. Unfortunately this
>>> implementation of BlockMatrix has (in my opinion) some disadvantages (the
>>> fact that it splits the matrix by range instead of using a modulo is bad for
>>> block skewness). Besides, and more importantly, as I was writing, it uses
>>> the join solution (actually a cogroup:
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
>>> line 361). The duplication of the elements of the dense matrix thus
>>> depends on the block size.
>>>
>>> Actually I'm wondering whether what I want to achieve could be done with a
>>> simple modification to the join, allowing a partition to be weakly cached
>>> after being retrieved.
>>>
>>> Guillaume
>>>
>>>
>>> There is BlockMatrix in Spark 1.3 -
>>> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>>>
>>> However I believe it only supports dense matrix blocks.
>>>
>>> Still, it might be possible to use it or extend it.
>>>
>>> JIRAs:
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>>>
>>> It was based on
>>> https://github.com/amplab/ml-matrix
>>>
>>> Another lib:
>>> https://github.com/PasaLab/marlin/blob/master/README.md
>>>
>>> —
>>> Sent from Mailbox
>>>
>>> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
>>> <guillaume.pitel@exensa.com> wrote:
>>>
>>> Hi,
>>> I have an idea that I would like to discuss with the Spark devs. The
>>> idea comes from a very real problem that I have struggled with for
>>> almost a year. My problem is very simple: it's a dense matrix * sparse
>>> matrix operation. I have a dense matrix RDD[(Int, FloatMatrix)] which is
>>> divided into X large blocks (one block per partition), and a sparse matrix
>>> RDD[((Int, Int), Array[Array[(Int, Float)]])], divided into X * Y blocks. The
>>> most efficient way to perform the operation is to collectAsMap() the
>>> dense matrix and broadcast it, then perform the block-local
>>> multiplications, and combine the results by column.
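
For concreteness, here is the broadcast approach above as a rough, untested sketch (multiplyLocal and addBlocks are just placeholders for the actual block kernels):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Placeholders for the actual local kernels.
def multiplyLocal(dense: Array[Float], sparse: Array[Array[(Int, Float)]]): Array[Float] = ???
def addBlocks(a: Array[Float], b: Array[Float]): Array[Float] = ???

def multiplyByBroadcast(
    sc: SparkContext,
    denseBlocks: RDD[(Int, Array[Float])],                       // X blocks, one per partition
    sparseBlocks: RDD[((Int, Int), Array[Array[(Int, Float)]])]  // X * Y blocks
  ): RDD[(Int, Array[Float])] = {

  // Pull the whole dense matrix to the driver once, ship a copy to every executor.
  val dense = sc.broadcast(denseBlocks.collectAsMap())

  sparseBlocks
    // Block-local product: sparse block (i, j) only needs dense block i.
    .map { case ((i, j), block) => (j, multiplyLocal(dense.value(i), block)) }
    // Combine the partial results by column block.
    .reduceByKey(addBlocks)
}
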
>>> This is quite fine, unless the matrix is too big to fit in memory
>>> (especially since the multiplication is performed several times
>>> iteratively, and the broadcasts are not always cleaned from memory as I
>>> would naively expect).
>>> When the dense matrix is too big, a second solution is to split the big
>>> sparse matrix into several RDDs and do several broadcasts. Doing this
>>> creates quite a big overhead, but it mostly works, even though I often
>>> run into problems, for instance with inaccessible broadcast files.
>>> Then there is the terrible but apparently very effective good old join.
>>> Since X blocks of the sparse matrix use the same block from the dense
>>> matrix, I suspect that the dense matrix is somehow replicated X times
>>> (either on disk or over the network), which is why the join takes so
>>> much time.
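
And the join variant, continuing the imports and placeholders from the sketch above; re-keying the sparse blocks by the dense block they need is what ends up shipping each dense block once per matching sparse block:

def multiplyByJoin(
    denseBlocks: RDD[(Int, Array[Float])],
    sparseBlocks: RDD[((Int, Int), Array[Array[(Int, Float)]])]): RDD[(Int, Array[Float])] =
  sparseBlocks
    .map { case ((i, j), block) => (i, (j, block)) }  // re-key by the dense block they need
    .join(denseBlocks)                                // dense block i is copied once per matching j
    .map { case (i, ((j, block), dense)) => (j, multiplyLocal(dense, block)) }
    .reduceByKey(addBlocks)
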
>>> After this bit of context, here is my idea: would it be possible to
>>> somehow "broadcast" (or, more accurately, share or serve) a persisted
>>> RDD which is distributed across all workers, in a way that would,
>>> a bit like IndexedRDD, allow a task to access a partition or an
>>> element of a partition from inside a closure, with a worker-local memory
>>> cache; i.e. the information about where each block resides would be
>>> distributed to the workers, to allow them to access parts of the RDD
>>> directly. I think that's already a bit how RDDs are shuffled?
>>> The RDD could stay distributed (no need to collect then broadcast), and
>>> only necessary transfers would be required.
>>> Is this a bad idea? Is it already implemented somewhere (I would love
>>> it!)? Or is it something that could add efficiency not only for my use
>>> case, but maybe for others? Could someone give me a hint about how I
>>> could add this capability to Spark? I would probably try to extend RDD
>>> into a specific SharedIndexedRDD with a special lookup that would be
>>> allowed from tasks as a special case, and that would contact the
>>> BlockManager to reach the corresponding data on the right worker.
>>> Thanks in advance for your advice.
>>> Guillaume
>>> --
>>> eXenSa
>>> *Guillaume PITEL, Président*
>>> +33(0)626 222 431
>>> eXenSa S.A.S. <http://www.exensa.com/>
>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
>


-- 
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705

