spark-user mailing list archives

From Elmer Garduno <gard...@gmail.com>
Subject Re: Custom stream
Date Fri, 06 Sep 2013 03:07:19 GMT
We took a different approach, using an ActorStream. It depends on the
akka-rabbitmq library [1]. I don't know if the approach is completely
correct, but it worked for us. You can find it here:

https://gist.github.com/elmer-garduno/6459000

Usage:

 val lines = ssc.actorStream[String](
   Props(new RMQReceiver("user", "password", "exchange", "host", "queue", "topic")),
   "SocketReceiver")

[1] https://github.com/thenewmotion/akka-rabbitmq


On Thu, Sep 5, 2013 at 7:34 PM, Vadim Chekan <kot.begemot@gmail.com> wrote:

> Hi all,
>
> I'm working on a spark streaming prototype and I need to consume a message
> stream from RabbitMQ. I looked at the existing DStreams and created my own:
>
> class RabbitMQInputDStream(....) extends NetworkInputDStream[T] {
>   def getReceiver ... =
>     new RabbitMqReceiver(...).asInstanceOf[NetworkReceiver[T]]
> }
>
> I initiate the StreamingContext similarly to the examples:
> object Main extends App {
>     val ctx = new StreamingContext(master, "CpcServiceStreaming",
> Seconds(5))
>     val stream = new RabbitMQInputDStream[Click](ctx, "localhost", "app",
> "rabbitmqapp")
>
>     ctx.registerInputStream(stream)
>     val budget = stream.map(c => (c.AdGuid, c.Money)).reduceByKey(_ + _)
>     budget.print()
> ...
> }
>
> It starts, but during initialization I'm getting: "Object of
> RabbitMQInputDStream is being serialized possibly as a part of closure of
> an RDD operation. This is because the DStream object is being referred to
> from within the closure. Please rewrite the RDD operation inside this
> DStream to avoid this"
>
> I'm not quite sure where it is called from and what has been captured in
> the closure, because the call stack indicates an actor reacting to a message
> (so the originator is not easily discovered). But now I suspect it is somehow
> related to the fact that all stream constructors are encapsulated within the
> StreamingContext class. Am I right? And what can I do to write my own
> DStream?
>
> Perhaps I can use QueueInputDStream as a workaround, but I'm curious to
> understand the roots of the issue.
>
> Environment: spark from "scala-2.10" branch, scala-2.10.2
>
> Vadim Chekan
>
> --
> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
> explicitly specified
>
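
For what it's worth, the error above is the standard closure-capture problem:
referring to a field (or method) of the enclosing DStream from inside an RDD
operation captures `this`, so Java serialization tries to ship the whole
DStream to the workers. Here is a minimal, Spark-free sketch of the failure
mode and the usual fix (copy the field into a local val before the closure);
the `Holder` class and all names are illustrative, not from Spark:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the DStream subclass: NOT Serializable.
class Holder {
  val data = Seq(1, 2, 3)

  // Refers to the field `data`, so the closure captures `this` (the whole
  // non-serializable Holder) and cannot be serialized.
  def badClosure: () => Int = () => data.sum

  // Copy the field into a local val first; the closure then captures only
  // the (serializable) Seq, not the enclosing object.
  def goodClosure: () => Int = { val d = data; () => d.sum }
}

object ClosureDemo {
  // True iff `obj` survives Java serialization, which is what Spark does
  // to every closure it ships to the cluster.
  def serializable(obj: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(serializable((new Holder).badClosure))  // false
    println(serializable((new Holder).goodClosure)) // true
  }
}
```

The same trick (assign the needed field to a local val just before the
map/reduceByKey) usually makes the "DStream object is being referred to from
within the closure" message go away.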
