giraph-dev mailing list archives

From "Eli Reisman (JIRA)" <>
Subject [jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs
Date Tue, 11 Sep 2012 17:22:07 GMT


Eli Reisman commented on GIRAPH-322:

Thanks Maja, great advice. I will check into all these things. I know exactly what you mean
about the spill to disk, it will require some thought to integrate. That has been the problem
with this whole approach: Giraph is hard-wired to Partition -> Vertex -> List<M>
and broadcast-friendly data structures really go against the grain.

The one issue that also makes spills to disk a tricky option is that once spilled to disk
each M object is written over and over (not a referenceId or something), and when read back
they are "fresh instances" again, not refs to the same object. So we get a bunch of re-duplication
on read from disk. Eliminating this problem will require some thought and overhauls.
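
To make the re-duplication concrete, here is a minimal sketch of what happens when one shared message object is written by value and read back. It is not Giraph code: `Msg` and `SpillDemo` are hypothetical stand-ins, and an in-memory byte array plays the role of the disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a message type M; not a Giraph class. */
final class Msg {
  final long payload;

  Msg(long payload) {
    this.payload = payload;
  }
}

final class SpillDemo {
  /** Counts distinct object instances by identity, not by equals(). */
  static int distinctInstances(List<Msg> msgs) {
    Set<Msg> seen = Collections.newSetFromMap(new IdentityHashMap<Msg, Boolean>());
    seen.addAll(msgs);
    return seen.size();
  }

  /** Writes every list entry by value, then reads them all back. */
  static List<Msg> spillAndReload(List<Msg> msgs) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        out.writeInt(msgs.size());
        for (Msg m : msgs) {
          out.writeLong(m.payload); // full payload every time, no reference id
        }
      }
      List<Msg> reloaded = new ArrayList<>();
      try (DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
          reloaded.add(new Msg(in.readLong())); // fresh instance per entry
        }
      }
      return reloaded;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Msg shared = new Msg(42L);
    List<Msg> inMemory = Collections.nCopies(1000, shared); // 1000 refs, 1 object
    System.out.println("before spill: " + distinctInstances(inMemory));
    System.out.println("after reload: " + distinctInstances(spillAndReload(inMemory)));
  }
}
```

The list holds one instance before the round trip and 1000 after it, which is the effect that would undo any in-memory de-duplication once messages pass through a by-value spill.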

Our data already fits in memory well, it's just a matter of getting the messaging out of an
N^2 growth situation and into an (N * a constant) area, where the amortizing can do the rest
of the job. The nice thing about the amortizing (it's an ugly solution, I know) is that when
only some workers are sending on any given superstep, the steps pass extremely quickly. Once
something works well, I will attempt to generalize the solution. Forcing the amortizing responsibility
out on the application users is not ideal, even if SimpleTriangleClosingVertex provides an
example to copy.
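
A rough sketch of the amortizing idea, for illustration only. The round-robin rule below is an assumption and not necessarily how SimpleTriangleClosingVertex does it; `AmortizedSend`, `isMyTurn` and `peakMessagesPerSuperstep` are hypothetical names. It only shows how staggering the senders across supersteps lowers the per-superstep peak.

```java
final class AmortizedSend {
  /** True if this sender's turn falls on the given superstep (hypothetical round-robin rule). */
  static boolean isMyTurn(int senderIndex, long superstep, int rounds) {
    return senderIndex % rounds == (int) (superstep % rounds);
  }

  /** Largest number of messages emitted in any single superstep of one full cycle. */
  static long peakMessagesPerSuperstep(int senders, int fanOut, int rounds) {
    long peak = 0;
    for (long step = 0; step < rounds; step++) {
      long sent = 0;
      for (int s = 0; s < senders; s++) {
        if (isMyTurn(s, step, rounds)) {
          sent += fanOut; // each active sender broadcasts to fanOut neighbors
        }
      }
      peak = Math.max(peak, sent);
    }
    return peak;
  }

  public static void main(String[] args) {
    System.out.println("all at once:    " + peakMessagesPerSuperstep(1000, 1000, 1));
    System.out.println("over 10 rounds: " + peakMessagesPerSuperstep(1000, 1000, 10));
  }
}
```

With 1000 senders each broadcasting to 1000 neighbors, spreading the sends over 10 rounds drops the per-superstep peak from 1,000,000 to 100,000 messages. The total sent over the cycle is unchanged, which is why the amortizing only helps once the growth itself is under control.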

Thanks for the tip about the constructor, the one from the original class (if I remember right)
was just empty braces so I left it to default. Nice save!

So you're saying if I set the command line opts correctly, the Vertex#compute() cycle will
actually grind to a halt on each worker while the message system copies to/from disk whenever
messages pile up? If so, this is worth playing with for sure. I did not see that behavior
when I was trying to run the disk spill options, maybe I just set it up wrong? Without the
compute cycle stopping during spill reads/writes, the messages just pile up in memory instead
of on the network. Either way the re-duplication on disk read is going to keep that from being
a one-stop solution for us.

As far as 1 partition per worker, yes, it's funny: I considered an alternate JIRA when I was
coding this to address the SendMessageCache issue you mentioned. Since the reason for lots
of small partitions per worker was to evenly redistribute them on worker crash, it hasn't
really hurt us to use 1 per worker for this purpose. It drastically reduces messages
required per worker, and there is no re-distribution of partitions on worker crash anyway; a crash
just ends the job run for Giraph in its current form. Anyway a fix for both message caches
in this regard is a great idea, I will take a 2nd look at this.

As for how I think the folks on this end want the final solution to work, we're working toward
"spill to disk only when there's no other choice" so if it is a part of the solution it should
ideally be there just as an emergency buffer against overload that kicks in once in a while.
But just getting it to work at the scale we want is my main goal, we can tune the use case
after that!

OK, off to try this stuff...thanks so much for your thoughtful input!
> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message
growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>                 Key: GIRAPH-322
>                 URL:
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>         Attachments: GIRAPH-322-1.patch, GIRAPH-322-2.patch
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the data structures
and code paths used to transport messages through a Giraph application and out on the network.
While messages to a single vertex can be combined (and should be) in some applications that
could make use of this broadcast messaging, the out of control message growth of algorithms
like triangle closing means we need to de-duplicate messages bound for many vertices/partitions.
> This will be an evolving solution (this first patch is just the first step) and currently
it does not present a robust solution for disk-spill message stores. I figure I can get some
advice about that or it can be a follow-up JIRA if this turns out to be a fruitful pursuit.
This first patch is also Netty-only and simply defaults to the old sendMessageToAllEdges()
implementation if USE_NETTY is false. All this can be cleaned up when we know this works and/or
is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length encoding their
delivery and only duplicating messages on the network when they are bound for different partitions.
This is also best when combined with "-Dhash.userPartitionCount=# of workers" so you don't
do too much of that.
> If this shows promise I will report back and keep working on this. As it is, it represents
an end-to-end solution, using Netty, for in-memory messaging. It won't break with spill to
disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I test it and
ideas/suggestions crop up.
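
The per-partition de-duplication described in the quoted issue can be sketched roughly as follows. This is not the code from the attached patches: `BroadcastGrouping` and its methods are hypothetical, and the modulo partitioner is an assumption standing in for whatever partitioning the job uses. The point is that one copy of a broadcast message travels per destination partition, alongside the list of vertex ids it is meant for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BroadcastGrouping {
  /** Hypothetical partitioner: assumes non-negative vertex ids assigned by modulo. */
  static int partitionOf(long vertexId, int partitionCount) {
    return (int) (vertexId % partitionCount);
  }

  /** Groups the destination ids of one broadcast by the partition that owns them. */
  static Map<Integer, List<Long>> groupByPartition(long[] destinations, int partitionCount) {
    Map<Integer, List<Long>> groups = new TreeMap<>();
    for (long id : destinations) {
      groups
          .computeIfAbsent(partitionOf(id, partitionCount), p -> new ArrayList<>())
          .add(id);
    }
    return groups;
  }

  /** Message copies needed on the network: one per distinct destination partition. */
  static int copiesOnWire(long[] destinations, int partitionCount) {
    return groupByPartition(destinations, partitionCount).size();
  }

  public static void main(String[] args) {
    long[] neighbors = {0, 1, 2, 3, 4, 5, 6, 7};
    System.out.println("naive copies:     " + neighbors.length);
    System.out.println("with 4 partitions: " + copiesOnWire(neighbors, 4));
    System.out.println("with 2 partitions: " + copiesOnWire(neighbors, 2));
  }
}
```

Fewer partitions means fewer copies per broadcast, which is the reason the description suggests setting the partition count to the number of workers.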
