storm-user mailing list archives

From Bilal Al Fartakh <alfartaj.bi...@gmail.com>
Subject Re: storm-kafka-plus 0.4 duplicate tuples
Date Mon, 26 May 2014 15:10:43 GMT
Did you try setting the number of spouts to 1 instead of 4?


2014-05-26 16:04 GMT+01:00 Irek Khasyanov <quard8@gmail.com>:

> After hours of investigation, I have this:
>
> 1. The problem happens when some tuple isn't acked properly, even though all
> acks happen in the same thread, right after the tuple is received.
> 2. The time at which the duplicate record is inserted into the database
> depends on TOPOLOGY_MESSAGE_TIMEOUT_SECS.
> 3. storm-kafka-plus doesn't report failed tuples (or maybe this is a
> Storm problem), even with Config.setDebug(true), so it's not possible to
> investigate where the magic happens.
> 4. Removing anchoring in the emitting bolt helps in most situations. I still
> sometimes see duplicate records, but across the total mass of events it's not
> critical.
>
> I'm still not sure why failing tuples exist :)
>
>
>
> On 25 May 2014 20:20, Irek Khasyanov <quard8@gmail.com> wrote:
>
>> Hi everyone!
>>
>> I'm stuck with duplicate tuples.
>>
>> I have a kafka-spout with 4 workers for 4 partitions. Some tuples are
>> duplicated, as I see in the database, two or three times. This doesn't
>> happen immediately, but after a few seconds, about 30 seconds. Sometimes it
>> happens after a few minutes. I can't figure out what is happening;
>> everything is OK in the workers' log files.
>>
>> My topology config:
>>
>> TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>>         builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>>                 .shuffleGrouping("event")
>>                 .setNumTasks(1);
>>
>>         builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>>                 .fieldsGrouping("vertica", new Fields("event_id", "fieldname", "repr_type", "repr_value"))
>>                 .setNumTasks(1);
>>
>> storm config:
>>
>> Config config = new Config();
>>         config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
>>         config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
>>         config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>
>>         config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
>>         config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
>>         config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
>>
>>         config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    131072);
>>         config.setNumAckers(2);
>>         config.setDebug(false);
>>
>> My guess is that I'm doing something wrong with the topology config, or
>> something is wrong with storm-kafka-plus 0.4.
>>
>> Any ideas why I have duplicates?
>>
>> Thanks
>>
>> — With best regards, Irek Khasyanov
>>
>
>
>
> --
> With best regards, Irek Khasyanov.
>
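
The timing in the quoted message (duplicates showing up around TOPOLOGY_MESSAGE_TIMEOUT_SECS) looks like Storm replaying tuples that were not fully acked in time, which is expected under at-least-once delivery. One way to live with that is to make the database write idempotent on event_id. Below is a minimal sketch in plain Java with no Storm dependency. IdempotentSink and its methods are hypothetical names, and the in-memory set only stands in for something like a unique key in the real database.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the database writer; not part of Storm
// or storm-kafka-plus.
public class IdempotentSink {
    // Assumption: event_id uniquely identifies an event across replays.
    private final Set<String> seenEventIds = new HashSet<>();
    private final List<String> rows = new ArrayList<>();

    // Returns true if the row was written, false if this event_id
    // was already stored (i.e. the tuple is a replay).
    public boolean insert(String eventId, String payload) {
        if (!seenEventIds.add(eventId)) {
            return false; // replayed tuple: skip the write
        }
        rows.add(eventId + ":" + payload);
        return true;
    }

    public int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.insert("e1", "click");
        sink.insert("e2", "view");
        sink.insert("e1", "click"); // simulated replay after a timeout
        System.out.println(sink.rowCount());
    }
}
```

In a real bolt the same check would run before the insert, and the bolt would still ack the replayed tuple so that it is not retried again.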



-- 
*Al Fartakh Bilal*
