spark-user mailing list archives

From ayan guha <guha.a...@gmail.com>
Subject Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Date Sat, 12 Nov 2016 23:52:56 GMT
Have you tried rdd.distinct()?
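
For example (rough sketch, assuming the JavaDStream<Campaign> from your
snippet is called matchedCampaigns and that Campaign implements
equals/hashCode over customer and campaign id):

JavaDStream<Campaign> dedupedCampaigns =
    matchedCampaigns.transform(rdd -> rdd.distinct());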

On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <cody@koeninger.org> wrote:

> Can you come up with a minimal reproducible example?
>
> Probably unrelated, but why are you doing a union of 3 streams?
>
> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <sparkemr@gmail.com> wrote:
> > There are no failures or errors. Irrespective of that, I am seeing
> > duplicates. The steps and stages are all successful, and even
> > speculation is turned off.
> >
> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <cody@koeninger.org> wrote:
> >>
> >> Are you certain you aren't getting any failed tasks or other errors?
> >> Output actions like foreach aren't exactly once and will be retried on
> >> failures.
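> >>
> >> If the send itself needs to be idempotent, one option is to guard it per
> >> key and batch time against some external store (rough sketch only; the
> >> alreadySent and sendEmail helpers here are hypothetical):
> >>
> >> uniqueCampaigns.foreachRDD((rdd, batchTime) ->
> >>     rdd.foreachPartition(iter -> {
> >>         while (iter.hasNext()) {
> >>             Tuple2<String, Campaign> kv = iter.next();
> >>             // Skip keys that were already emailed for this batch time.
> >>             if (!alreadySent(kv._1(), batchTime)) {
> >>                 sendEmail(kv._2());
> >>             }
> >>         }
> >>     }));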
> >>
> >>
> >> On Nov 12, 2016 06:36, "dev loper" <sparkemr@gmail.com> wrote:
> >>>
> >>> Dear fellow Spark Users,
> >>>
> >>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
> >>> listens to campaigns based on live stock feeds, and the batch duration
> >>> is 5 seconds. The application uses a Kafka DirectStream, and based on
> >>> the feed source there are three streams. As shown in the code snippet,
> >>> I am doing a union of the three streams and trying to remove duplicate
> >>> campaigns using reduceByKey keyed on the customer and campaignId.
> >>> However, I can see a lot of duplicate emails being sent out for the
> >>> same key in the same batch. I was expecting reduceByKey to remove the
> >>> duplicate campaigns in a batch based on customer and campaignId. In the
> >>> logs I even print the key and batch time before sending the email, and
> >>> I can clearly see duplicates. After adding a log statement inside the
> >>> reduceByKey function I can see some duplicates getting removed, but it
> >>> is not eliminating them completely.
> >>>
> >>> JavaDStream<Campaign> matchedCampaigns =
> >>>     stream1.transform(CmpManager::getMatchedCampaigns)
> >>>            .union(stream2).union(stream3).cache();
> >>>
> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
> >>>     matchedCampaigns.mapToPair(campaign -> {
> >>>         // Key by customer + campaignId so duplicates collapse to one entry.
> >>>         String key = campaign.getCustomer() + "_" + campaign.getId();
> >>>         return new Tuple2<String, Campaign>(key, campaign);
> >>>     })
> >>>     .reduceByKey((campaign1, campaign2) -> campaign1);
> >>>
> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
> >>>
> >>> I am not able to figure out where I am going wrong here. Please help me
> >>> get rid of this weird problem. Previously we were using createStream to
> >>> listen to the Kafka queue (1 partition), and we didn't face this issue
> >>> there. But after we moved to directStream (100 partitions) we could
> >>> easily reproduce the issue under high load.
> >>>
> >>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
> >>> instead of the reduceByKey operation, but even that didn't help:
> >>>
> >>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
> >>>     Durations.seconds(5), Durations.seconds(5));
> >>>
> >>> I have also asked for help on Stack Overflow, but I haven't received
> >>> any solutions to this issue.
> >>>
> >>> Stack Overflow Link
> >>> ================
> >>>
> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
> >>>
> >>>
> >>> Thanks and Regards
> >>> Dev
> >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha
