spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dev loper <spark...@gmail.com>
Subject Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Date Sat, 12 Nov 2016 12:36:27 GMT
Dear fellow Spark Users,

My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
listens to Campaigns based on live stock feeds and the batch duration is 5
seconds. The applications uses Kafka DirectStream and based on the feed
source there are three streams. As given in the code snippet I am doing a
union of three streams and I am trying to remove the duplicate campaigns
received using reduceByKey based on the customer and campaignId. I could
see lot of duplicate email being send out for the same key in the same
batch.I was expecting reduceByKey to remove the duplicate campaigns in a
batch based on customer and campaignId. In logs I am even printing the the
key,batch time before sending the email and I could clearly see duplicates.
I could see some duplicates getting removed after adding log in reduceByKey
Function, but its not eliminating completely .

JavaDStream<Campaign> matchedCampaigns =
stream1.transform(CmpManager::getMatchedCampaigns)
            .union(stream2).union(stream3).cache();
JavaPairDStream<String, Campaign> uniqueCampaigns =
matchedCampaigns.mapToPair(campaign->{
        String key=campaign.getCustomer()+"_"+campaign.getId();
        return new Tuple2<String, Campaigns>(key, campaign);
    }).reduceByKey((campaign1, campaign2)->{return campaign1;});

uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I am not able to figure out where I am going wrong here . Please help me
here to get rid of this weird problem. Previously we were using
createStream for listening to Kafka Queue (number of partitions 1) , there
we didn't face this issue. But when we moved to directStream (number of
partitions 100) we could easily reproduce this issue on high load .

*Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds
instead of reduceByKey Operation, But even that didn't
help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1,
Durations.Seconds(5), Durations.Seconds(5))
I have even requested for help on Stackoverflow , But I haven't received
any solutions to this issue.


*Stack Overflow Link================*
https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch


Thanks and Regards
Dev

Mime
View raw message