[ https://issues.apache.org/jira/browse/GIRAPH-127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Avery Ching reassigned GIRAPH-127:

Assignee: Semih Salihoglu
Looking forward to this.
> Extending the API with a master.compute() function.
> 
>
> Key: GIRAPH-127
> URL: https://issues.apache.org/jira/browse/GIRAPH-127
> Project: Giraph
> Issue Type: New Feature
> Components: bsp, examples, graph
> Reporter: Semih Salihoglu
> Assignee: Semih Salihoglu
>
> First of all, sorry for the long explanation of this feature.
> I want to expand the API of Giraph with a new function called master.compute(), which would be called on the master before each superstep. I will try to explain its purpose with an example. Let's say we want to implement the following simplified version of the k-means clustering algorithm (pseudocode below):
> * Input: G(V, E), k, numEdgesThreshold, maxIterations
> * Algorithm:
> * int numEdgesCrossingClusters = Integer.MAX_VALUE;
> * int iterationNo = 0;
> * while (numEdgesCrossingClusters > numEdgesThreshold && iterationNo < maxIterations) {
> *   iterationNo++;
> *   int[] clusterCenters = pickKClusterCenters(k, G);               // step 1
> *   assignToClusterCenters(G, clusterCenters);                      // step 2
> *   numEdgesCrossingClusters = countNumEdgesCrossingClusters(G);    // step 3
> * }                                                                 // step 4: loop condition
> The algorithm iterates through the following steps:
> 1) Pick k random initial cluster centers.
> 2) Assign each vertex to the cluster center it is closest to (in Giraph, this can be implemented with message passing, similar to how ShortestPaths is implemented).
> 3) Count the number of edges crossing clusters.
> 4) Go back to step 1 if too many edges cross clusters and we haven't exceeded the maximum number of iterations yet.
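The four steps can be tried out sequentially on a single machine first. The sketch below is a minimal Java version of the loop, assuming an undirected, unweighted graph stored as symmetric adjacency arrays over vertex ids 0..n-1. The helper names loosely follow the pseudocode but are hypothetical, and step 2 uses a plain multi-source BFS in place of the message-passing version.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Random;

// Sequential sketch of the simplified k-means loop. Graph: symmetric adjacency arrays over
// vertex ids 0..n-1 (each undirected edge stored in both directions). Names are hypothetical.
final class SimplifiedKMeans {

  // Step 1: pick k distinct random vertex ids.
  static int[] pickKClusterCenters(int k, int n, Random rnd) {
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ids.add(i);
    }
    Collections.shuffle(ids, rnd);
    int[] centers = new int[k];
    for (int i = 0; i < k; i++) {
      centers[i] = ids.get(i);
    }
    return centers;
  }

  // Step 2: multi-source BFS; every reachable vertex joins a nearest center, -1 if unreachable.
  static int[] assignToClusterCenters(List<int[]> adj, int[] centers) {
    int[] cluster = new int[adj.size()];
    Arrays.fill(cluster, -1);
    Deque<Integer> queue = new ArrayDeque<>();
    for (int c : centers) {
      cluster[c] = c;
      queue.add(c);
    }
    while (!queue.isEmpty()) {
      int u = queue.poll();
      for (int v : adj.get(u)) {
        if (cluster[v] == -1) {
          cluster[v] = cluster[u];
          queue.add(v);
        }
      }
    }
    return cluster;
  }

  // Step 3: count undirected edges {u, v} (visited once, via u < v) whose endpoints differ in cluster.
  static int countNumEdgesCrossingClusters(List<int[]> adj, int[] cluster) {
    int count = 0;
    for (int u = 0; u < adj.size(); u++) {
      for (int v : adj.get(u)) {
        if (u < v && cluster[u] != cluster[v]) {
          count++;
        }
      }
    }
    return count;
  }

  // Steps 1-4: repeat until few enough edges cross clusters or maxIterations is reached.
  // Returns the number of iterations executed.
  static int run(List<int[]> adj, int k, int numEdgesThreshold, int maxIterations, Random rnd) {
    int numEdgesCrossingClusters = Integer.MAX_VALUE;
    int iterationNo = 0;
    while (numEdgesCrossingClusters > numEdgesThreshold && iterationNo < maxIterations) {
      iterationNo++;
      int[] clusterCenters = pickKClusterCenters(k, adj.size(), rnd);
      int[] cluster = assignToClusterCenters(adj, clusterCenters);
      numEdgesCrossingClusters = countNumEdgesCrossingClusters(adj, cluster);
    }
    return iterationNo;
  }
}
```

Steps 2 and 3 are the expensive, parallelizable parts; steps 1 and 4 are the short sequential glue the rest of this proposal is about.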
> In an algorithm like this, steps 2 and 3 are where most of the work happens, and both have very neat message-passing implementations. I'll give an overview without going into the details. Let's say we define a Vertex in Giraph that holds a custom Writable object with 2 integer values and sends messages with up to 2 integer values.
> Step 2 is very similar to the ShortestPaths algorithm and has two stages. In the first stage, each vertex checks whether it is one of the cluster centers. If so, it assigns itself the value (id, 0); otherwise it assigns itself (Null, Null). In the second stage, each vertex assigns itself to the closest cluster center by comparing its current value with its neighbors' (cluster center, distance) values, received as 2-integer messages, and changes its value whenever it finds a cluster center at a lower distance. This repeats for as many supersteps as it takes for every vertex to converge.
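Step 2 can be simulated superstep by superstep to see how the (cluster center, distance) values converge. This is a minimal sketch, not Giraph code: it assumes unit edge lengths, encodes (Null, Null) as (-1, Integer.MAX_VALUE), breaks distance ties in favor of the smaller center id (a choice the description leaves open), and lets only vertices whose value changed send messages. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Superstep-by-superstep simulation of Step 2 (hypothetical names; not Giraph's API).
// Vertex value = (center, distance); "(Null, Null)" is encoded as (NONE, Integer.MAX_VALUE).
final class ClusterAssignmentSupersteps {
  static final int NONE = -1;

  // Fills center[] and dist[] in place and returns the number of supersteps executed.
  static int run(List<int[]> adj, int[] centers, int[] center, int[] dist) {
    int n = adj.size();
    Arrays.fill(center, NONE);
    Arrays.fill(dist, Integer.MAX_VALUE);
    boolean[] changed = new boolean[n];
    // Superstep 0 (first stage): cluster centers assign themselves (id, 0).
    for (int c : centers) {
      center[c] = c;
      dist[c] = 0;
      changed[c] = true;
    }
    int superstep = 0;
    while (true) {
      // Vertices whose value changed in this superstep send (center, distance + 1) to neighbors.
      List<List<int[]>> inbox = new ArrayList<>();
      for (int v = 0; v < n; v++) {
        inbox.add(new ArrayList<>());
      }
      boolean anySent = false;
      for (int u = 0; u < n; u++) {
        if (!changed[u]) {
          continue;
        }
        for (int v : adj.get(u)) {
          inbox.get(v).add(new int[] {center[u], dist[u] + 1});
          anySent = true;
        }
      }
      if (!anySent) {
        return superstep + 1; // no messages in flight: every vertex has converged
      }
      // Next superstep (second stage): keep the closest center seen so far,
      // breaking distance ties by the smaller center id.
      superstep++;
      Arrays.fill(changed, false);
      for (int v = 0; v < n; v++) {
        for (int[] msg : inbox.get(v)) {
          if (msg[1] < dist[v] || (msg[1] == dist[v] && msg[0] < center[v])) {
            center[v] = msg[0];
            dist[v] = msg[1];
            changed[v] = true;
          }
        }
      }
    }
  }
}
```

On the path 0-1-2-3 with centers {0, 3}, the values settle after three supersteps. Vertices that no center can reach keep the (Null, Null) encoding.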
> Step 3, counting the number of edges crossing clusters, is also very easy to implement in Giraph. Once each vertex has a cluster center, the number of edges crossing clusters can be counted with an aggregator, say "numedgescrossing". It again has two stages. In the first stage, every vertex sends its cluster id to all its neighbors. In the second stage, every vertex looks at its neighbors' cluster ids in the received messages and, for each one that differs from its own cluster id, increments "numedgescrossing" by 1.
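The two stages of step 3 can be sketched the same way. In this minimal sketch a plain running sum stands in for the "numedgescrossing" aggregator; the SumAggregator class and the method names are hypothetical, not Giraph types.

```java
import java.util.ArrayList;
import java.util.List;

// Two-stage simulation of Step 3 (hypothetical names; not Giraph's API).
final class CrossingEdgeCount {

  // Minimal stand-in for a sum aggregator such as "numedgescrossing".
  static final class SumAggregator {
    private long value;

    void aggregate(long delta) {
      value += delta;
    }

    long getAggregatedValue() {
      return value;
    }
  }

  static long count(List<int[]> adj, int[] cluster) {
    int n = adj.size();
    // Stage 1 (one superstep): every vertex sends its cluster id along each outgoing edge.
    List<List<Integer>> inbox = new ArrayList<>();
    for (int v = 0; v < n; v++) {
      inbox.add(new ArrayList<>());
    }
    for (int u = 0; u < n; u++) {
      for (int v : adj.get(u)) {
        inbox.get(v).add(cluster[u]);
      }
    }
    // Stage 2 (next superstep): each received id that differs from the vertex's own adds 1.
    SumAggregator numEdgesCrossing = new SumAggregator();
    for (int v = 0; v < n; v++) {
      for (int neighborCluster : inbox.get(v)) {
        if (neighborCluster != cluster[v]) {
          numEdgesCrossing.aggregate(1);
        }
      }
    }
    return numEdgesCrossing.getAggregatedValue();
  }
}
```

When an undirected graph is stored with an edge in each direction, both endpoints of a crossing edge see the other's cluster id, so the aggregator ends up at twice the number of crossing edges. Dividing by 2 (or comparing against a doubled threshold) compensates. With one stored direction per edge, each crossing edge is counted once.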
> The other 2 steps, steps 1 and 4, are very simple sequential computations. Step 1 just picks k random vertex ids and puts them into an aggregator. Step 4 just compares "numedgescrossing" against a threshold and checks whether the algorithm has exceeded maxIterations (iterations of steps 1-4, not supersteps). With the current API, it's not clear where to do these computations. There is a per-worker function, preSuperstep(), that can be implemented, but if we designate a special worker, say worker 1, to pick the k vertices (in preSuperstep(), putting them into an aggregator), we'd waste an entire superstep in which only worker 1 does any work and all other workers are idle. Doing this in worker 1's postSuperstep() would not work either, because worker 1 needs to know that all the vertices have converged to decide that it's time to pick k vertices, or to do the check in step 4, and that information only becomes available to it at the beginning of the next superstep.
> A master.compute() extension would run on the master before each superstep and would modify the aggregator holding the k vertices before the aggregators are broadcast to the workers. Since these are all very short sequential computations, they would not waste resources the way a preSuperstep() or postSuperstep() approach would. It would also enable new algorithms like k-means that are composed of vertex-centric computations glued together by small sequential ones. Basically, it would add sequential computation to Giraph in a non-wasteful way.
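One way to picture the proposal is as a small state machine on the master that decides, before every superstep, which phase the workers run next. The sketch below is a minimal illustration, assuming a hypothetical MasterHook interface that receives the aggregated values as a Map<String, Long>, and hypothetical aggregator keys "phase", "numchanged" and "numedgescrossing". It covers steps 1 and 4 and tells the workers when to move from step 2 to step 3; the vertex-side work is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical hook (not an existing Giraph API): invoked on the master before every superstep,
// after the previous superstep's aggregators are combined and before they are broadcast.
interface MasterHook {
  // Returns false to halt the whole computation.
  boolean compute(Map<String, Long> aggregators);
}

// Sketch of k-means steps 1 and 4 as master-side logic. Phase names and aggregator keys
// ("phase", "numchanged", "numedgescrossing") are assumptions for illustration.
final class KMeansMaster implements MasterHook {
  enum Phase { ASSIGN, COUNT_SEND, COUNT_COMPARE }

  private final int numVertices;
  private final int k;
  private final long numEdgesThreshold;
  private final int maxIterations;
  private final Random rnd;
  private Phase phase = null; // null until the first call
  private int iterationNo = 0;
  private List<Integer> centers = new ArrayList<>();

  KMeansMaster(int numVertices, int k, long numEdgesThreshold, int maxIterations, Random rnd) {
    this.numVertices = numVertices;
    this.k = k;
    this.numEdgesThreshold = numEdgesThreshold;
    this.maxIterations = maxIterations;
    this.rnd = rnd;
  }

  @Override
  public boolean compute(Map<String, Long> aggregators) {
    if (phase == null) {
      return startIteration(aggregators);
    }
    switch (phase) {
      case ASSIGN:
        // Workers count vertices whose value changed; stay in ASSIGN until none did.
        if (aggregators.getOrDefault("numchanged", 0L) == 0L) {
          phase = Phase.COUNT_SEND;
        }
        break;
      case COUNT_SEND:
        phase = Phase.COUNT_COMPARE;
        break;
      case COUNT_COMPARE:
        // Step 4: "numedgescrossing" now holds the count from the compare superstep.
        long crossing = aggregators.getOrDefault("numedgescrossing", Long.MAX_VALUE);
        if (crossing <= numEdgesThreshold || iterationNo >= maxIterations) {
          return false;
        }
        return startIteration(aggregators);
    }
    aggregators.put("phase", (long) phase.ordinal());
    return true;
  }

  // Step 1: pick k distinct random vertex ids and begin a new iteration in the ASSIGN phase.
  private boolean startIteration(Map<String, Long> aggregators) {
    iterationNo++;
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < numVertices; i++) {
      ids.add(i);
    }
    Collections.shuffle(ids, rnd);
    // A real master would publish these through an aggregator so workers can read them.
    centers = new ArrayList<>(ids.subList(0, k));
    phase = Phase.ASSIGN;
    aggregators.put("phase", (long) phase.ordinal());
    return true;
  }

  int iterations() { return iterationNo; }

  List<Integer> centers() { return centers; }

  Phase phase() { return phase; }
}
```

Because these decisions happen on the master between supersteps, no superstep is spent with a single worker busy and the rest idle. How the picked centers reach the workers is left open in this sketch.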
> I am a PhD student at Stanford, and since last year I have been working on my own BSP/Pregel implementation, called GPS. I haven't distributed it, mainly because in September I learned about Giraph and decided to slow down on working on it :). We have basically been using GPS as our own research platform. The source code for GPS is here if anyone is interested (https://subversion.assembla.com/svn/phdprojects/gps/trunk/). We have the master.compute() feature in GPS, and here's an example of a k-means implementation in GPS with master.compute(): (https://subversion.assembla.com/svn/phdprojects/gps/trunk/src/java/gps/examples/kmeans/). (Aggregators are called GlobalObjects in GPS.) There is another example (https://subversion.assembla.com/svn/phdprojects/gps/trunk/src/java/gps/examples/randomgraphcoarsening/), which I'll skip explaining because it's very detailed and would make points similar to the ones I am making with k-means.
> In general, master.compute() would make it possible to glue together any graph algorithm that is composed of multiple stages, each with its own message types and computations that are well suited to vertex.compute(). There are many examples of such algorithms: recursive partitioning, triangle counting, and even much simpler things like finding shortest paths to 100 vertices in pieces (first to 5 vertices, then to another 5, then to another 5, etc.). That would help because finding shortest paths to all 100 vertices at once requires very large messages (each message would need to store 100 integers).
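The batching idea can be sketched as below, assuming unweighted edges so that shortest paths are hop counts, computed from each selected vertex (on an undirected graph this equals the distance to it). Each batch is one vertex-centric computation whose values and messages are int vectors of length batchSize, combined by element-wise minimum, so 100 vertices in batches of 5 means 20 computations with 5-integer messages rather than one computation with 100-integer messages. The outer loop stands in for what a master.compute() would sequence; all names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: unweighted shortest paths for many sources, processed in batches so that each
// message carries only batchSize distances. Names are hypothetical.
final class BatchedShortestPaths {
  static final int INF = Integer.MAX_VALUE;

  // dist[v][i] = hop distance between sources[i] and v (INF if unreachable).
  static int[][] run(List<int[]> adj, int[] sources, int batchSize) {
    int[][] dist = new int[adj.size()][sources.length];
    for (int[] row : dist) {
      Arrays.fill(row, INF);
    }
    // The sequential outer loop is the part a master.compute() would drive.
    for (int start = 0; start < sources.length; start += batchSize) {
      runBatch(adj, sources, start, Math.min(start + batchSize, sources.length), dist);
    }
    return dist;
  }

  // One batch = one vertex-centric computation with messages of (end - start) ints.
  private static void runBatch(List<int[]> adj, int[] sources, int start, int end, int[][] dist) {
    int n = adj.size();
    int width = end - start;
    boolean[] changed = new boolean[n];
    for (int i = start; i < end; i++) {
      dist[sources[i]][i] = 0;
      changed[sources[i]] = true;
    }
    boolean anyChanged = true;
    while (anyChanged) {
      // Send: changed vertices message neighbors with their batch distances plus one hop;
      // messages to the same vertex are combined by element-wise min.
      int[][] inbox = new int[n][];
      for (int u = 0; u < n; u++) {
        if (!changed[u]) {
          continue;
        }
        for (int v : adj.get(u)) {
          if (inbox[v] == null) {
            inbox[v] = new int[width];
            Arrays.fill(inbox[v], INF);
          }
          for (int j = 0; j < width; j++) {
            int d = dist[u][start + j];
            if (d != INF) {
              inbox[v][j] = Math.min(inbox[v][j], d + 1);
            }
          }
        }
      }
      // Receive: each vertex keeps the smaller of its current and received distances.
      Arrays.fill(changed, false);
      anyChanged = false;
      for (int v = 0; v < n; v++) {
        if (inbox[v] == null) {
          continue;
        }
        for (int j = 0; j < width; j++) {
          if (inbox[v][j] < dist[v][start + j]) {
            dist[v][start + j] = inbox[v][j];
            changed[v] = true;
            anyChanged = true;
          }
        }
      }
    }
  }
}
```

The result does not depend on batchSize; only the per-message size and the number of separate computations do.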
> If the Giraph team approves, I would like to implement this feature in Giraph with an approach similar to the one I took in GPS. Overall:
> Add a Master.java to org.apache.giraph.graph as the default Master, with a compute function that by default aggregates all aggregators and checks whether the computation has ended (by comparing numVertices with numFinishedVertices). As far as I can see, this would be a refactoring of the org.apache.giraph.graph.BspServiceMaster class.
> Extend GiraphJob with a setMaster() method to set a master class (defaulting to the default Master above).
> The rest would be sending the custom master class to (probably) all workers, with only the master instantiating it via reflection. I need to learn more about how to do this, since I am not yet familiar with that part of the Giraph code base.
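The three proposed pieces (a default Master, a setMaster() on the job, and reflective instantiation on the master only) can be sketched in plain Java. Everything here is hypothetical: Master, MaxSuperstepsMaster, JobConf, MasterFactory and the "giraph.masterClass" key are illustrative names, not existing Giraph classes, and in Giraph itself the setting would presumably live in the Hadoop Configuration carried by GiraphJob.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical default master following the proposal above; not Giraph's actual API.
class Master {
  // Called before each superstep; returns false once the computation should end.
  boolean compute(long superstep, long numVertices, long numFinishedVertices) {
    // Default rule: keep going until every vertex has finished (voted to halt).
    return numFinishedVertices < numVertices;
  }
}

// Example user-defined master: also stops after a fixed number of supersteps.
class MaxSuperstepsMaster extends Master {
  static final long MAX_SUPERSTEPS = 10;

  @Override
  boolean compute(long superstep, long numVertices, long numFinishedVertices) {
    return superstep < MAX_SUPERSTEPS && super.compute(superstep, numVertices, numFinishedVertices);
  }
}

// Stand-in for the job configuration; the key name is an assumption.
final class JobConf {
  static final String MASTER_CLASS_KEY = "giraph.masterClass";
  private final Map<String, String> conf = new HashMap<>();

  // Analogue of the proposed GiraphJob.setMaster(): only the class name is stored.
  void setMaster(Class<? extends Master> masterClass) {
    conf.put(MASTER_CLASS_KEY, masterClass.getName());
  }

  String get(String key, String defaultValue) {
    return conf.getOrDefault(key, defaultValue);
  }
}

final class MasterFactory {
  // Run only on the master: load the configured class by name and instantiate it reflectively.
  static Master createMaster(JobConf conf) {
    String className = conf.get(JobConf.MASTER_CLASS_KEY, Master.class.getName());
    try {
      Class<? extends Master> cls = Class.forName(className).asSubclass(Master.class);
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate master class " + className, e);
    }
  }
}
```

Storing only the class name is what lets the setting travel to every worker cheaply, while only the master process ever calls createMaster().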

This message is automatically generated by JIRA.
