spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: [streaming] DStream with window performance issue
Date Tue, 08 Sep 2015 20:53:43 GMT
Yeah, that's the general idea.

When you say hard code topic name, do you mean Set(topicA, topicB, topicC)?
You should be able to use a variable for that - read it from a config
file, whatever.

If you're talking about the match statement, yeah you'd need to hardcode
your cases.
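
For example (just a sketch, assuming the Typesafe Config library and an
application.conf on the classpath; the "kafka.topics" key name is made up):

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

// application.conf (hypothetical): kafka.topics = ["topicA", "topicB", "topicC"]
val topics: Set[String] = ConfigFactory.load()
  .getStringList("kafka.topics").asScala.toSet

// then pass `topics` to createDirectStream instead of a literal Set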


On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.ponkin@ya.ru>
wrote:

> Ok. I got it!
> But it seems that I need to hard-code the topic names.
>
> Something like this?
>
> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> DefaultDecoder, DefaultDecoder](
>   ssc, kafkaParams, Set(topicA, topicB, topicC))
>   .transform { rdd =>
>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>     rdd.mapPartitionsWithIndex(
>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>         offsetRange(idx).topic match {
>           case "topicA" => ...
>           case "topicB" => ...
>           case _ => ...
>         }
>     )
>   }
>
>
>
>
> 08.09.2015, 19:21, "Cody Koeninger" <cody@koeninger.org>:
>
> That doesn't really matter. With the direct stream you'll get all objects
> for a given topic-partition in the same Spark partition. You know what
> topic it's from via HasOffsetRanges. Then you can deserialize
> appropriately based on topic.
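>
> A minimal sketch of that pattern (assuming the Spark 1.4 Kafka direct API;
> decodeA and decodeB are hypothetical per-topic Avro deserializers that both
> return your common Edge type):
>
> import kafka.serializer.DefaultDecoder
> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>
> val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>   DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Set("topicA", "topicB"))
>
> val edges = stream.transform { rdd =>
>   // partition i of a direct stream RDD maps 1:1 to offsetRanges(i)
>   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   rdd.mapPartitionsWithIndex { (i, iter) =>
>     ranges(i).topic match {
>       case "topicA" => iter.map { case (_, bytes) => decodeA(bytes) } // hypothetical
>       case "topicB" => iter.map { case (_, bytes) => decodeB(bytes) } // hypothetical
>       case _ => Iterator.empty
>     }
>   }
> }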
>
> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.ponkin@ya.ru>
> wrote:
>
> The thing is, these topics contain completely different Avro
> objects (Array[Byte]) that I need to deserialize into different Java (Scala)
> objects, filter, and then map to tuples (String, String). So I have 3 streams
> with different Avro objects in them. I need to convert them (using some
> business rules) to pairs and unite them.
>
>
>
> 08.09.2015, 19:11, "Cody Koeninger" <cody@koeninger.org>:
> > I'm not 100% sure what's going on there, but why are you doing a union
> in the first place?
> >
> > If you want multiple topics in a stream, just pass them all in the set
> > of topics to one call to createDirectStream.
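> >
> > E.g. (a sketch, with byte-array decoders as in your code):
> >
> > val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> >   DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Set("A", "B", "C"))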
> >
> > On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
> wrote:
> >> Ok.
> >> Spark 1.4.1 on yarn
> >>
> >> Here is my application
> >> I have 4 different Kafka topics (different object streams).
> >>
> >> type Edge = (String,String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A"))
> >>   .filter(nonEmpty).map(toEdge)
> >> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B"))
> >>   .filter(nonEmpty).map(toEdge)
> >> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C"))
> >>   .filter(nonEmpty).map(toEdge)
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z"))
> >>   .filter(nonEmpty).map(toEdge)
> >>
> >> val joinResult = source.rightOuterJoin(z)
> >> joinResult.foreachRDD { rdd =>
> >>   rdd.foreachPartition { partition =>
> >>     ... // save to result topic in kafka
> >>   }
> >> }
> >>
> >> The processing time of the 'window' operation in the code above is
> >> constantly growing, no matter how many events appear in the
> >> corresponding Kafka topics,
> >>
> >> but if I change one line from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value))
> >>   .window(Seconds(600), Seconds(10))
> >>
> >> Everything works perfectly.
> >>
> >> Perhaps the problem was in WindowedDStream.
> >>
> >> I forced it to use PartitionerAwareUnionRDD (by applying partitionBy
> >> with the same partitioner to every batch) instead of UnionRDD.
> >>
> >> Nonetheless, I did not see any hints about such behaviour in the docs.
> >> Is it a bug or completely normal behaviour?
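> >>
> >> One way to see it (a small sketch): log the partition count of each
> >> windowed RDD. If the growing cost comes from a plain UnionRDD
> >> accumulating the partitions of every batch, the count should climb as
> >> the window fills, while with the shared partitioner it should stay at
> >> the partitioner's 8.
> >>
> >> source.foreachRDD { rdd =>
> >>   // grows with a plain UnionRDD; fixed with PartitionerAwareUnionRDD
> >>   println(s"windowed RDD partitions: ${rdd.partitions.length}")
> >> }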
> >>
> >> 08.09.2015, 17:03, "Cody Koeninger" <cody@koeninger.org>:
> >>
> >>>  Can you provide more info (what version of Spark, a code example)?
> >>>
> >>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
> wrote:
> >>>>  Hi,
> >>>>
> >>>>  I have an application with 2 streams, which are joined together.
> >>>>  Stream1 is a simple DStream (relatively small batch chunks).
> >>>>  Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
> >>>>
> >>>>  Stream1 and Stream2 are Kafka direct streams.
> >>>>  The problem is that, according to the logs, the window operation's
> >>>>  processing time is constantly increasing (screenshot:
> >>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
> >>>>  I also see a gap in the processing window (screenshot:
> >>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
> >>>>  in the logs there are no events during that period.
> >>>>  So what happens in that gap, and why is the window constantly
> >>>>  increasing?
> >>>>
> >>>>  Thank you in advance
> >>>>
>
>
