storm-user mailing list archives

From Francisco Lopes <>
Subject Re: How to guarantee exactly-once on Kafka messages processing
Date Thu, 09 Jun 2016 22:18:19 GMT

But isn't that exactly what Trident is supposed to do?

After reading more about this subject, it seems Trident gives me
exactly-once semantics for free:

> You've seen the intricacies of what it takes to achieve exactly-once
> semantics. The nice thing about Trident is that it internalizes all the
> fault-tolerance logic within the State – as a user you don't have to deal
> with comparing txids, storing multiple values in the database, or anything
> like that.

From that I understand that Trident will take care of it all for me,
storing just the txid somewhere for each tuple, as described in this example:

> You've already seen that storing just the count as the value isn't
> sufficient to know whether you've processed a batch of tuples before.
> Instead, what you can do is store the transaction id with the count in the
> database as an atomic value. Then, when updating the count, you can just
> compare the transaction id in the database with the transaction id for the
> current batch. If they're the same, you skip the update – because of the
> strong ordering, you know for sure that the value in the database
> incorporates the current batch. If they're different, you increment the
> count. This logic works because the batch for a txid never changes, and
> Trident ensures that state updates are ordered among batches.
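
To check my understanding of the quoted logic, here is a minimal plain-Java
sketch of the txid comparison. It does not use Trident's own State classes;
TransactionalCounter, TxCount, and applyBatch are names I made up, and the
HashMap only stands in for a database that would have to write the txid and
the count together atomically.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the txid check; hypothetical names, not Trident's API. */
public class TransactionalCounter {
    /** The count and the txid of the last applied batch, kept as one value. */
    static final class TxCount {
        final long txid;
        final long count;

        TxCount(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    // Stand-in for the external database (assumption: a real store
    // would write txid and count in a single atomic operation).
    private final Map<String, TxCount> store = new HashMap<>();

    /** Applies a batch's increment; returns false if that batch was already applied. */
    public boolean applyBatch(long batchTxid, String key, long delta) {
        TxCount current = store.get(key);
        if (current != null && current.txid == batchTxid) {
            // Same txid: the stored count already includes this batch, so skip.
            return false;
        }
        long base = (current == null) ? 0L : current.count;
        store.put(key, new TxCount(batchTxid, base + delta));
        return true;
    }

    public long count(String key) {
        TxCount current = store.get(key);
        return (current == null) ? 0L : current.count;
    }

    public static void main(String[] args) {
        TransactionalCounter counter = new TransactionalCounter();
        counter.applyBatch(1L, "apple", 3);
        counter.applyBatch(1L, "apple", 3); // replay of batch 1 is skipped
        counter.applyBatch(2L, "apple", 2);
        System.out.println(counter.count("apple")); // prints 5
    }
}
```

The skip is only safe because, as the quote says, the batch for a txid never
changes and state updates are applied in batch order.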

I'm wondering, though, where Trident stores these internal txids. Would it be
in memory? Or in Zookeeper, maybe?


On Thu, Jun 9, 2016 at 6:23 AM, Alberto São Marcos <> wrote:

> Trying to do "exactly once" in distributed systems is not the easiest or
> safest path, Francisco. Making the downstream workflow idempotent is usually
> a lot easier and probably a much more robust solution.
> On Wed, Jun 8, 2016 at 3:02 PM, Francisco Lopes <>
> wrote:
>> Hello,
>> I'm new to Storm/Trident and I'm using it to read messages from Kafka and
>> send them to an external API exactly once. My topology is as simple as:
>> IPartitionedTridentSpout kafkaSpout = getKafkaSpout();
>> TridentTopology topology = new TridentTopology();
>> topology
>>     .newStream("kafka", kafkaSpout)
>>     .each(new Fields("str"), new Processor(), new Fields());
>> I'm not sure how I should implement state to guarantee a message is not
>> processed twice.
>> Can anyone please enlighten me?
>> All the examples I found show states as counts or sums and that's not
>> what I really need. I'm inclined to use a Redis instance to store state (
>>, but I don't know on what I'd
>> actually persistentAggregate.
>> Thank you.
>> Regards,
>> Francisco
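
For reference, the idempotency suggestion quoted above could look roughly like
the plain-Java sketch below. It is only an illustration under my own
assumptions: IdempotentSender is a made-up class, the Consumer stands in for
the external API call, and the message id is assumed to be the Kafka partition
plus offset. The in-memory set would need to be a durable store (Redis, for
instance) to survive restarts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Sketch of de-duplicating sends by message id; hypothetical names throughout. */
public class IdempotentSender {
    // Ids of messages already delivered (assumption: a durable store in practice).
    private final Set<String> delivered = new HashSet<>();
    // Stand-in for the call to the external API.
    private final Consumer<String> api;

    public IdempotentSender(Consumer<String> api) {
        this.api = api;
    }

    /** Sends the payload unless this partition/offset was already delivered. */
    public boolean send(int partition, long offset, String payload) {
        String id = partition + ":" + offset;
        if (delivered.contains(id)) {
            return false; // replayed message, nothing to do
        }
        api.accept(payload);
        // A crash between the call above and this line would cause a resend,
        // so ideally the API itself would also ignore repeated ids.
        delivered.add(id);
        return true;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        IdempotentSender sender = new IdempotentSender(received::add);
        sender.send(0, 42L, "hello");
        sender.send(0, 42L, "hello"); // replay is dropped
        sender.send(0, 43L, "world");
        System.out.println(received); // prints [hello, world]
    }
}
```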
