spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: Roadblock with Spark 0.8.0 ActorStream
Date Fri, 04 Oct 2013 19:03:44 GMT
Hi Paul,

Just FYI, I'm not sure Akka was designed to pass ActorSystems across closures the way you're
doing. Also, there's a bit of a misunderstanding about closures on RDDs. Consider this change
you made to ActorWordCount:

 lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
   x => x foreach { x1 => con ! x1 }
 }

You say that using "con" in the foreach on RDD x won't work, but that makes a lot of sense
-- foreach on RDDs could be running on a completely different node! Akka doesn't support passing
ActorSystems across nodes, and in the best case, what might've happened with this code is
that the remote node would send a message to the "con" object in your main program, which
would then send stuff over the network. Basically, you have to be really aware of when an
operation is running on a distributed collection versus locally in the main program. RDD.foreach,
map, etc. all run code on remote nodes of the cluster. DStream.foreach is an exception (and
I'm kind of surprised it's called foreach now, because it used to be called foreachRDD) in
that it lets you run stuff locally, on the RDD object itself.
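To make the serialization boundary concrete, here's a rough sketch in plain Scala (no Spark; the names are made up for illustration) of what happens to the closure you pass to RDD.foreach: it gets serialized on the driver, deserialized on a worker, and run there against *copies* of whatever it captured:

```scala
import java.io._

object ClosureShipping {
  // Rough simulation of shipping a task closure to a worker: serialize it,
  // deserialize it "on the other side", and run it there. (Spark's real
  // mechanism is more involved; this only shows the serialization boundary.)
  def shipAndRun[A](f: A => Unit, arg: A): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(f) // a closure capturing an ActorSystem would fail right here
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val onWorker = in.readObject().asInstanceOf[A => Unit]
    onWorker(arg) // runs against copies of whatever f captured
  }
}
```

Anything the closure mutates on the worker is a deserialized copy, so side effects never reach the driver's original objects; that's why sending through "con" from inside RDD.foreach can't do what the code seems to say.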

The fact that Akka passes ActorRefs around makes this really confusing, unfortunately, because
those can be passed across nodes, and it looks like they can fail in weird ways if they're
not for a local ActorSystem. I'd really recommend looking into another way to send these messages
that fails in a more obvious way if you are trying to send them from a remote machine. Could
be as simple as having a non-Serializable object that handles the sending and that you just
call into.
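A minimal sketch of that idea (the names here are mine, not a real API): keep the sending logic behind a deliberately non-Serializable class, so any attempt to drag it into a task closure fails immediately with NotSerializableException instead of misbehaving at runtime:

```scala
import java.io._

// Hypothetical stand-in for whatever wraps your local ActorRef / connection.
// Deliberately NOT Serializable: if it (or a closure capturing it) is ever
// serialized for shipment to a worker, you get a NotSerializableException
// right away rather than weird behavior later.
class LocalSender {
  def send(msg: String): Unit = println(s"sending: $msg")
}

object SerializationCheck {
  // True iff obj survives Java serialization, i.e. could be shipped
  // inside a task closure at all.
  def isShippable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

The point is that a closure capturing a LocalSender inherits its non-serializability, so the mistake surfaces at the moment the closure is serialized, not as a silent round trip through the network.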

Matei


On Oct 4, 2013, at 8:27 AM, Paul Snively <psnively@icloud.com> wrote:

> Hi Prashant!
> 
>> That's very interesting. So this appears to be a general issue with ActorStream, then?
>> 
>> With all DStreams, I suppose.
>  
> OK, so that seems like a bug independent of the issue around having no ActorSystem in scope for runJob(), then.
> 
>> I have not spent enough time on spray-io yet, but if it is implemented as an ActorExtension, then using actorStream is the best fit. For example, I have used akka-camel/akka-zeromq to receive streams from a variety of endpoints. I am not suggesting you should use it, although it has SSL support etc.; that was the original motivation while designing it. But if it can somehow be used in a different and more useful way, I'd encourage it.
> 
> I'm reluctant to add another abstraction layer here, since the motivation for using spray-io in the first place is its extreme scalability, and we mostly want to rely on Spark Streaming here for managing availability and recovery. To me, it seems like we've identified two obvious issues: one revolving around foreach ... foreach ... over a DStream not working correctly, the other revolving around runJob() not having an ActorSystem in scope. While I understand that there is much other work to do, both of these seem, first of all, rather serious, and secondly, as if they should have relatively easy fixes. Of course, I'm open to being corrected in this belief. :-)
> 
>> I'd like to understand your ultimate intentions in a bit more detail, at a higher architectural level. Probably I am hinting at a redesign.
>  
> That's certainly a valid option!
> 
> Without discussing things I'm not at liberty to, all I can really say is that I'm trying to build a real-time streaming server that takes a connection from a client that expects to be able to then send arbitrary bytes to the server. On the server, I want a DStream per socket connection to process the incoming data. There's other stuff, such as this needing to support TLS mutual authentication, but I'll get there when I have data flowing. :-)
> 
> It's really quite simple, or at least it should be, and again, I believe it will be, when the bugs we've identified are fixed.
> 
>> In your case, the code is still too complicated to spot exactly where we are leaking an ActorRef. Is it in the test case?
>  
> If it helps, the only ActorRef in my code now is in the SpraySslClient, i.e. nowhere that's being serialized.
> 
>> Or in our Handler?
>  
> I have to respectfully suggest it is indeed this. OTOH, is it really a "leak"? I am, in fact, closing over objects that, sooner or later, construct ActorRefs. The whole point of my task is to do something with data coming from an ActorRef that represents a TCP connection, then send the result back to that same ActorRef. I guess I'm having a little trouble understanding how many actorStream use cases there are that wouldn't involve ultimately closing over at least one ActorRef. :-)
> 
>> It is also possible to start multiple actorStream receivers in advance and then use their supervisor to start more actors as receivers for you, for example on each connection (if that makes any sense, take a look at it).
>  
> I'm afraid I didn't quite follow that, but it sounds interesting!
> 
> At this point, though, I have to suggest that the path forward, simply for Spark Streaming to work correctly out of the box, is clear. I have created an issue against the missing ActorSystem in runJob() in JIRA, but it sounds like we need another one for the foreach ... foreach ... issue. I'm afraid I don't know exactly how to characterize that issue. Could I possibly impose upon you to create that ticket?
> 
> Thanks again for digging into this!
> Paul
> 

