spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sandy Ryza <sandy.r...@cloudera.com>
Subject Re: Directly broadcasting (sort of) RDDs
Date Sun, 22 Mar 2015 22:56:04 GMT
Hi Guillaume,

I've long thought something like this would be useful - i.e. the ability to
broadcast RDDs directly without first pulling data through the driver.  If
I understand correctly, your requirement to "block" a matrix up and only
fetch the needed parts could be implemented on top of this by splitting an
RDD into a set of smaller RDDs and then broadcasting each one on its own.

Unfortunately nobody is working on this currently (and I couldn't promise
to have bandwidth to review it at the moment either), but I suspect we'll
eventually need to add something like this for map joins in Hive on Spark
and Spark SQL.

-Sandy



On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel <guillaume.pitel@exensa.com
> wrote:

>  Hi,
>
> Thanks for your answer. This is precisely the use case I'm interested in,
> but I know it already, I should have mentionned it. Unfortunately this
> implementation of BlockMatrix has (in my opinion) some disadvantages (the
> fact that it split the matrix by range instead of using a modulo is bad for
> block skewness). Besides, and more importantly, as I was writing, it uses
> the join solution (actually a cogroup :
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
> line 361). The reduplication of the elements of the dense matrix is thus
> dependent on the block size.
>
> Actually I'm wondering if what I want to achieve could be made with a
> simple modification to the join, allowing a partition to be weakly cached
> wafter being retrieved.
>
> Guillaume
>
>
>  There is block matrix in Spark 1.3 - http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
>
>
>
>
> However I believe it only supports dense matrix blocks.
>
>
>
>
> Still, might be possible to use it or exetend
>
>
>
>
> JIRAs:
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
>
>
>
>
> Was based on
>
> https://github.com/amplab/ml-matrix
>
>
>
>
>
> Another lib:
>
> https://github.com/PasaLab/marlin/blob/master/README.md
>
>
>
>
>
>
>
> —
> Sent from Mailbox
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel<guillaume.pitel@exensa.com> <guillaume.pitel@exensa.com>
wrote:
>
>
>  Hi,
> I have an idea that I would like to discuss with the Spark devs. The
> idea comes from a very real problem that I have struggled with since
> almost a year. My problem is very simple, it's a dense matrix * sparse
> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
> divided in X large blocks (one block per partition), and a sparse matrix
> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
> most efficient way to perform the operation is to collectAsMap() the
> dense matrix and broadcast it, then perform the block-local
> mutliplications, and combine the results by column.
> This is quite fine, unless the matrix is too big to fit in memory
> (especially since the multiplication is performed several times
> iteratively, and the broadcasts are not always cleaned from memory as I
> would naively expect).
> When the dense matrix is too big, a second solution is to split the big
> sparse matrix in several RDD, and do several broadcasts. Doing this
> creates quite a big overhead, but it mostly works, even though I often
> face some problems with unaccessible broadcast files, for instance.
> Then there is the terrible but apparently very effective good old join.
> Since X blocks of the sparse matrix use the same block from the dense
> matrix, I suspect that the dense matrix is somehow replicated X times
> (either on disk or in the network), which is the reason why the join
> takes so much time.
> After this bit of a context, here is my idea : would it be possible to
> somehow "broadcast" (or maybe more accurately, share or serve) a
> persisted RDD which is distributed on all workers, in a way that would,
> a bit like the IndexedRDD, allow a task to access a partition or an
> element of a partition in the closure, with a worker-local memory cache
> . i.e. the information about where each block resides would be
> distributed on the workers, to allow them to access parts of the RDD
> directly. I think that's already a bit how RDD are shuffled ?
> The RDD could stay distributed (no need to collect then broadcast), and
> only necessary transfers would be required.
> Is this a bad idea, is it already implemented somewhere (I would love it
> !) ?or is it something that could add efficiency not only for my use
> case, but maybe for others ? Could someone give me some hint about how I
> could add this possibility to Spark ? I would probably try to extend a
> RDD into a specific SharedIndexedRDD with a special lookup that would be
> allowed from tasks as a special case, and that would try to contact the
> blockManager and reach the corresponding data from the right worker.
> Thanks in advance for your advices
> Guillaume
> --
> eXenSa
> 	
> *Guillaume PITEL, Président*
> +33(0)626 222 431
> eXenSa S.A.S. <http://www.exensa.com/> <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
>
>
> --
>    [image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>

Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message