spark-user mailing list archives

From dev loper <spark...@gmail.com>
Subject Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Date Sun, 13 Nov 2016 03:16:05 GMT
I haven't tried rdd.distinct. Since reduceByKey itself is not helping me even
with a sliding window, I assumed rdd.distinct might not help either. I will
write a minimal example reproducing the issue and share it with you. One
other point I want to bring in: I am unable to reproduce the issue when
running on my local box, but when I deploy the code on the YARN cluster with
34 executors the problem is easily reproduced. Similarly, when I use
createStream with one partition the issue does not appear, but when I use
directStream to consume Kafka with 100 partitions it is easily reproduced.
According to the log output, the duplicates are not happening on the same
executor; they occur on different executors. I don't know whether that last
point helps.
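[Editor's note: the per-batch semantics the original snippet relies on can be checked in isolation. reduceByKey with a keep-first reducer collapses all values sharing a key within one batch into a single record. Below is a minimal plain-Java sketch of that keep-first behavior, not Spark itself; the `customer + "_" + id` key mirrors the posted code, and the record fields are hypothetical.]

```java
import java.util.*;

public class KeepFirstDedup {
    // Equivalent of mapToPair + reduceByKey((c1, c2) -> c1): keep the first
    // record seen for each "customer_id" key within one batch.
    static Map<String, String[]> dedup(List<String[]> batch) {
        Map<String, String[]> unique = new LinkedHashMap<>();
        for (String[] campaign : batch) {
            String key = campaign[0] + "_" + campaign[1];
            // merge keeps the existing (first) value when the key repeats
            unique.merge(key, campaign, (first, later) -> first);
        }
        return unique;
    }

    public static void main(String[] args) {
        // Simulated batch: three records, two sharing the same key.
        List<String[]> batch = Arrays.asList(
                new String[]{"custA", "42", "copy1"},
                new String[]{"custA", "42", "copy2"},   // duplicate key
                new String[]{"custB", "7",  "copy1"});
        Map<String, String[]> unique = dedup(batch);
        System.out.println(unique.size() + " " + unique.get("custA_42")[2]);
        // prints "2 copy1": duplicate key collapsed, first copy kept
    }
}
```

Within a single partitioned RDD this dedup is what reduceByKey guarantees, which is why retried output actions (rather than the reduce itself) are the more likely source of duplicate emails.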

On Sun, Nov 13, 2016 at 5:22 AM, ayan guha <guha.ayan@gmail.com> wrote:

> Have you tried rdd.distinct?
>
> On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> Can you come up with a minimal reproducible example?
>>
>> Probably unrelated, but why are you doing a union of 3 streams?
>>
>> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <sparkemr@gmail.com> wrote:
>> > There are no failures or errors. Irrespective of that, I am seeing
>> > duplicates. The steps and stages are all successful, and even
>> > speculation is turned off.
>> >
>> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>> >>
>> >> Are you certain you aren't getting any failed tasks or other errors?
>> >> Output actions like foreach aren't exactly once and will be retried on
>> >> failures.
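[Editor's note: this is the usual culprit. foreachRDD output actions run with at-least-once semantics, so a retried or speculatively re-executed task re-sends the emails for its partition even though reduceByKey already deduplicated the batch. One common mitigation is to make the output side idempotent, e.g. by recording already-sent keys in a shared store before sending. The sketch below is an in-process stand-in for that idea; a real deployment would use an external store such as a database or cache, and the class and method names here are hypothetical.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentSender {
    // Stand-in for an external "already sent" store visible to all retries.
    private final Map<String, Boolean> sent = new ConcurrentHashMap<>();
    private int emailsSent = 0;

    // Send at most one email per key, even if a retried task calls this
    // again with the same key.
    void sendOnce(String key) {
        if (sent.putIfAbsent(key, Boolean.TRUE) == null) {
            emailsSent++;   // actual email delivery would happen here
        }
    }

    int emailsSent() { return emailsSent; }

    public static void main(String[] args) {
        IdempotentSender sender = new IdempotentSender();
        sender.sendOnce("custA_42");
        sender.sendOnce("custA_42");  // retry of the same task: no second email
        System.out.println(sender.emailsSent());
        // prints "1"
    }
}
```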
>> >>
>> >>
>> >> On Nov 12, 2016 06:36, "dev loper" <sparkemr@gmail.com> wrote:
>> >>>
>> >>> Dear fellow Spark Users,
>> >>>
>> >>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>> >>> listens to campaigns based on live stock feeds, and the batch duration
>> >>> is 5 seconds. The application uses the Kafka DirectStream, and based
>> >>> on the feed source there are three streams. As shown in the code
>> >>> snippet, I union the three streams and try to remove the duplicate
>> >>> campaigns using reduceByKey keyed on the customer and campaignId. I
>> >>> can see a lot of duplicate emails being sent out for the same key in
>> >>> the same batch. I was expecting reduceByKey to remove the duplicate
>> >>> campaigns within a batch based on customer and campaignId. In the logs
>> >>> I even print the key and batch time before sending the email, and I
>> >>> can clearly see duplicates. Some duplicates are removed after adding a
>> >>> log in the reduceByKey function, but it is not eliminating them
>> >>> completely.
>> >>>
>> >>> JavaDStream<Campaign> matchedCampaigns =
>> >>>     stream1.transform(CmpManager::getMatchedCampaigns)
>> >>>            .union(stream2).union(stream3).cache();
>> >>>
>> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>> >>>     matchedCampaigns.mapToPair(campaign -> {
>> >>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>> >>>         return new Tuple2<String, Campaign>(key, campaign);
>> >>>     })
>> >>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>> >>>
>> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>> >>>
>> >>> I am not able to figure out where I am going wrong here. Please help
>> >>> me get rid of this weird problem. Previously we were using
>> >>> createStream to listen to the Kafka queue (1 partition), and we didn't
>> >>> face this issue. But when we moved to directStream (100 partitions) we
>> >>> could easily reproduce the issue under high load.
>> >>>
>> >>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
>> >>> instead of the reduceByKey operation, but even that didn't help:
>> >>>
>> >>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
>> >>>     Durations.seconds(5), Durations.seconds(5));
>> >>>
>> >>> I have even asked for help on Stack Overflow, but I haven't received
>> >>> any solutions to this issue.
>> >>>
>> >>> Stack Overflow Link
>> >>> ================
>> >>>
>> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>> >>>
>> >>>
>> >>> Thanks and Regards
>> >>> Dev
>> >
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
