I haven't tried rdd.distinct. Since reduceByKey itself is not helping me, even with a sliding window, I assumed rdd.distinct would not help either. I will write a minimal example that reproduces the issue and share it with you. Another point worth mentioning: I am unable to reproduce the issue on my local box, but when I deploy the code on the YARN cluster with 34 executors the problem reproduces easily. Similarly, when I use createStream with one partition the issue does not appear, but when I use DirectStream to consume Kafka with 100 partitions it reproduces easily. According to the log output, the duplicates are not happening on the same executor; they appear on different executors. I don't know whether that last point helps.
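The per-batch semantics the thread expects from reduceByKey((c1, c2) -> c1) can be illustrated without Spark. This plain-Java sketch (the Campaign record and the sample data are hypothetical stand-ins, not the poster's actual class) builds the same customer_id key and keeps the first value per key, which is what should happen within a single batch:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
    // Hypothetical stand-in for the Campaign class in the thread.
    record Campaign(String customer, String id) {}

    // Simulates reduceByKey((c1, c2) -> c1): keep the first campaign per key.
    static Map<String, Campaign> dedup(List<Campaign> batch) {
        Map<String, Campaign> unique = new LinkedHashMap<>();
        for (Campaign c : batch) {
            String key = c.customer() + "_" + c.id();
            unique.putIfAbsent(key, c);  // first value wins, like the merge function
        }
        return unique;
    }

    public static void main(String[] args) {
        List<Campaign> batch = new ArrayList<>();
        batch.add(new Campaign("acme", "42"));
        batch.add(new Campaign("acme", "42"));  // duplicate key
        batch.add(new Campaign("acme", "43"));
        System.out.println(dedup(batch).size());  // two unique keys survive
    }
}
```

If this invariant holds per batch (it trivially does for one in-memory collection), duplicates surviving in the cluster point at the same campaign arriving in more than one batch, or at the output action running more than once, rather than at reduceByKey itself.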

On Sun, Nov 13, 2016 at 5:22 AM, ayan guha <guha.ayan@gmail.com> wrote:
Have you tried rdd.distinct?

On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <cody@koeninger.org> wrote:
Can you come up with a minimal reproducible example?

Probably unrelated, but why are you doing a union of 3 streams?

On Sat, Nov 12, 2016 at 10:29 AM, dev loper <sparkemr@gmail.com> wrote:
> There are no failures or errors. Irrespective of that, I am seeing
> duplicates. The steps and stages are all successful, and even speculation
> is turned off.
>
> On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> Are you certain you aren't getting any failed tasks or other errors?
>> Output actions like foreach aren't exactly once and will be retried on
>> failures.
>>
>>
>> On Nov 12, 2016 06:36, "dev loper" <sparkemr@gmail.com> wrote:
>>>
>>> Dear fellow Spark Users,
>>>
>>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>>> listens to campaigns based on live stock feeds; the batch duration is 5
>>> seconds. The application uses a Kafka DirectStream, and based on the feed
>>> source there are three streams. As shown in the code snippet, I union the
>>> three streams and try to remove duplicate campaigns using reduceByKey on a
>>> key built from the customer and campaignId. Yet I see a lot of duplicate
>>> emails being sent out for the same key within the same batch. I expected
>>> reduceByKey to remove duplicate campaigns in a batch based on customer and
>>> campaignId. In the logs I even print the key and batch time before sending
>>> the email, and I can clearly see duplicates. After adding a log statement
>>> inside the reduceByKey function I can see some duplicates being removed,
>>> but they are not eliminated completely.
>>>
>>> JavaDStream<Campaign> matchedCampaigns =
>>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>>            .union(stream2).union(stream3).cache();
>>>
>>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>>     matchedCampaigns.mapToPair(campaign -> {
>>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>>         return new Tuple2<String, Campaign>(key, campaign);
>>>     })
>>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>>>
>>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>>
>>> I am not able to figure out where I am going wrong here. Please help me
>>> get rid of this weird problem. Previously we used createStream to listen
>>> to the Kafka queue (1 partition) and never faced this issue. But after
>>> moving to directStream (100 partitions) we can easily reproduce the issue
>>> under high load.
>>>
>>> Note: I even tried reduceByKeyAndWindow with a window and slide duration
>>> of 5 seconds instead of the plain reduceByKey operation, but even that
>>> didn't help:
>>>
>>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
>>>     Durations.seconds(5), Durations.seconds(5));
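Since Cody's reply above points out that output actions like foreachRDD are at-least-once and may be retried, an alternative to reducing harder is to make the send itself idempotent. A minimal plain-Java sketch of the idea (all names hypothetical; a real job would keep the seen-set in an external store such as a database or cache, because executor-local state is not shared across retries on other nodes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentSender {
    // Keys for which an email has already gone out. In a cluster this would
    // live in an external store visible to every executor, not in memory.
    private final Set<String> sent = ConcurrentHashMap.newKeySet();

    // Stand-in for actual email delivery: records what would be sent.
    private final List<String> outbox = new ArrayList<>();

    // Sends at most one email per key, even if invoked again on a task retry.
    public void send(String key) {
        if (sent.add(key)) {   // add() returns true only for the first caller
            outbox.add(key);   // hypothetical side effect
        }
    }

    public List<String> outbox() {
        return outbox;
    }
}
```

With this guard, a duplicate key reaching the sink (whether from a retried task or a repeated batch) is simply skipped instead of producing a second email.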
>>>
>>> I have also asked for help on Stack Overflow, but I haven't received
>>> any solutions to this issue yet.
>>>
>>> Stack Overflow Link
>>> ================
>>>
>>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>>
>>>
>>> Thanks and Regards
>>> Dev
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org




--
Best Regards,
Ayan Guha