Hi all,

I'm working on a Spark Streaming prototype and need to consume a message stream from RabbitMQ. I looked at the existing DStreams and created my own:

class RabbitMQInputDStream[T](...) extends NetworkInputDStream[T] {
  def getReceiver(): NetworkReceiver[T] =
    new RabbitMqReceiver(...).asInstanceOf[NetworkReceiver[T]]
}

I initialize the StreamingContext similarly to the examples:
object Main extends App {
  val ctx = new StreamingContext(master, "CpcServiceStreaming", Seconds(5))
  val stream = new RabbitMQInputDStream[Click](ctx, "localhost", "app", "rabbitmqapp")

  ctx.registerInputStream(stream)
  val budget = stream.map(c => (c.AdGuid, c.Money)).reduceByKey(_ + _)
  budget.print()
  ...
}

It starts, but during initialization I get: "Object of RabbitMQInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this"

I'm not quite sure where it is called from and what has been captured in the closure, because the call stack shows an actor reacting to a message (so the originator is not easily discovered). I now suspect it is somehow related to the fact that all the stream constructors are encapsulated within the StreamingContext class. Am I right? And what should I do to write my own DStream?
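For what it's worth, the usual JVM-level cause of this kind of warning is that a lambda which reads a field of its enclosing class captures the whole enclosing instance (`this`), not just the field's value. A minimal, Spark-free sketch of that (class and method names below are made up for illustration, not Spark APIs):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a DStream subclass: a non-serializable class defining a closure.
class StreamLike {
  val factor = 2
  // `factor` here is really `this.factor`, so the lambda captures the whole
  // (non-serializable) StreamLike instance, not just an Int.
  val capturesThis: Int => Int = x => x * factor
}

// The common fix: copy the field into a local val first, so the closure
// captures only a plain Int and never references `this`.
class StreamLikeFixed {
  val factor = 2
  val capturesLocal: Int => Int = {
    val f = factor // local copy, no reference to `this`
    x => x * f
  }
}

object ClosureDemo {
  // Returns true if `obj` survives Java serialization.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializable(new StreamLike().capturesThis))       // fails: outer captured
    println(serializable(new StreamLikeFixed().capturesLocal)) // succeeds: only an Int captured
  }
}
```

So if any function passed to an RDD/DStream operation is defined inside the DStream subclass and touches one of its fields, the DStream itself gets dragged into the serialized closure; copying the needed values into locals (or moving the functions to a companion object) avoids that.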

Perhaps I can use QueueInputDStream as a workaround, but I'm curious to understand the root of the issue.

Environment: Spark from the "scala-2.10" branch, Scala 2.10.2

Vadim Chekan

--
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified