spark-user mailing list archives

From Понькин Алексей <alexey.pon...@ya.ru>
Subject Re: [streaming] DStream with window performance issue
Date Wed, 09 Sep 2015 15:00:03 GMT
That's correct, I have a 10-second batch.
The problem is actually in processing time: it is increasing constantly, no matter how small or large my window duration is.
I am trying to prepare some example code to clarify my use case.



09.09.2015, 17:04, "Cody Koeninger" <cody@koeninger.org>:
> It looked like from your graphs that you had a 10-second batch time, but that your processing time was consistently 11 seconds. If that's correct, then yes, your delay is going to keep growing. You'd need to either increase your batch time, or get your processing time down (either by adding more resources or by changing your code).
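>
> For example, a minimal sketch of raising the batch interval (the 20-second value and app name are purely illustrative):
>
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>
>     val conf = new SparkConf().setAppName("example")
>     // A larger batch interval gives each micro-batch more time to finish processing.
>     val ssc = new StreamingContext(conf, Seconds(20))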
>
> I'd expect adding a repartition / shuffle to increase processing time, not decrease it. What are you seeing after adding the partitionBy call?
>
> On Tue, Sep 8, 2015 at 5:33 PM, Понькин Алексей <alexey.ponkin@ya.ru> wrote:
>> Oh my, I implemented one directStream instead of a union of three, but it is still growing exponentially with the window method.
>>
>>
>> 08.09.2015, 23:53, "Cody Koeninger" <cody@koeninger.org>:
>>
>>> Yeah, that's the general idea.
>>>
>>> When you say hard-code topic name, do you mean Set(topicA, topicB, topicC)? You should be able to use a variable for that - read it from a config file, whatever.
>>>
>>> If you're talking about the match statement, yeah you'd need to hardcode your cases.
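>>>
>>> For example, a minimal sketch of reading the topic set from a properties file (the file name and the "topics" key are just illustrative):
>>>
>>>     import java.io.FileInputStream
>>>     import java.util.Properties
>>>
>>>     val props = new Properties()
>>>     props.load(new FileInputStream("app.properties"))
>>>     // app.properties contains e.g.: topics=topicA,topicB,topicC
>>>     val topics: Set[String] = props.getProperty("topics").split(",").map(_.trim).toSet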
>>>
>>> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.ponkin@ya.ru> wrote:
>>>> OK, I got it!
>>>> But it seems that I need to hard-code the topic names.
>>>>
>>>> Something like this?
>>>>
>>>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
>>>>   ssc, kafkaParams, Set(topicA, topicB, topicC))
>>>>   .transform { rdd =>
>>>>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>     rdd.mapPartitionsWithIndex(
>>>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>>>         offsetRanges(idx).topic match {
>>>>           case "topicA" => ...
>>>>           case "topicB" => ...
>>>>           case _ => ...
>>>>         }
>>>>     )
>>>>   }
>>>>
>>>> 08.09.2015, 19:21, "Cody Koeninger" <cody@koeninger.org>:
>>>>> That doesn't really matter. With the direct stream you'll get all objects for a given topic-partition in the same Spark partition. You know what topic it's from via HasOffsetRanges. Then you can deserialize appropriately based on topic.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.ponkin@ya.ru> wrote:
>>>>>> The thing is that these topics contain absolutely different Avro objects (Array[Byte]) that I need to deserialize into different Java (Scala) objects, filter, and then map to the tuple (String, String). So I have 3 streams with different Avro objects in them. I need to cast them (using some business rules) to pairs and unite them.
>>>>>>
>>>>>>
>>>>>> 08.09.2015, 19:11, "Cody Koeninger" <cody@koeninger.org>:
>>>>>>
>>>>>>> I'm not 100% sure what's going on there, but why are you doing a union in the first place?
>>>>>>>
>>>>>>> If you want multiple topics in a stream, just pass them all in the set of topics to one call to createDirectStream.
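>>>>>>>
>>>>>>> For example, a minimal sketch (topic names are placeholders; ssc and kafkaParams are assumed to be defined as in your app):
>>>>>>>
>>>>>>>     import kafka.serializer.DefaultDecoder
>>>>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>
>>>>>>>     // One direct stream covering all topics instead of a union of three.
>>>>>>>     val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
>>>>>>>       ssc, kafkaParams, Set("A", "B", "C"))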
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.ponkin@ya.ru> wrote:
>>>>>>>> OK.
>>>>>>>> Spark 1.4.1 on YARN.
>>>>>>>>
>>>>>>>> Here is my application.
>>>>>>>> I have 4 different Kafka topics (different object streams):
>>>>>>>>
>>>>>>>> type Edge = (String, String)
>>>>>>>>
>>>>>>>> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A")).filter( nonEmpty ).map( toEdge )
>>>>>>>> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B")).filter( nonEmpty ).map( toEdge )
>>>>>>>> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C")).filter( nonEmpty ).map( toEdge )
>>>>>>>>
>>>>>>>> val u = a union b union c
>>>>>>>>
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z")).filter( nonEmpty ).map( toEdge )
>>>>>>>>
>>>>>>>> val joinResult = source.rightOuterJoin( z )
>>>>>>>> joinResult.foreachRDD { rdd =>
>>>>>>>>   rdd.foreachPartition { partition =>
>>>>>>>>     ... // save to result topic in Kafka
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The processing time of the 'window' operation in the code above is constantly growing,
>>>>>>>> no matter how many events appear in the corresponding Kafka topics.
>>>>>>>>
>>>>>>>> But if I change one line from
>>>>>>>>
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> to
>>>>>>>>
>>>>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>>>>>>>
>>>>>>>> val source = u.transform(_.partitionBy(partitioner.value)).window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> Everything works perfectly.
>>>>>>>>
>>>>>>>> Perhaps the problem was in WindowedDStream.
>>>>>>>>
>>>>>>>> By applying partitionBy with the same partitioner before the window, I forced it to use PartitionerAwareUnionRDD instead of UnionRDD.
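>>>>>>>>
>>>>>>>> One way to check which union the window produces (a sketch; the println is just illustrative):
>>>>>>>>
>>>>>>>>   source.foreachRDD { rdd =>
>>>>>>>>     // With partitionBy in place, every RDD in the window shares one partitioner,
>>>>>>>>     // so this should print PartitionerAwareUnionRDD; without it, UnionRDD.
>>>>>>>>     println(rdd.getClass.getSimpleName)
>>>>>>>>   }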
>>>>>>>>
>>>>>>>> Nonetheless, I did not see any hints about such behaviour in the docs.
>>>>>>>> Is it a bug or absolutely normal behaviour?
>>>>>>>>
>>>>>>>> 08.09.2015, 17:03, "Cody Koeninger" <cody@koeninger.org>:
>>>>>>>>
>>>>>>>>>  Can you provide more info (what version of Spark, code example)?
>>>>>>>>>
>>>>>>>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.ponkin@ya.ru> wrote:
>>>>>>>>>>  Hi,
>>>>>>>>>>
>>>>>>>>>>  I have an application with 2 streams, which are joined together.
>>>>>>>>>>  Stream1 is a simple DStream (relatively small batch chunks).
>>>>>>>>>>  Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>>>>>>>>>>
>>>>>>>>>>  Stream1 and Stream2 are Kafka direct streams.
>>>>>>>>>>  The problem is that, according to the logs, the processing time of the window operation is constantly increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>>>>>>>>  I also see a gap in the processing window (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php); in the logs there are no events in that period.
>>>>>>>>>>  So what is happening in that gap, and why is the window constantly increasing?
>>>>>>>>>>
>>>>>>>>>>  Thank you in advance
>>>>>>>>>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

