spark-user mailing list archives

From Paul Snively <>
Subject Re: Roadblock with Spark 0.8.0 ActorStream
Date Sun, 06 Oct 2013 21:07:20 GMT
Hi Matei!

On Oct 4, 2013, at 1:53 PM, Matei Zaharia wrote:

> Ah, I see, I thought calling "!" on an ActorRef required an implicit ActorSystem to be in scope, but maybe that's not so.

You'd think so, wouldn't you? But the situation is actually stranger: an ActorRef deliberately abstracts away the machinery used to resolve the recipient of a message, including any relevant ActorSystem(s), since an ActorRef can even refer to an Actor in a different ActorSystem.

> It seems like the actual issue is that ActorRefs can't be sent around with just Java Serialization, but rather need to be sent as an Akka message.

That's my understanding from the e-mail thread and docs I linked to as well.

> Specifically, when they are deserialized, this needs to be done in a thread with an Akka-internal ThreadLocal variable set to point to an actual ActorSystem.

Yep. That's also precisely what the message of the thrown exception says.

> Anyway, there's an important reason why we serialize closures using standard Java Serialization instead of Akka's send, which is that not all our task launch methods go through Akka. When running on Mesos for example, Mesos has its own API for sending tasks to nodes. (More generally, we also want to save the closure *before* the user might modify the state it closes over, so we can get deterministic re-execution.)

I had intuited that this must be the case; it's what I was alluding to when I said it was "understandable" that Spark Streaming serializes tasks "manually."

> If there was a way to correctly set the thread-local ActorSystem variable that Akka depends on so that ActorRefs get deserialized, that would be a great solution.

:-)

So my request to the Spark Streaming team is precisely that this be included in your deserialization code.
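
For reference, the Akka-internal hook in question is a DynamicVariable exposed as JavaSerializer.currentSystem. A minimal sketch of what I am requesting — wrapping the deserialization in withValue, assuming the deserializing code has the ExtendedActorSystem in hand (the method and variable names here are mine, not Spark's) — would look like:

```scala
import akka.actor.ExtendedActorSystem
import akka.serialization.JavaSerializer
import java.io.{ByteArrayInputStream, ObjectInputStream}

// Sketch: deserialize bytes that may contain ActorRefs by setting Akka's
// thread-local current-system hook for the duration of the read. `system`
// must be the ExtendedActorSystem of the node doing the deserialization.
def deserializeWithSystem(bytes: Array[Byte], system: ExtendedActorSystem): AnyRef =
  JavaSerializer.currentSystem.withValue(system) {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }
```

The crucial point is that withValue must wrap the readObject() call itself, on the thread doing the reading — which is exactly why only the framework, and not client code, can do this.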

> Otherwise I'd recommend passing the address as a string URL (akka://host:port/whatever) instead of an ActorRef, and creating an ActorRef on the remote node from it. Hopefully one of these works.

This workaround does not work for me, presumably because I'm using third-party code (spray-io in this case) that participates in my closure and includes ActorRefs that it manages. It seems clear enough to me that this will be an issue in the general case: "don't close over ActorRefs" is not realistic advice when Spark Streaming clients use third-party code and control neither the deserialization code nor the construction of the thread that does the deserialization. (Otherwise we could simply use JavaSerializer.currentSystem.withValue(...) { ... } in our client code — but I tried that, and it also does not work.)

> Anyway, thanks for bringing up this issue -- it's a confusing one and we should have a recommended solution for it.

Thank you for your patience in listening to my explanation. Unfortunately, the only feasible solution I see is a minor point release of Spark Streaming that wraps its deserialization in the JavaSerializer.currentSystem.withValue() invocation — again because only you control both the deserialization implementation and the construction of the thread doing the deserialization.

> Matei

Best regards,
