Hi Alexander,
From your example code, using the GridPartitioner, you will have 1 column,
and 5 rows. When you perform an A^T*A multiplication, you will generate a
separate GridPartitioner with 5 columns and 5 rows. Therefore you are
observing a huge shuffle. If you generated a block-diagonal matrix as
an example (5x5), you should not observe any shuffle.
Basically, your example causes the worst kind of shuffle. We can implement
RowBasedPartitioning and ColumnBasedPartitioning for optimization, but we
didn't initially see it as necessary to expose the partitioners to users, so
we didn't add them (you can find the old implementations here
<https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f>).
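
To make the difference between the two layouts concrete, here is a minimal sketch in plain Scala collections. It is not Spark's BlockMatrix code: the object BlockPairing and its helpers are hypothetical names made up for illustration, and it only assumes that blocks are keyed by (blockRow, blockCol) and that a key-based partitioner keeps equal keys together. It lists which blocks meet in a product and which result block they feed.

```scala
// Illustration only: plain Scala, no Spark. All names here are hypothetical.
object BlockPairing {
  type Key = (Int, Int) // (blockRow, blockCol)

  // Keys of the transposed matrix: swap the row and column index.
  def transpose(keys: Seq[Key]): Seq[Key] = keys.map { case (i, j) => (j, i) }

  // For left * right, block (i, k) of left meets block (k, j) of right
  // and contributes a partial product to block (i, j) of the result.
  def products(left: Seq[Key], right: Seq[Key]): Seq[(Key, Key, Key)] =
    for {
      (i, k) <- left
      (k2, j) <- right
      if k == k2
    } yield ((i, k), (k2, j), (i, j))

  // A product is "aligned" when both operands and the result share one key,
  // so placing blocks by key alone would already put all three together.
  def alignedCount(ps: Seq[(Key, Key, Key)]): Int =
    ps.count { case (l, r, out) => l == r && r == out }

  // 5 row blocks x 1 column block, as in the test code.
  val tall: Seq[Key] = for (i <- 0 until 5) yield (i, 0)
  // 5x5 block-diagonal layout.
  val diag: Seq[Key] = for (i <- 0 until 5) yield (i, i)

  val tallProducts = products(transpose(tall), tall)
  val diagProducts = products(transpose(diag), diag)

  def main(args: Array[String]): Unit = {
    println(s"tall: ${tallProducts.size} products, ${alignedCount(tallProducts)} aligned")
    println(s"diag: ${diagProducts.size} products, ${alignedCount(diagProducts)} aligned")
  }
}
```

In the 5x1 layout only one of the five block products has matching keys on both operands and the result, while in the block-diagonal layout all five do, which is why the latter can stay local.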
Hope that helps!
Best,
Burak
On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander <alexander.ulanov@hp.com>
wrote:
> Hi Rakesh,
>
>
>
> I am not interested in a particular case of A^T*A. This case is a handy
> setup so I don't need to create another matrix and force the blocks to
> colocate. Basically, I am trying to understand the effectiveness of
> BlockMatrix for multiplication of distributed matrices. It seems that I am
> missing something or using it wrong.
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Rakesh Chalasani [mailto:vnit.rakesh@gmail.com]
> *Sent:* Tuesday, July 14, 2015 9:05 AM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: BlockMatrix multiplication
>
>
>
> Hi Alexander:
>
>
>
> Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected.
> Check
>
>
> https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361
>
>
>
> BlockMatrix multiply uses a custom partitioner called GridPartitioner,
> which might be causing the shuffle; in your special case, that shuffle need
> not happen. But, from what I understood from your code, I don't think this is
> an issue since your special case can be handled using computeGramMatrix on
> RowMatrix. Is there a reason you did not use that?
>
>
>
> Rakesh
>
>
>
>
>
> On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander <
> alexander.ulanov@hp.com> wrote:
>
> Hi Rakesh,
>
>
>
> Thanks for the suggestion. Each block of the original matrix is in a separate
> partition. Each block of the transposed matrix is also in a separate partition.
> The partition numbers are the same for the blocks that undergo
> multiplication. Each partition is on a separate worker. Basically, I want
> to force each worker to multiply only 2 blocks. This should be the optimal
> configuration for multiplication, as far as I understand. Having several
> blocks in each partition as you suggested is not optimal, is it?
>
>
>
> Best regards, Alexander
>
>
>
> Block matrix stores the data as key -> Matrix pairs and multiply does a
> reduceByKey operation, aggregating matrices per key. Since you said each
> block is residing in a separate partition, reduceByKey might be effectively
> shuffling all of the data. A better way to go about this is to allow
> multiple blocks within each partition so that reduceByKey does a local
> reduce before aggregating across nodes.
>
>
>
> Rakesh
>
>
>
> On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ulanov@hp.com>
> wrote:
>
> Dear Spark developers,
>
>
>
> I am trying to perform BlockMatrix multiplication in Spark. My test is as
> follows: 1) create a matrix of N blocks, so that each row of the block matrix
> contains only 1 block and each block resides in a separate partition on a
> separate node, 2) transpose the block matrix, and 3) multiply the transposed
> matrix by the original non-transposed one. This should preserve the data
> locality, so there should be no need for a shuffle. However, I observe a huge
> shuffle with a block matrix size of 50000x10000 and a block size of
> 10000x10000, 5 blocks per matrix. Could you suggest what is wrong?
>
>
>
> My setup is Spark 1.4, one master and 5 worker nodes, each is Xeon 2.2 16
> GB RAM.
>
> Below is the test code:
>
>
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
>
> // One (blockRow, blockCol) coordinate per block, one block per partition
> val rdd = sc.parallelize( {
>   for(i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
> }, parallelism).map( coord => (coord,
>   Matrices.rand(blockSize, blockSize, util.Random.self)))
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
> bm.validate()
> val mb = bm.transpose.cache()
> mb.validate()
>
> // Time the multiplication of the transposed matrix by the original
> val t = System.nanoTime()
> val ata = mb.multiply(bm)
> ata.validate()
> println(rows + "x" + columns + ", block:" + blockSize + "\t" +
>   (System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>
>
