storm-user mailing list archives

From Nishu <nishuta...@gmail.com>
Subject Re: Question on Acking
Date Wed, 30 Apr 2014 17:43:06 GMT
Hi Kashyap,

I am also working with KafkaSpout at the moment, but my bolt is not emitting
any messages, even though the Kafka topic contains messages.
While the topology is running, I get the following logs:

68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished refreshing
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished refreshing

When I read the topic with the Kafka console consumer in a terminal, it shows
all the messages. Can you please share which storm-kafka dependencies and
versions you are using in your pom.xml?

Any help would be really appreciated.

Thanks,
Nishu


On Tue, Apr 29, 2014 at 9:16 PM, Kashyap Mhaisekar <kashyap.m@gmail.com> wrote:

> Hi,
> I have a strange problem with my topology. I use KafkaSpout to read from a
> Kafka topic, and I find that the topology stops consuming messages after a
> while, without any apparent reason.
> I suspect this is related to acking.
>
> I use BaseBasicBolt (for its auto-acking capabilities), and in the bolt I
> use a condition to decide whether to emit tuples.
> My questions are:
> 1. When I emit from execute(...), does that mean acking happens
> automatically?
> 2. What if I don't emit for every tuple? If I use a condition like the one
> in the code below, does it mean that acking does not happen when the
> execute method does not complete?
> 3. How do I ack from BasicOutputCollector? OutputCollector has an ack
> method, while BasicOutputCollector has no such method. What do I do to
> ack explicitly using BasicOutputCollector?
> 4. If I have a bolt that saves values to a DB and does not emit anything,
> will it cause a problem?
>
> E.g.,
>
> public void execute(Tuple tuple, BasicOutputCollector collector) {
>     String sentence = tuple.getString(0);
>     for (String word : sentence.split(" ")) {
>         // the condition in question: emit only matching words
>         if (word.equals("the")) {
>             collector.emit(new Values(word));
>         }
>     }
> }
>
> Please help!
>
> Regards,
> Kashyap
>
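
On the acking questions quoted above: with Storm's basic bolts, the framework
acks the input tuple once execute(...) returns normally, whether or not
anything was emitted. Throwing FailedException fails the tuple instead. That
is why BasicOutputCollector has no ack method. The sketch below is a small
self-contained Java model of that behaviour, not Storm code. Collector,
BasicBolt, runOnce, the three sample bolts and the FailedException class
defined here are hypothetical stand-ins written for illustration, and tuples
are simplified to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained model of auto-acking; none of these types are Storm's own classes.
public class AutoAckSketch {

    /** Stand-in for Storm's FailedException: a bolt throws it to fail the tuple. */
    static class FailedException extends RuntimeException {
        FailedException(String message) { super(message); }
    }

    /** Stand-in for BasicOutputCollector: it can emit, but offers no ack method. */
    static class Collector {
        final List<String> emitted = new ArrayList<>();
        void emit(String value) { emitted.add(value); }
    }

    /** Stand-in for a BaseBasicBolt subclass (tuple simplified to a String). */
    interface BasicBolt {
        void execute(String tuple, Collector collector);
    }

    /** Models the wrapper around a basic bolt: ack on normal return, fail on FailedException. */
    static String runOnce(BasicBolt bolt, String tuple, Collector collector) {
        try {
            bolt.execute(tuple, collector);
        } catch (FailedException e) {
            return "FAIL";
        }
        return "ACK"; // reached whether or not execute() emitted anything
    }

    /** Same conditional emit as in the quoted example. */
    static final BasicBolt FILTER = (sentence, out) -> {
        for (String word : sentence.split(" ")) {
            if (word.equals("the")) {
                out.emit(word);
            }
        }
    };

    /** A bolt that only has a side effect (for example a DB write) and never emits. */
    static final BasicBolt DB_ONLY = (value, out) -> {
        // a real bolt would save the value here; nothing is emitted
    };

    /** A bolt that explicitly fails its input by throwing. */
    static final BasicBolt FAILING = (value, out) -> {
        throw new FailedException("could not process: " + value);
    };

    public static void main(String[] args) {
        System.out.println(runOnce(FILTER, "the quick fox", new Collector()));
        System.out.println(runOnce(FILTER, "a quick fox", new Collector()));
        System.out.println(runOnce(DB_ONLY, "row", new Collector()));
        System.out.println(runOnce(FAILING, "row", new Collector()));
    }
}
```

In this model a tuple that produces no emit, and a bolt that never emits at
all, both end up acked. Only the bolt that throws ends up failed.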
