spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ldmtwo <ldm...@gmail.com>
Subject experimental solution to nesting RDDs
Date Wed, 24 Sep 2014 23:33:50 GMT
I want to share and brainstorm on an experiment before I try it all the way.
I hope that Spark contributors can comment. To be clear, it is not my intent
to use MLLib where I get partial control on the work being done and I'm not
seeing it scale well enough yet. I have fundamental questions about Spark
such as, can it handle 100k or even 10 million stages? Can this clever hacky
strategy get around the limitation of only managing RDDs from the driver?
Can I iterate over permutations (as with nesting) of an RDD set without
calling cartesian() and having memory explosion?

I've been using Spark for a while and lately I've come up with a solution to
a problem. I need to access 1TB-10TB in a distributed fashion. Clearly, the
data cannot be on a single node, but I can have replication if that helps
performance. Some of the algorithms require permutations of degree 2 or 3
(e.g. N=4 unique elements, then we have 16 or 64 permutations). It makes
sense to only store the data once and not replicate it. For some algorithms,
I must iteratively update data as I go along, which breaks the write once
model Spark tends to employ. 

An example algorithm could be covariance, QR decomposition or matrix
multiplication. These three use matrices and I can break them into rows,
columns, or submatrix rectangle blocks. Here are the strategies I have come
up with to use RDDs or Broadcast variables.

First, I noticed the possibility to use broadcast variables for some tasks,
but I worry that the driver would have to choke on the entire data set first
and then each node may receive a copy whether it needs it or not. Is it
possible to create Broadcast variables within a RDD function call like map.
I'm not sure if this could work, but Broadcast class is serializable and
should not have an issue being used as Array[Broadcast[MyType]] or
RDD[Broadcast[MyType]]. Broadcast has the problem of being written once and
then sent out all over without the possibility to modify and sync/update
aside from the local copy. It would be great for read-only matrices and each
Task should only request the data elements it needs.

Next, lets talk about RDDs and their hidden potential.  RDDs however have
the update ability built in. I can create an Array[RDD[MyType]] and each RDD
has a size of 1. When I need to read MyType, I would access
myRDDArray(i).first. When I need to update MyType, I would call
myRDDArray(i).map{t=>newValue}. The array would be very large of course and
that is where I don't know what would happen. Would all of the RDDs try to
be on the first node? I want to have partition size = 1 on all of them. 

Let's assume that MyType is a column (Array[Double]) of a matrix and we are
doing covariance. Using Broadcast variables, I could create an RDD[Int] of
the column range R, then do something like R.cartesian(R).map{(a,b)=>
covar(column(a), column(b)}. Imagine column(x) is bcastArray(x).value. This
works well enough for getting the processing distributed, but memory may not
be. If I needed to update a value (in a different algo), how could I do
that? I don't know how to make read-write scenarios work.

Back to the other scenario (RDD method), suppose we are doing QR
decomposition and I'm finding Q (column major matrix with updates). When
calculating the dot product of two vectors, I set it up as RDD function
calls. How would this work out? Can Spark handle 100k, 10m, or 1bn stages?
Below shows how I compute the sum of products of two columns (commented code
followed by the new version. k refers to an outer loop, m and n are matrix
dimensions, j is for the 2nd loop and i would be the 3rd loop. Yes, I know
that it's not 100% the same. 
 var j = k+1
    while(j<n){
        var s = 0.0f;
        var i=k
//        while(i&lt;m){
//            s += Qcolk(i) * Qcolj(i)
//            i=i+1
//        }
       s = QcoljRDD.zip(QcolkRDD).reduce((a,b)=> (a._1 * a._2 + b._1 *
b._2,1f))._1

The goal is not so much to do the work in record speed, but to break limits
regarding matrix size and memory requirement. A 1TB matrix is a real issue
with the data we are trying to analyze and that's only 100k patients out of
10s of millions available. I need to find a way to make algorithms that are
not easily distributed into a distributed problem.





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/experimental-solution-to-nesting-RDDs-tp15095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message