storm-user mailing list archives

From Qian Lin <linqian.qian....@gmail.com>
Subject Re: Question on Acking
Date Wed, 30 Apr 2014 07:45:47 GMT
The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
execute() method is shown below:

    public void execute(Tuple input) {
        _collector.setContext(input);
        try {
            _bolt.execute(input, _collector);
            _collector.getOutputter().ack(input);
        } catch(FailedException e) {
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);
        }
    }

As can be seen, BaseBasicBolt fits only the pattern of reading an input
tuple, emitting tuples based on it, and then acking the input at the end of
the execute() method. In other words, the ack() is performed by
BasicBoltExecutor rather than by the BaseBasicBolt itself, and it happens
whenever execute() returns normally, no matter how many tuples were emitted.
Even if the BaseBasicBolt instance is a sink bolt (i.e., it emits nothing),
the input is still acked each time execute() finishes. The input is failed
instead only when execute() throws a FailedException.
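
To make the behavior concrete, here is a minimal plain-Java sketch of the
same wrapper logic. It does not use Storm: AutoAckSketch, BasicBolt, run()
and this FailedException are hypothetical stand-ins that I made up for
illustration, with a String playing the role of the tuple and a List playing
the role of the collector. It only mirrors the try/catch shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-ins, NOT Storm classes: they only mimic the ack/fail
// wrapper from BasicBoltExecutor.execute() quoted above.
class AutoAckSketch {

    /** Stand-in for Storm's FailedException: throwing it fails the input. */
    static class FailedException extends RuntimeException {
        FailedException(String message) { super(message); }
    }

    /** Stand-in for the user's bolt logic; it never acks by itself. */
    interface BasicBolt {
        void execute(String input, List<String> emitted);
    }

    /** Runs the bolt, then acks on normal return or fails on FailedException. */
    static String run(BasicBolt bolt, String input, List<String> emitted) {
        try {
            bolt.execute(input, emitted);
            return "ack";
        } catch (FailedException e) {
            return "fail";
        }
    }

    public static void main(String[] args) {
        // Conditional emit, as in the question: only "the" is emitted.
        BasicBolt filter = (sentence, emitted) -> {
            for (String word : sentence.split(" ")) {
                if (word.equals("the")) emitted.add(word);
            }
        };
        // Sink bolt: emits nothing (it would, e.g., write to a DB instead).
        BasicBolt sink = (input, emitted) -> { };
        // Bolt that reports a failure by throwing.
        BasicBolt failing = (input, emitted) -> {
            throw new FailedException("simulated failure");
        };

        List<String> out1 = new ArrayList<>();
        List<String> out2 = new ArrayList<>();
        System.out.println(run(filter, "the quick fox", out1) + " " + out1); // ack [the]
        System.out.println(run(filter, "a quick fox", out2) + " " + out2);   // ack []
        System.out.println(run(sink, "row", new ArrayList<>()));             // ack
        System.out.println(run(failing, "row", new ArrayList<>()));          // fail
    }
}
```

In this sketch the result depends only on whether execute() returns or
throws, not on how many values were emitted, which is the point of the
paragraph above.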

Qian


On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar <kashyap.m@gmail.com> wrote:

> Hi,
> I have a strange problem with my topology. I use KafkaSpout to read from a
> Kafka topic and I find that the topology stops consuming messages after a
> while, without apparent reason.
> I suspect this is due to acking.
>
> I use BaseBasicBolt (due to its auto-acking capabilities) and what I do in
> the bolt is use a condition to emit tuples out.
> My question is -
> 1. When I emit from an execute(...), does that mean acking happens
> automatically here?
> 2. What if I don't emit all tuples? If I use a condition like the code
> highlighted below, does it mean that acking does not happen when the execute
> method is not complete?
> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has a
> method *ack* used for acking while BasicOutputCollector has no such
> method. What do I do to explicitly ack using BasicOutputCollector?
> 4. If I have a bolt that saves values to a DB and does not emit anything,
> will it cause a problem?
>
> E.g.,
>
> public void execute(Tuple tuple, BasicOutputCollector collector) {
>     String sentence = tuple.getString(0);
>     for (String word : sentence.split(" ")) {
>         *if (word.equals("the"))* collector.emit(new Values(word));
>     }
> }
>
> Please help!
>
> Regards,
> Kashyap
>
