spark-user mailing list archives

From Vadim Chekan <kot.bege...@gmail.com>
Subject Custom stream
Date Fri, 06 Sep 2013 00:34:31 GMT
Hi all,

I'm working on a Spark Streaming prototype and need to consume a message
stream from RabbitMQ. I looked at the existing DStreams and created my own:

class RabbitMQInputDStream(....) extends NetworkInputDStream[T]
  def getReceiver ... =
    new RabbitMqReceiver(...).asInstanceOf[NetworkReceiver[T]]
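
For context, this is roughly what the receiver does internally, sketched here
with the plain RabbitMQ Java client. The queue name, connection settings and
the parseClick helper are placeholders rather than the real code, and the
hand-off into Spark is only marked with a comment since it depends on the
NetworkReceiver API of this branch:

import com.rabbitmq.client.{ConnectionFactory, QueueingConsumer}

// Placeholder deserializer; the real one lives elsewhere.
def parseClick(body: String): Click = ???

def consumeLoop() {
  val factory = new ConnectionFactory()
  factory.setHost("localhost")
  val connection = factory.newConnection()
  val channel = connection.createChannel()
  channel.queueDeclare("clicks", false, false, false, null)

  val consumer = new QueueingConsumer(channel)
  channel.basicConsume("clicks", true, consumer)

  while (true) {
    val delivery = consumer.nextDelivery()            // blocks until a message arrives
    val click = parseClick(new String(delivery.getBody, "UTF-8"))
    // push `click` into Spark here (e.g. via the receiver's block generator)
  }
}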

I initialize the StreamingContext similarly to the examples:
object Main extends App {
    val ctx = new StreamingContext(master, "CpcServiceStreaming", Seconds(5))
    val stream = new RabbitMQInputDStream[Click](ctx, "localhost", "app", "rabbitmqapp")

    ctx.registerInputStream(stream)
    val budget = stream.map(c => (c.AdGuid, c.Money)).reduceByKey(_ + _)
    budget.print()
...
}

It starts, but during initialization I'm getting: "Object of
RabbitMQInputDStream is being serialized possibly as a part of closure of
an RDD operation. This is because the DStream object is being referred to
from within the closure. Please rewrite the RDD operation inside this
DStream to avoid this"

I'm not quite sure where it is called from and what has been captured in
the closure, because the call stack points at an actor reacting to a
message (so the originator is not easily discovered). But now I suspect it
is somehow related to the fact that all the stream constructors are
encapsulated within the StreamingContext class. Am I right? And what can I
do to write my own DStream?

Perhaps I can use QueueInputDStream as a workaround, but I'm curious to
understand the root of the issue.
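
For reference, the workaround I have in mind is modeled on the QueueStream
example: a background thread drains RabbitMQ and pushes small RDDs into a
queue. pollRabbitMq is a hypothetical helper, and the imports assume the
org.apache.spark package layout of this branch:

import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.rdd.RDD

val rddQueue = new SynchronizedQueue[RDD[Click]]()
val stream = ctx.queueStream(rddQueue)
val budget = stream.map(c => (c.AdGuid, c.Money)).reduceByKey(_ + _)
budget.print()
ctx.start()

// Feed the queue from RabbitMQ outside of any RDD closure.
new Thread(new Runnable {
  def run() {
    while (true) {
      val clicks: Seq[Click] = pollRabbitMq()   // hypothetical helper
      rddQueue += ctx.sparkContext.makeRDD(clicks)
    }
  }
}).start()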

Environment: Spark from the "scala-2.10" branch, Scala 2.10.2

Vadim Chekan

-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified
