spark-dev mailing list archives

From: Sean Owen <so...@cloudera.com>
Subject: Re: Directly broadcasting (sort of) RDDs
Date: Mon, 23 Mar 2015 00:42:10 GMT
In a sentence, is this the idea of collecting an RDD to memory on each
executor directly?

On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.ryza@cloudera.com> wrote:
> Hi Guillaume,
>
> I've long thought something like this would be useful, i.e. the ability to
> broadcast RDDs directly without first pulling data through the driver.  If I
> understand correctly, your requirement to "block" a matrix up and only fetch
> the needed parts could be implemented on top of this by splitting an RDD
> into a set of smaller RDDs and then broadcasting each one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise to
> have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
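Sandy mentions map joins in Hive on Spark and Spark SQL. As a rough illustration of what such a join does once the small side is available on every executor, here is a minimal local sketch of a broadcast (map-side) hash join in plain Scala. `MapSideJoin` and `join` are hypothetical names, and the `small` sequence stands in for the data that today would be collected to the driver and shipped with `sc.broadcast`.

```scala
object MapSideJoin {
  // Hash join in which the small side is materialized as an in-memory table
  // (in Spark, this is the part that would be broadcast to every executor)
  // and the large side is streamed through it without being shuffled.
  def join[K, A, B](small: Seq[(K, A)], large: Iterator[(K, B)]): Iterator[(K, (A, B))] = {
    // Build once: key -> all small-side values for that key.
    val table: Map[K, Seq[A]] =
      small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Probe: each large-side record emits one pair per matching small-side value;
    // records with no match are dropped (inner join).
    large.flatMap { case (k, b) =>
      table.getOrElse(k, Seq.empty[A]).iterator.map(a => (k, (a, b)))
    }
  }
}
```

The large side is never shuffled; the price is that the small side must be fully materialized on each executor, which is exactly the step the thread would like to perform without a round trip through the driver.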
>
>
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
> <guillaume.pitel@exensa.com> wrote:
>>
>> Hi,
>>
>> Thanks for your answer. This is precisely the use case I'm interested in,
>> but I already knew about it; I should have mentioned it. Unfortunately, this
>> implementation of BlockMatrix has (in my opinion) some disadvantages: the
>> fact that it splits the matrix by range instead of using a modulo is bad
>> for block skew. Besides, and more importantly, as I wrote, it uses the
>> join solution (actually a cogroup:
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
>> line 361). The replication of the elements of the dense matrix thus
>> depends on the block size.
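Guillaume's objection to splitting by range can be made concrete with a toy load count. The sketch below assumes, purely for illustration, that per-row work (for example, the number of non-zeros) falls off with the row index, as it does when rows are sorted by frequency. `BlockAssignment` and its methods are hypothetical names, not MLlib code.

```scala
object BlockAssignment {
  // Range assignment: contiguous runs of rowsPerBlock rows share a block.
  def rangeBlock(row: Int, numRows: Int, numBlocks: Int): Int = {
    val rowsPerBlock = (numRows + numBlocks - 1) / numBlocks // ceiling division
    row / rowsPerBlock
  }

  // Modulo assignment: rows are dealt round-robin across blocks.
  def moduloBlock(row: Int, numBlocks: Int): Int = row % numBlocks

  // Sums a per-row load (e.g. non-zero count) into the block each row is assigned to.
  def blockLoads(loads: IndexedSeq[Long], numBlocks: Int, assign: Int => Int): Array[Long] = {
    val totals = Array.fill(numBlocks)(0L)
    loads.indices.foreach(r => totals(assign(r)) += loads(r))
    totals
  }
}
```

With loads 8, 4, 2, 1 and two blocks, the range scheme gives block totals 12 and 3, while the modulo scheme gives 10 and 5. Spark's `HashPartitioner` on `Int` keys already behaves like the modulo scheme, since an `Int`'s `hashCode` is the value itself.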
>>
>> Actually, I'm wondering whether what I want to achieve could be done with a
>> simple modification to the join, allowing a partition to be weakly cached
>> after being retrieved.
>>
>> Guillaume
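One way to read "weakly cached" is a per-executor cache whose entries the JVM may drop under memory pressure. This is a minimal sketch, assuming a hypothetical `fetch` function that retrieves one partition (in Spark this would be a block-manager read, which is not shown). It uses `java.lang.ref.SoftReference` rather than `WeakReference`: weakly reachable objects can be cleared as soon as the collector runs, whereas softly reachable ones are normally kept until memory runs short.

```scala
import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical executor-local cache of fetched partitions. Entries are held
// through SoftReferences, so the garbage collector may reclaim them under
// memory pressure; a reclaimed partition is simply fetched again.
class SoftPartitionCache[V <: AnyRef](fetch: Int => V) {
  private val entries = new ConcurrentHashMap[Int, SoftReference[V]]()

  def get(partitionId: Int): V = {
    val ref = entries.get(partitionId)
    // ref.get() returns null once the referent has been collected.
    val cached: V = if (ref == null) null.asInstanceOf[V] else ref.get()
    if (cached != null) cached
    else {
      val value = fetch(partitionId)
      entries.put(partitionId, new SoftReference(value))
      value
    }
  }
}
```

Two tasks that miss on the same partition at the same time may both fetch it; that only wastes work, and avoiding it would need per-key locking.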
>>
>>
>> There is BlockMatrix in Spark 1.3:
>> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>>
>> However, I believe it only supports dense matrix blocks.
>>
>> Still, it might be possible to use it or extend it.
>>
>> JIRAs:
>>
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>>
>> It was based on:
>>
>> https://github.com/amplab/ml-matrix
>>
>> Another lib:
>>
>> https://github.com/PasaLab/marlin/blob/master/README.md
>>
>> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
>> <guillaume.pitel@exensa.com> wrote:
>>
>> Hi,
>> I have an idea that I would like to discuss with the Spark devs. The
>> idea comes from a very real problem that I have struggled with for
>> almost a year. My problem is very simple: it's a dense matrix * sparse
>> matrix operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
>> divided into X large blocks (one block per partition), and a sparse matrix
>> RDD[((Int,Int),Array[Array[(Int,Float)]])], divided into X * Y blocks. The
>> most efficient way to perform the operation is to collectAsMap() the
>> dense matrix and broadcast it, then perform the block-local
>> multiplications, and combine the results by column.
>> This is quite fine, unless the matrix is too big to fit in memory
>> (especially since the multiplication is performed several times
>> iteratively, and the broadcasts are not always cleaned from memory as I
>> would naively expect).
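The broadcast approach described above can be modeled locally. This is a minimal sketch under stated assumptions: plain `Array[Array[Float]]` row-major blocks replace jblas `FloatMatrix`; each inner array of a sparse block is assumed to hold the `(localColumn, value)` entries of one local row; dense block `k` is assumed to cover the columns matching the rows of sparse blocks `(k, j)`; and `BroadcastMultiply` is a hypothetical name. In Spark, `denseByBlock` would be the value of `sc.broadcast(dense.collectAsMap())`, the per-block products would run in a `map` over the sparse RDD, and the final grouping would be a `reduceByKey` on `j`.

```scala
object BroadcastMultiply {
  type Dense = Array[Array[Float]]              // row-major block
  type SparseBlock = Array[Array[(Int, Float)]] // one array of (localCol, value) per local row

  // Block-local product D_k * S_kj, with D_k of size m x n_k and S_kj of size n_k x outCols.
  def multiplyBlock(d: Dense, s: SparseBlock, outCols: Int): Dense = {
    val out = Array.fill(d.length, outCols)(0f)
    for (i <- d.indices; r <- s.indices; (c, v) <- s(r))
      out(i)(c) += d(i)(r) * v
    out
  }

  // Element-wise sum of two blocks of the same shape.
  def add(a: Dense, b: Dense): Dense =
    a.zip(b).map { case (x, y) => x.zip(y).map { case (p, q) => p + q } }

  // denseByBlock stands in for the broadcast value; sparse blocks are keyed by (k, j).
  def multiply(denseByBlock: Map[Int, Dense],
               sparse: Seq[((Int, Int), SparseBlock)],
               outColsOfBlock: Int => Int): Map[Int, Dense] =
    sparse
      .map { case ((k, j), s) => j -> multiplyBlock(denseByBlock(k), s, outColsOfBlock(j)) }
      .groupBy(_._1) // combine the partial results by output column block
      .map { case (j, parts) => j -> parts.map(_._2).reduce((x, y) => add(x, y)) }
}
```

The memory problem is visible in the signature: every task needs the whole `denseByBlock` map, even though each sparse block `(k, j)` only reads dense block `k`.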
>> When the dense matrix is too big, a second solution is to split the big
>> sparse matrix into several RDDs and do several broadcasts. Doing this
>> creates quite a big overhead, but it mostly works, even though I often
>> face problems such as inaccessible broadcast files.
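The split-and-broadcast workaround needs a plan for which dense blocks go into each broadcast. This is a minimal sketch, assuming block sizes are known up front and that a single broadcast should stay under a byte budget; `BatchedBroadcast` and `planBatches` are hypothetical names. Each batch would then be broadcast on its own, and only the sparse blocks whose dense key falls in that batch would be processed in that round.

```scala
object BatchedBroadcast {
  // Greedily packs dense block ids, in the given order, into batches whose
  // total size stays within maxBytes; each batch would become one broadcast.
  def planBatches(blockBytes: Seq[(Int, Long)], maxBytes: Long): Vector[Vector[Int]] = {
    val batches = Vector.newBuilder[Vector[Int]]
    var current = Vector.empty[Int]
    var currentBytes = 0L
    for ((id, bytes) <- blockBytes) {
      require(bytes <= maxBytes, s"dense block $id alone exceeds the broadcast budget")
      // Close the current batch if adding this block would overflow it.
      if (currentBytes + bytes > maxBytes) {
        batches += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= id
      currentBytes += bytes
    }
    if (current.nonEmpty) batches += current
    batches.result()
  }
}
```

Each batch adds its own broadcast and its own job, which is one source of the overhead described above.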
>> Then there is the terrible but apparently very effective good old join.
>> Since X blocks of the sparse matrix use the same block from the dense
>> matrix, I suspect that the dense matrix is somehow replicated X times
>> (either on disk or in the network), which is the reason why the join
>> takes so much time.
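The suspicion about the join can be turned into a back-of-the-envelope count. The sketch assumes that, before the cogroup, each dense block is re-keyed once for every sparse block that needs it, so it is shuffled once per such sparse block; it ignores serialization overhead, compression, and the sparse side. `JoinCost` is a hypothetical name.

```scala
object JoinCost {
  // Number of copies of each dense block k, assuming one copy is shuffled
  // for every sparse block (k, j) that reads it.
  def denseCopies(sparseKeys: Seq[(Int, Int)]): Map[Int, Int] =
    sparseKeys.groupBy(_._1).map { case (k, keys) => k -> keys.size }

  // Dense-side bytes moved by the join under that assumption.
  def joinDenseBytes(sparseKeys: Seq[(Int, Int)], denseBlockBytes: Int => Long): Long =
    denseCopies(sparseKeys).toSeq.map { case (k, copies) => copies * denseBlockBytes(k) }.sum

  // Bytes received when every executor fetches the whole broadcast once.
  def broadcastBytes(numExecutors: Int, totalDenseBytes: Long): Long =
    numExecutors * totalDenseBytes
}
```

For two dense blocks of 100 bytes each, each used by three sparse blocks, the join moves 600 bytes of dense data, while broadcasting the 200-byte dense matrix moves 400 bytes to two executors or 800 bytes to four. The join's replication factor grows with the number of sparse blocks per dense block, which is why it depends on the block size.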
>> After this bit of context, here is my idea: would it be possible to
>> somehow "broadcast" (or, more accurately, share or serve) a
>> persisted RDD which is distributed on all workers, in a way that would,
>> a bit like the IndexedRDD, allow a task to access a partition or an
>> element of a partition in the closure, with a worker-local memory
>> cache? I.e., the information about where each block resides would be
>> distributed to the workers, to allow them to access parts of the RDD
>> directly. I think that's already somewhat how RDDs are shuffled?
>> The RDD could stay distributed (no need to collect then broadcast), and
>> only the necessary transfers would be required.
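The proposed lookup can be modeled without Spark internals. In this minimal sketch, `SharedLookup` is a hypothetical class: `locations` stands for the block-location metadata that would be distributed to every worker, `fetch` stands for a block-manager read of one partition from a given worker, and keys are routed with the same rule as a hash partitioner. No real block-manager calls are shown.

```scala
import scala.collection.mutable

// Hypothetical model of a shared, indexed lookup; none of these names are Spark APIs.
class SharedLookup[K, V](
    localWorker: String,
    numPartitions: Int,
    locations: Map[Int, String],          // partition id -> worker holding it
    fetch: (String, Int) => Map[K, V]) {  // (worker, partition id) -> partition contents

  // Worker-local copies of partitions already read (single-threaded for brevity).
  private val localCache = mutable.Map.empty[Int, Map[K, V]]
  var remoteFetches = 0

  // Same routing rule as a hash partitioner: non-negative hashCode modulo numPartitions.
  def partitionOf(key: K): Int = java.lang.Math.floorMod(key.hashCode, numPartitions)

  def lookup(key: K): Option[V] = {
    val p = partitionOf(key)
    val worker = locations(p)
    val partition = localCache.getOrElseUpdate(p, {
      if (worker != localWorker) remoteFetches += 1
      fetch(worker, p)
    })
    partition.get(key)
  }
}
```

With two partitions, looking up keys 2, 3 and then 1 from worker `w0` triggers a single remote fetch: key 3 pulls partition 1 from `w1`, and key 1 is then served from the worker-local copy.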
>> Is this a bad idea? Is it already implemented somewhere (I would love
>> that!), or is it something that could add efficiency not only for my use
>> case, but maybe for others? Could someone give me some hints about how I
>> could add this possibility to Spark? I would probably try to extend an
>> RDD into a specific SharedIndexedRDD with a special lookup that would be
>> allowed from tasks as a special case, and that would try to contact the
>> blockManager and reach the corresponding data from the right worker.
>> Thanks in advance for your advice,
>> Guillaume
>> --
>> eXenSa
>> *Guillaume PITEL, Président*
>> +33(0)626 222 431
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
>
>


