spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: [streaming] DStream with window performance issue
Date Tue, 08 Sep 2015 16:21:25 GMT
That doesn't really matter.  With the direct stream you'll get all objects
for a given topic/partition in the same Spark partition.  You know which
topic a partition came from via hasOffsetRanges, so you can deserialize
appropriately based on the topic.
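
A rough sketch of the pattern (assuming byte-array message values with the
Kafka 0.8 StringDecoder/DefaultDecoder, your existing ssc and kafkaParams,
and a hypothetical deserialize(topic, bytes) helper that picks the right
Avro reader per topic):

  import kafka.serializer.{DefaultDecoder, StringDecoder}
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val stream = KafkaUtils.createDirectStream[
    String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, Set("A", "B", "C"))

  val edges = stream.transform { rdd =>
    // For the direct stream, RDD partition i holds exactly the records of
    // offsetRanges(i), so the topic is known per partition.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.mapPartitionsWithIndex { (i, iter) =>
      val topic = ranges(i).topic
      // deserialize is a hypothetical per-topic Avro decoder
      iter.map { case (_, bytes) => deserialize(topic, bytes) }
    }
  }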

On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.ponkin@ya.ru>
wrote:

> The thing is that these topics contain completely different Avro
> objects (Array[Byte]) that I need to deserialize into different
> Java (Scala) objects, filter, and then map to a tuple (String, String).
> So I have 3 streams, each with a different Avro object in it. I need to
> convert them (using some business rules) to pairs and union them.
>
> --
> Yandex.Mail — reliable mail
> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
>
>
> 08.09.2015, 19:11, "Cody Koeninger" <cody@koeninger.org>:
> > I'm not 100% sure what's going on there, but why are you doing a union
> > in the first place?
> >
> > If you want multiple topics in a stream, just pass them all in the set
> > of topics to one call to createDirectStream.
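> >
> > A rough sketch (assuming byte-array values with the Kafka 0.8
> > StringDecoder/DefaultDecoder, and the same ssc and kafkaParams you
> > already have):
> >
> >   val stream = KafkaUtils.createDirectStream[String, Array[Byte],
> >     StringDecoder, DefaultDecoder](ssc, kafkaParams, Set("A", "B", "C"))
> >
> > One call, one stream, all three topics; no union needed.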
> >
> > On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
> > wrote:
> >> Ok.
> >> Spark 1.4.1 on YARN
> >>
> >> Here is my application.
> >> I have 4 different Kafka topics (different object streams):
> >>
> >> type Edge = (String, String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A"))
> >>   .filter( nonEmpty ).map( toEdge )
> >> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B"))
> >>   .filter( nonEmpty ).map( toEdge )
> >> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C"))
> >>   .filter( nonEmpty ).map( toEdge )
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z"))
> >>   .filter( nonEmpty ).map( toEdge )
> >>
> >> val joinResult = source.rightOuterJoin( z )
> >> joinResult.foreachRDD { rdd =>
> >>   rdd.foreachPartition { partition =>
> >>     .... // save to result topic in Kafka
> >>   }
> >> }
> >>
> >> The processing time of the 'window' operation in the code above grows
> >> constantly, no matter how many events appear in the corresponding
> >> Kafka topics.
> >>
> >> But if I change one line, from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value))
> >>   .window(Seconds(600), Seconds(10))
> >>
> >> everything works perfectly.
> >>
> >> Perhaps the problem was in WindowedDStream.
> >>
> >> By partitioning every stream with the same partitioner, I forced Spark
> >> to use PartitionerAwareUnionRDD instead of UnionRDD.
> >>
> >> Nonetheless, I did not see any hints about such behaviour in the docs.
> >> Is it a bug or absolutely normal behaviour?
> >>
> >> 08.09.2015, 17:03, "Cody Koeninger" <cody@koeninger.org>:
> >>
> >>>  Can you provide more info (what version of Spark, a code example)?
> >>>
> >>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
> >>>  wrote:
> >>>>  Hi,
> >>>>
> >>>>  I have an application with 2 streams, which are joined together.
> >>>>  Stream1 - a simple DStream (relatively small batch chunks)
> >>>>  Stream2 - a windowed DStream (with a duration of, for example,
> >>>>  60 seconds)
> >>>>
> >>>>  Stream1 and Stream2 are Kafka direct streams.
> >>>>  The problem is that, according to the logs, the processing time of
> >>>>  the window operation is constantly increasing (screenshot:
> >>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
> >>>>  I also see a gap in the processing window (screenshot:
> >>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
> >>>>  in the logs there are no events in that period.
> >>>>  So what is happening in that gap, and why is the window constantly
> >>>>  increasing?
> >>>>
> >>>>  Thank you in advance
> >>>>
> >>>>  ---------------------------------------------------------------------
> >>>>  To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>>>  For additional commands, e-mail: user-help@spark.apache.org
>
