spark-user mailing list archives

From Paul Snively <psniv...@icloud.com>
Subject Re: Roadblock with Spark 0.8.0 ActorStream
Date Fri, 04 Oct 2013 20:13:43 GMT
Hi Matei!

On Oct 4, 2013, at 12:03 PM, Matei Zaharia wrote:

> Hi Paul,
> 
> Just FYI, I'm not sure Akka was designed to pass ActorSystems across closures the way you're doing.

There may be some confusion here: I'm most certainly not closing over an ActorSystem!

> Also, there's a bit of a misunderstanding about closures on RDDs. Consider this change you made to ActorWordCount:
> 
>   lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
>     x => x foreach { x1 => con ! x1 }
>   }

I believe Prashant made that change to see if he could reproduce my issue. He was able to, but not, I think, for the reasons either of us expected; more on that in a moment.

> You say that using "con" in the foreach on RDD x won't work, but that makes a lot of sense -- foreach on RDDs could be running on a completely different node!

As I would hope and expect!

> Akka doesn't support passing ActorSystems across nodes

This is the part that's confusing me: no one is asking it to.

> and in the best case

Which I would instead call the expected case... :-)

> what might've happened with this code is that the remote node would send a message to the "con" object in your main program, which would then send stuff over the network.

Exactly. What I close over is an ActorRef, not an ActorSystem. ActorRefs are deliberately Serializable, and, when Akka's own serialization infrastructure does the serializing and deserializing, ActorRefs sent to different nodes work exactly as you describe, provided the Akka configuration includes the RemoteActorRefProvider, which Spark's, of course, does.
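
To be concrete, here is a minimal sketch of the round trip I'm relying on, using the Akka 2.0.x-era API (actorFor is deprecated in later releases); the Echo actor and system name are mine, not from our code:

  import akka.actor.{ Actor, ActorRef, ActorSystem, Props }

  object ActorRefRoundTrip extends App {
    class Echo extends Actor {
      def receive = { case msg => println("got: " + msg) }
    }

    val system = ActorSystem("demo")
    val original: ActorRef = system.actorOf(Props[Echo], "echo")

    // "Serialize": an ActorRef reduces to its path. Inside Akka's own
    // serializers the path is rendered with the system's remote
    // transport address, which is what keeps the ref reachable from
    // other nodes; plain toString suffices for a local round trip.
    val identifier: String = original.path.toString

    // "Deserialize": resolving the path against an ActorSystem yields
    // a working ActorRef; with the RemoteActorRefProvider, a non-local
    // address here produces a remote ActorRef.
    val resolved: ActorRef = system.actorFor(identifier)
    resolved ! "hello"
  }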

> Basically, you have to be really aware of when an operation is running on a distributed collection versus locally in the main program. RDD.foreach, map, etc all run code on remote nodes of the cluster. DStream.foreach is an exception (and I'm kind of surprised it's called foreach now, because it used to be called foreachRDD) in that it lets you run stuff locally on the RDD object.

While your point is well taken, I can assure you that I'm well aware of this, and in fact,
for this use case, the replication, lineage, exactly-once semantics, and recovery features
of Spark Streaming are precisely what motivates its use.
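
To make that boundary explicit, here is the snippet above again, annotated; lines and con are the names from the quoted code, turned into parameters so the sketch stands on its own:

  import akka.actor.ActorRef
  import org.apache.spark.streaming.DStream
  import org.apache.spark.streaming.StreamingContext._

  def wire(lines: DStream[String], con: ActorRef) {
    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach { rdd =>
      // DStream.foreach (the old foreachRDD): runs locally on the
      // driver, once per batch, handing us that batch's RDD.
      rdd.foreach { pair =>
        // RDD.foreach: this inner closure is serialized and shipped to
        // the cluster, so the captured `con` must survive Spark's
        // serialization round trip to whichever worker runs it.
        con ! pair
      }
    }
  }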

> The fact that Akka passes ActorRefs around makes this really confusing, unfortunately, because those can be passed across nodes, and it looks like they can fail in weird ways if they're not for a local ActorSystem.

I'm frankly confused by this, for two reasons. The first is that I've been working with Akka's remote actor support for a few years now and have never run into any such issues: ActorRefs serialize and deserialize fine, becoming remote ActorRefs on the destination node, and messages sent to them are routed back to the originating node, exactly as you would expect and as documented at <http://doc.akka.io/docs/akka/2.0.5/scala/serialization.html#Serializing_ActorRefs> and in bullets 2 and 4 of <http://doc.akka.io/docs/akka/2.0.5/general/addressing.html#What_is_an_Actor_Reference_>. The second is that actorStream is a documented, shipping feature of Spark Streaming, and the documentation makes no mention of limitations with respect to stream processes closing over ActorRefs; nor, frankly, should it, given that I'm constructing those ActorRefs using Spark's ActorSystem, which is configured to use the RemoteActorRefProvider.
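
For reference, the shape in question is roughly the following; the receiver class and names are mine rather than from the shipped example, and I'm going from memory of the 0.8.0 receivers API:

  import akka.actor.{ Actor, Props }
  import org.apache.spark.streaming.{ DStream, StreamingContext }
  import org.apache.spark.streaming.receivers.Receiver

  // A receiver actor that actorStream instantiates *inside Spark's own
  // ActorSystem*; pushBlock hands received data to Spark Streaming.
  class LineReceiver extends Actor with Receiver {
    def receive = {
      case s: String => pushBlock(s)
    }
  }

  // ssc is the StreamingContext. Every ActorRef minted here comes from
  // the same RemoteActorRefProvider-configured system Spark runs on.
  def makeLines(ssc: StreamingContext): DStream[String] =
    ssc.actorStream[String](Props(new LineReceiver), "LineReceiver")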

In short, I am doing everything according to Hoyle. I'm afraid I have to stick to my guns
here: Spark Streaming's failure to deserialize these ActorRefs correctly is a serious defect,
the shape of which is probably very similar to what's described at <https://groups.google.com/forum/#!topic/akka-user/umVoYFmXogI>.
That is, I'm guessing that Spark is doing "manual serialization" rather than "letting Akka
do the serialization," and for Spark's architecture this makes perfect sense. It just leaves
exactly the gap Prashant and I are describing: there's a bit more work to do in order to correctly
serialize and deserialize ActorRefs, but what that work consists of is documented—even in
the message of the exception that's thrown!
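
To illustrate what I mean, a minimal sketch of the missing step, with the caveat that deserializeWithSystem is a hypothetical helper of mine, and I'm assuming the ExtendedActorSystem is reachable wherever Spark deserializes task data:

  import java.io.{ ByteArrayInputStream, ObjectInputStream }
  import akka.actor.ExtendedActorSystem
  import akka.serialization.JavaSerializer

  // Hypothetical helper: deserialize bytes that may contain ActorRefs.
  def deserializeWithSystem(bytes: Array[Byte], system: ExtendedActorSystem): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try {
      // Without this withValue wrapper, readResolve on any embedded
      // SerializedActorRef has no ActorSystem in scope and throws the
      // IllegalStateException Prashant and I are seeing.
      JavaSerializer.currentSystem.withValue(system) { in.readObject() }
    } finally in.close()
  }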

> I'd really recommend looking into another way to send these messages that fails in a more obvious way if you are trying to send them from a remote machine. Could be as simple as having a non-Serializable object that handles the sending and that you just call into.

Thank you for the suggestion, but no. If we need to find an alternative to Spark Streaming for this project, that will be a shame, but I need to be clear: this is not about flaky Akka remoting behavior, nor about serializing ActorSystems (which aren't Serializable anyway) rather than ActorRefs. It's a known consequence of serializing ActorRefs "manually" rather than with Akka's serializer, as happens when you simply send them in messages. Again, that's perfectly understandable, and far from intuitively obvious as a potential problem. But there it is, and it would be helpful, as we're still in the proof-of-concept phase, to have some sense of how seriously we should expect this issue to be taken. I will do my best this weekend to craft an appropriate pull request with a proposed solution, if at all possible.

> Matei

Best regards,
Paul


