spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Date Sat, 12 Nov 2016 21:28:58 GMT
Can you come up with a minimal reproducible example?

Probably unrelated, but why are you doing a union of 3 streams?
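
A minimal repro doesn't need Kafka at all. Something along these lines, using
queueStream and plain String keys, keeps the same union / reduceByKey /
foreachRDD shape as the snippet below (all class and variable names here are
made up for illustration):

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class DedupRepro {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("dedup-repro");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    // Three toy streams standing in for the three Kafka direct streams.
    Queue<JavaRDD<String>> q1 = new LinkedList<>();
    Queue<JavaRDD<String>> q2 = new LinkedList<>();
    Queue<JavaRDD<String>> q3 = new LinkedList<>();
    JavaDStream<String> s1 = jssc.queueStream(q1);
    JavaDStream<String> s2 = jssc.queueStream(q2);
    JavaDStream<String> s3 = jssc.queueStream(q3);

    // Feed one stream a batch that deliberately contains a duplicate key.
    q1.add(jssc.sparkContext().parallelize(
        Arrays.asList("cust1_cmp1", "cust1_cmp1", "cust2_cmp9")));

    // Same shape as the reported job: union the streams, key the records,
    // and keep a single record per key.
    JavaPairDStream<String, String> unique = s1.union(s2).union(s3)
        .mapToPair(event -> new Tuple2<>(event, event))
        .reduceByKey((a, b) -> a);

    // Print every key that reaches the output action; the same key showing
    // up twice within one batch would demonstrate the duplicate behaviour.
    unique.foreachRDD(rdd -> rdd.foreach(t -> System.out.println(t._1())));

    jssc.start();
    jssc.awaitTermination();
  }
}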

On Sat, Nov 12, 2016 at 10:29 AM, dev loper <sparkemr@gmail.com> wrote:
> There are no failures or errors, yet I am still seeing duplicates. The
> steps and stages are all successful, and speculation is turned off.
>
> On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> Are you certain you aren't getting any failed tasks or other errors?
>> Output actions like foreach aren't exactly once and will be retried on
>> failures.
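
If it does turn out to be task retries, the usual workaround is to make the
output action idempotent, for example by recording a (key, batch time) marker
in an external store and skipping the send when the marker already exists. A
rough sketch, building on the uniqueCampaigns stream quoted below (DedupStore
and sendEmail are placeholders, not real APIs):

// Idempotent output sketch: only the first attempt for a given key in a
// given batch actually sends the email. DedupStore is a hypothetical
// wrapper over whatever external store is available, e.g. a table with a
// unique constraint on (key, batch_time).
uniqueCampaigns.foreachRDD((rdd, batchTime) ->
    rdd.foreachPartition(iter -> {
        DedupStore store = DedupStore.get();   // hypothetical helper
        while (iter.hasNext()) {
            Tuple2<String, Campaign> t = iter.next();
            if (store.markIfNew(t._1(), batchTime.milliseconds())) {
                sendEmail(t._2());             // placeholder for the real send
            }
        }
    }));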
>>
>>
>> On Nov 12, 2016 06:36, "dev loper" <sparkemr@gmail.com> wrote:
>>>
>>> Dear fellow Spark Users,
>>>
>>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>>> listens to campaigns based on live stock feeds, with a batch duration of 5
>>> seconds. The application uses a Kafka DirectStream, and based on the feed
>>> source there are three streams. As shown in the code snippet, I union the
>>> three streams and try to remove duplicate campaigns with reduceByKey,
>>> keyed on the customer and campaignId. However, I see a lot of duplicate
>>> emails being sent out for the same key within the same batch. I expected
>>> reduceByKey to remove duplicate campaigns in a batch based on customer and
>>> campaignId. In the logs I even print the key and batch time before sending
>>> the email, and I can clearly see duplicates there. After adding a log
>>> statement inside the reduceByKey function I can see some duplicates being
>>> removed, but not all of them.
>>>
>>> JavaDStream<Campaign> matchedCampaigns =
>>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>>            .union(stream2).union(stream3).cache();
>>>
>>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>>     matchedCampaigns.mapToPair(campaign -> {
>>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>>         return new Tuple2<String, Campaign>(key, campaign);
>>>     })
>>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>>>
>>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
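
One way to narrow this down is to count how many times each key actually
appears in the reduced RDD for each batch. reduceByKey guarantees a single
record per key per batch, so if the check below never fires while duplicate
emails still go out, the duplication is happening in the send path (for
example retried output tasks) rather than in reduceByKey. A diagnostic sketch
only; note that countByValue pulls the keys back to the driver:

uniqueCampaigns.foreachRDD((rdd, batchTime) -> {
    // Count occurrences of each key after the reduce.
    java.util.Map<String, Long> counts = rdd.keys().countByValue();
    counts.forEach((key, count) -> {
        if (count > 1) {
            System.out.println("Batch " + batchTime + ": key " + key
                + " appears " + count + " times after reduceByKey");
        }
    });
});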
>>>
>>> I am not able to figure out where I am going wrong here, and any help
>>> with this problem would be much appreciated. Previously we used
>>> createStream to listen to the Kafka queue (1 partition) and never saw this
>>> issue. After moving to the direct stream (100 partitions) we can easily
>>> reproduce the issue under high load.
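
For reference, the direct stream setup mentioned above presumably looks
roughly like the sketch below, written against the spark-streaming-kafka-0-10
classes (org.apache.spark.streaming.kafka010); the broker address, group id
and topic name are placeholders, and the exact integration module in use may
differ.

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "campaign-consumer");
kafkaParams.put("auto.offset.reset", "latest");

// One of the three direct streams; the other two would subscribe to the
// other feed topics in the same way.
JavaInputDStream<ConsumerRecord<String, String>> stream1 =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Arrays.asList("stock-feed-1"), kafkaParams));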
>>>
>>> Note: I also tried reduceByKeyAndWindow with a window and slide duration
>>> of 5 seconds instead of the reduceByKey operation, but even that didn't
>>> help:
>>>
>>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
>>>     Durations.seconds(5), Durations.seconds(5));
>>>
>>> I have also asked for help on Stack Overflow, but I haven't received any
>>> solutions to this issue.
>>>
>>> Stack Overflow Link
>>> ================
>>>
>>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>>
>>>
>>> Thanks and Regards
>>> Dev
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

