spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Понькин Алексей <alexey.pon...@ya.ru>
Subject Re: [streaming] DStream with window performance issue
Date Tue, 08 Sep 2015 21:16:34 GMT
Thank you very much for great answer!

-- 
Яндекс.Почта — надёжная почта
http://mail.yandex.ru/neo2/collect/?exp=1&t=1


08.09.2015, 23:53, "Cody Koeninger" <cody@koeninger.org>:
> Yeah, that's the general idea.
>
> When you say hard code topic name, do you mean  Set(topicA, topicB, topicB) ?  You
should be able to use a variable for that - read it from a config file, whatever.
>
> If you're talking about the match statement, yeah you'd need to hardcode your cases.
>
> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.ponkin@ya.ru>
wrote:
>> Ok. I got it!
>> But it seems that I need to hard code topic name.
>>
>> something like that?
>>
>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder,
DefaultDecoder](
>>   ssc, kafkaParams, Set(topicA, topicB, topicB))
>>   .transform{ rdd =>
>>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>     rdd.mapPartitionsWithIndex(
>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>         offsetRange(idx).topic match {
>>           case "topicA" => ...
>>           case "topicB" => ...
>>           case _ => ....
>>         }
>>      )
>>     }
>>
>> 08.09.2015, 19:21, "Cody Koeninger" <cody@koeninger.org>:
>>> That doesn't really matter.  With the direct stream you'll get all objects for
a given topicpartition in the same spark partition.  You know what topic it's from via hasOffsetRanges. 
Then you can deserialize appropriately based on topic.
>>>
>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.ponkin@ya.ru>
wrote:
>>>> The thing is, that these topics contain absolutely different AVRO objects(Array[Byte])
that I need to deserialize to different Java(Scala) objects, filter and then map to tuple
(String, String). So i have 3 streams with different avro object in there. I need to cast
them(using some business rules) to pairs and unite.
>>>>
>>>> --
>>>> Яндекс.Почта — надёжная почта
>>>> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
>>>>
>>>> 08.09.2015, 19:11, "Cody Koeninger" <cody@koeninger.org>:
>>>>
>>>>> I'm not 100% sure what's going on there, but why are you doing a union
in the first place?
>>>>>
>>>>> If you want multiple topics in a stream, just pass them all in the set
of topics to one call to createDirectStream
>>>>>
>>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
wrote:
>>>>>> Ok.
>>>>>> Spark 1.4.1 on yarn
>>>>>>
>>>>>> Here is my application
>>>>>> I have 4 different Kafka topics(different object streams)
>>>>>>
>>>>>> type Edge = (String,String)
>>>>>>
>>>>>> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter(
nonEmpty ).map( toEdge )
>>>>>> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter(
nonEmpty ).map( toEdge )
>>>>>> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter(
nonEmpty ).map( toEdge )
>>>>>>
>>>>>> val u = a union b union c
>>>>>>
>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>
>>>>>> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter(
nonEmpty ).map( toEdge )
>>>>>>
>>>>>> val joinResult = source.rightOuterJoin( z )
>>>>>> joinResult.foreachRDD { rdd=>
>>>>>>   rdd.foreachPartition { partition =>
>>>>>>      .... // save to result topic in kafka
>>>>>>    }
>>>>>>  }
>>>>>>
>>>>>> The 'window' function in the code above is constantly growing,
>>>>>> no matter how many events appeared in corresponding kafka topics
>>>>>>
>>>>>> but if I change one line from
>>>>>>
>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>
>>>>>> to
>>>>>>
>>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>>>>>
>>>>>> val source = u.transform(_.partitionBy(partitioner.value) ).window(Seconds(600),
Seconds(10))
>>>>>>
>>>>>> Everything works perfect.
>>>>>>
>>>>>> Perhaps the problem was in WindowedDStream
>>>>>>
>>>>>> I forced to use PartitionerAwareUnionRDD( partitionBy the same partitioner
) instead of UnionRDD.
>>>>>>
>>>>>> Nonetheless I did not see any hints about such a bahaviour in doc.
>>>>>> Is it a bug or absolutely normal behaviour?
>>>>>>
>>>>>> 08.09.2015, 17:03, "Cody Koeninger" <cody@koeninger.org>:
>>>>>>
>>>>>>>  Can you provide more info (what version of spark, code example)?
>>>>>>>
>>>>>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.ponkin@ya.ru>
wrote:
>>>>>>>>  Hi,
>>>>>>>>
>>>>>>>>  I have an application with 2 streams, which are joined
together.
>>>>>>>>  Stream1 - is simple DStream(relativly small size batch
chunks)
>>>>>>>>  Stream2 - is a windowed DStream(with duration for example
60 seconds)
>>>>>>>>
>>>>>>>>  Stream1 and Stream2 are Kafka direct stream.
>>>>>>>>  The problem is that according to logs window operation
is constantly increasing(<a href="http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php">screen</a>).
>>>>>>>>  And also I see gap in pocessing window(<a href="http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php">screen</a>)
in logs there are no events in that period.
>>>>>>>>  So what is happen in that gap and why window is constantly
insreasing?
>>>>>>>>
>>>>>>>>  Thank you in advance
>>>>>>>>
>>>>>>>>  ---------------------------------------------------------------------
>>>>>>>>  To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>  For additional commands, e-mail: user-help@spark.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message