giraph-dev mailing list archives

From "Eli Reisman (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs
Date Wed, 12 Sep 2012 18:45:08 GMT

    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454229#comment-13454229 ]

Eli Reisman commented on GIRAPH-322:
------------------------------------

No problem Hyunsik, this patch is still undergoing change and isn't ready for action yet; if you're
playing with the test.sh script for testing, go right ahead!

Maja: That's good to know about spill to disk. It is not what I saw before, so I should definitely
play with this some more; I must have set the params wrong. The thing about disk spill here
is that no one seems to bite on the idea of replacing any existing solution with Giraph if spill
to disk is involved; most folks here seem already convinced that a solution that spills to
disk too much is not different enough from the existing solutions for a given use case to
warrant changing the approach. The use cases for Giraph here revolve around scaling out on many
machines and running in-memory. I am hoping that by getting your params set right and using spill
as an emergency buffer when memory is just too tight, we can gradually loosen up this view
and let users decide for themselves if it's an option they want to exercise, but as it stands
now I need to pursue solutions that are at least feasible in memory first. I am excited to
see this might still be a viable option to make this work. Thanks for the advice!

As far as where the crash occurs: the whole graph data set I'm using loads with no problem
and can exist in memory. When I tested an earlier messaging solution and GIRAPH-314 while
amortizing over different numbers of supersteps, the in-memory graph data was not where the errors
were reported. They were coming on the Netty receiving side as all the messages were arriving
at their destinations. When the messages arrive, the data (for the most part) is aggregating
how many times a 2nd-degree neighbor is encountered in incoming messages, so the data we aggregate
at each vertex does not grow as fast as the accumulated size of the messages the vertex
receives. That's why the amortizing helped so much: if we can process each message as it arrives
we can GC it quickly, and the local storage per vertex only grows when a new unique neighbor
ID is encountered in an incoming message.
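
To illustrate the aggregation pattern I mean, here is a minimal sketch (not the actual job; the
class and names are hypothetical, and it deliberately sidesteps the exact Giraph Vertex API): each
incoming message contributes one increment to a count keyed by the 2nd-degree neighbor ID it
carries, so memory grows with the number of distinct IDs, not with the number of messages.

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Sketch of the per-vertex aggregation described above. Each incoming
     * message carries a 2nd-degree neighbor ID; the vertex keeps only a
     * count per unique ID, so per-vertex state grows with distinct IDs seen,
     * not with total message volume.
     */
    public class NeighborCountSketch {
      // hypothetical per-vertex state: 2nd-degree neighbor ID -> times encountered
      private final Map<Long, Integer> secondDegreeCounts = new HashMap<Long, Integer>();

      // process one incoming message's payload; the message itself can be GC'd right after
      public void aggregate(long neighborId) {
        Integer seen = secondDegreeCounts.get(neighborId);
        secondDegreeCounts.put(neighborId, seen == null ? 1 : seen + 1);
      }

      public Map<Long, Integer> getCounts() {
        return secondDegreeCounts;
      }
    }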

So...we'll see what actually happens; this is an experiment after all, but so far so good.
The volume on the network, and the messages sitting on the sender or receiver side before processing
in a given superstep, seem to be where the memory-overload danger is. If we amortize message
volume over supersteps and de-duplicate transmissions, this pressure is relieved; how much
remains to be seen. I am still operating from a laptop and haven't been able to get set up
to do testing yet, so no word on results. But I think your fix was just the right thing to get past the
error I was seeing, thanks!
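
To make "amortize over supersteps" concrete, here is a minimal sketch under my own assumptions
(the class, the names, and the SPREAD constant are hypothetical, and this is not the GIRAPH-314
patch itself): rather than broadcasting to every neighbor in one superstep, a vertex only sends to
the slice of neighbors whose IDs fall into the current superstep's bucket, so per-superstep volume
is roughly 1/SPREAD of the total.

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Sketch of amortizing a broadcast over several supersteps: only the
     * neighbors in the current superstep's bucket receive the message, so
     * the traffic is spread over SPREAD consecutive supersteps.
     */
    public class AmortizedBroadcastSketch {
      // hypothetical number of supersteps to spread one broadcast over
      private static final int SPREAD = 4;

      // assumes non-negative vertex IDs; returns this superstep's slice of targets
      public static List<Long> targetsForSuperstep(List<Long> neighborIds, long superstep) {
        List<Long> targets = new ArrayList<Long>();
        for (long id : neighborIds) {
          if (id % SPREAD == superstep % SPREAD) {
            targets.add(id);
          }
        }
        return targets;
      }
    }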

A few things I want to try in another iteration on this patch right away:

1) Try using the PartitionOwner to send one copy to each worker host, and spread the duplicate
references out to partitions on the remote owner rather than grouping them early, as we spoke about.
That way, perhaps even when a use case does not combine this with -Dhash.userPartitionCount, we can
at least de-duplicate the volume of messages on the wire to the same host all the time (see the
first sketch after this list). If this goes well, it will not be hard to open another JIRA and
convert SendMessageCache similarly.

2) Make sure the solution is robust for non-idempotent messaging by replacing the M -> Set<I> mapping
with an M -> List<I> model that retains duplicate deliveries of the same message to the same vertex.
In cases like PageRank, where the same DoubleWritable might get sent to the same vertex from neighbors
in multiple partitions on the same sending worker, each copy has to be seen as a "fresh delivery" of
that value as the receiver iterates over its messages once for each sender that originally "sent" one
(see the second sketch after this list).

3) try it out, see what happens!
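
First sketch, for (1): a rough illustration of per-worker grouping for a broadcast, so one copy per
worker crosses the wire and the remote side fans it out to its local partitions/vertices. The class
and method names are hypothetical; this is not the actual SendMessageCache or PartitionOwner code,
and it assumes the message type M has sensible equals/hashCode.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Sketch of per-worker de-duplication for a broadcast message: the sender
     * keeps one entry per (worker, message) and appends destination vertex IDs
     * to it, so each message crosses the network once per worker instead of
     * once per destination vertex.
     */
    public class PerWorkerBroadcastCacheSketch<M> {
      // hypothetical cache: worker ID -> (message -> destination vertex IDs on that worker)
      private final Map<Integer, Map<M, List<Long>>> cache =
          new HashMap<Integer, Map<M, List<Long>>>();

      // record that 'message' should reach vertex 'targetId' hosted on 'workerId'
      public void add(int workerId, M message, long targetId) {
        Map<M, List<Long>> perWorker = cache.get(workerId);
        if (perWorker == null) {
          perWorker = new HashMap<M, List<Long>>();
          cache.put(workerId, perWorker);
        }
        List<Long> targets = perWorker.get(message);
        if (targets == null) {
          targets = new ArrayList<Long>();
          perWorker.put(message, targets);
        }
        targets.add(targetId);
      }

      // what would be flushed to one worker: each message once, plus its target list
      public Map<M, List<Long>> drainForWorker(int workerId) {
        return cache.remove(workerId);
      }
    }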
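
Second sketch, for (2): why M -> List<I> rather than M -> Set<I> matters for non-idempotent
algorithms. With a Set, a second delivery of the same message to the same vertex silently collapses
into one; with a List, both deliveries survive. The names and values here are hypothetical.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /**
     * Sketch contrasting Set-based and List-based grouping of the destinations
     * for one broadcast message. A PageRank-style receiver needs to see every
     * delivery, so dropping the duplicate changes the computed result.
     */
    public class DeliveryModelSketch {
      public static void main(String[] args) {
        long targetVertex = 42L;  // hypothetical destination vertex ID

        Set<Long> asSet = new HashSet<Long>();
        asSet.add(targetVertex);   // delivery from partition 0 on this worker
        asSet.add(targetVertex);   // identical delivery from partition 1 is silently dropped
        System.out.println("Set model delivers " + asSet.size() + " copy/copies");   // prints 1

        List<Long> asList = new ArrayList<Long>();
        asList.add(targetVertex);
        asList.add(targetVertex);  // duplicate delivery is retained
        System.out.println("List model delivers " + asList.size() + " copy/copies"); // prints 2
      }
    }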

Thanks for all the advice, I really appreciate your input. I'll have another patch up soon
to see if we're heading in the right direction with this...

> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-322
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-322
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-322-1.patch, GIRAPH-322-2.patch, GIRAPH-322-3.patch
>
>
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the data structures
> and code paths used to transport messages through a Giraph application and out on the network.
> While messages to a single vertex can (and should) be combined in some applications that
> could make use of this broadcast messaging, the out-of-control message growth of algorithms
> like triangle closing means we need to de-duplicate messages bound for many vertices/partitions.
> This will be an evolving solution (this first patch is just the first step), and currently
> it does not present a robust solution for disk-spill message stores. I figure I can get some
> advice about that, or it can be a follow-up JIRA if this turns out to be a fruitful pursuit.
> This first patch is also Netty-only and simply defaults to the old sendMessageToAllEdges()
> implementation if USE_NETTY is false. All this can be cleaned up when we know this works and/or
> is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length encoding their
> delivery and only duplicating a message on the network when copies are bound for different partitions.
> This works best when combined with "-Dhash.userPartitionCount=# of workers" so you don't
> do too much of that.
> If this shows promise I will report back and keep working on this. As it is, it represents
> an end-to-end solution, using Netty, for in-memory messaging. It won't break with spill to
> disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I test it and
> ideas/suggestions crop up.

