spark-user mailing list archives

From Prashant Sharma <>
Subject Re: Roadblock with Spark 0.8.0 ActorStream
Date Thu, 03 Oct 2013 10:59:44 GMT
Trying to replicate it: the following change to ActorWordCount reproduces the problem.

 lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
      .foreach { rdd => rdd foreach { x1 => con ! x1 } }

While this works:
 lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
      .foreach { rdd => con ! rdd.collect() }
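Put together, a minimal sketch of the working pattern (a sketch only, assuming Spark 0.8's `org.apache.spark.streaming` API; `SinkActor` and the socket source here are illustrative stand-ins, not part of the original example):

```scala
// Sketch only -- assumes Spark 0.8 APIs; SinkActor is an illustrative stand-in.
import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

class SinkActor extends Actor {
  def receive = {
    case counts: Array[(String, Int)] => counts foreach println
  }
}

val system = ActorSystem("driver")
val con = system.actorOf(Props[SinkActor], "sink") // lives on the driver

val ssc = new StreamingContext("local[2]", "ActorWordCount", Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999) // any DStream[String] source

lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
  .foreach { rdd =>
    // collect() brings the data back to the driver, where `con` and its
    // ActorSystem exist -- the ActorRef is never serialized into a task.
    con ! rdd.collect()
  }

ssc.start()
```

The key difference from the failing version is that `con` is only touched on the driver, so the closure shipped to the workers never contains an ActorRef.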

There is another important thing: for now it is not possible to pass an
ActorRef while creating a Props instance for actorStream. The reason is that
the ActorRef gets serialized and sent through runJob (which, unhelpfully, has
no actor system available). An alternative approach, such as passing the path
(as a String) or an ActorPath and then instantiating the ActorRef from it, may
be used instead.
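A minimal sketch of that workaround, resolving the ActorRef from its path string inside the closure rather than closing over the ref itself (a sketch only: `workerSystem` is a hypothetical handle to an ActorSystem available where the task runs, and Akka 2.0's `actorFor` is used to re-resolve the ref):

```scala
// Illustrative only: pass the actor's path as a String (which is serializable),
// then look the ActorRef up again wherever the closure actually runs.
val conPath: String = con.path.toString // e.g. "akka://driver/user/sink"

lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
  .foreach { rdd =>
    rdd foreach { x1 =>
      // workerSystem is hypothetical: some ActorSystem on the executor side.
      // actorFor rebuilds a (possibly remote) ActorRef from the path string,
      // so no ActorRef is ever captured in the serialized closure.
      val ref = workerSystem.actorFor(conPath)
      ref ! x1
    }
  }
```

For a truly remote lookup the path would need the full remote form (host and port), but the idea is the same: ship a serializable path, not the ref.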

This should be fixable in the future, I suppose, but for that I have to learn
how to hack into our runJob mechanism. I will try to check it ASAP. For now,
maybe we can create these issues in JIRA?

Please let me know if this fixes your problems.

On Thu, Oct 3, 2013 at 12:27 PM, Prashant Sharma <>wrote:

> Hi Paul,
> Forgive me, for I have not seen spray-io/akka-io at all and am still trying
> to understand what is being done.
> I feel the problem is in this statement
>     stream foreach { rdd => rdd foreach { item => connection !
> Tcp.Write(item) } }
> AFAIR this used to work earlier. So, to be sure about the above, I will try
> to write a minimalist sample that sends stuff to a remote actor, processed by
> a dstream in foreachRDD.
> Have you tried that, by any chance?
> On Wed, Oct 2, 2013 at 11:07 PM, Paul Snively <> wrote:
>> Hi everyone,
>> I think I have a knack for trying things a bit outside the box. :-)
>> I'm trying to develop a TLS-aware streaming TCP server that creates a
>> Spark Stream for each incoming connection. Since Akka 2.2's new IO module
>> has been backported to Akka 2.0.5 thanks to <
>>>, it seemed the easiest
>> thing to do would be to use Spark's ActorStream support to create a DStream
>> from the ConnectionHandler Actor with the Receiver trait mixed in, and the
>> Tcp.Received(data) case simply calling pushBlock(data). My server class
>> just takes a StreamingContext and a Function2[ActorRef,
>> DStream[ByteString], Unit] and, when a connection is accepted, constructs
>> the DStream, then passes the connection and DStream to the callback.
>> I was able to write this, and it compiles OK, but my test throws an
>> IllegalStateException because deserializing a task is attempting to
>> deserialize an ActorRef, presumably the TCP connection, without an
>> ActorSystem in scope. The exception message is helpful:
>> java.lang.IllegalStateException: Trying to deserialize a serialized
>> ActorRef without an ActorSystem in scope. Use
>> 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
>> However, my attempts to, e.g., wrap my callback invocation in this have been
>> unsuccessful, since I don't have access to an ExtendedActorSystem to pass
>> in.
>> A small project with code and my failing test is at <
>>>. If
>> someone has a bit of time to help me look into this, I'd be grateful. Note
>> that this is a very generic task that I think more people than just me
>> would be interested in. :-)
>> Thanks!
>> Paul
> --
> s

