Hi Kashyap,

I am also working with KafkaSpout at the moment, but my bolt is not emitting any messages, even though the Kafka topic contains messages.
When I run the topology, I get the following logs:

68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished refreshing
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished refreshing

When I consume messages with the Kafka consumer in a terminal, it shows all the messages. Can you please share which storm-kafka dependencies and versions you are using in your pom.xml?

Any help would be really appreciated.

Thanks,
Nishu


On Tue, Apr 29, 2014 at 9:16 PM, Kashyap Mhaisekar <kashyap.m@gmail.com> wrote:
Hi,
I have a strange problem with my topology. I use KafkaSpout to read from a Kafka topic, and I find that the topology stops consuming messages after a while, for no apparent reason.
I suspect the cause is acking.

I use BaseBasicBolt (for its auto-acking capabilities), and in the bolt I emit tuples only when a condition is met.
My questions are:
1. When I emit from an execute(...), does that mean acking happens automatically here?
2. What if I don't emit all tuples? If I use a condition like the one in the code below, does it mean that acking does not happen when the execute method does not complete?
3. How do I ack from BasicOutputCollector? OutputCollector has an ack method used for acking, while BasicOutputCollector has no such method. What do I do to explicitly ack using BasicOutputCollector?
4. If I have a bolt that saves values to a DB and does not emit anything, will it cause a problem?

E.g.,

public void execute(Tuple tuple, BasicOutputCollector collector) {
    String sentence = tuple.getString(0);
    for (String word : sentence.split(" ")) {
        if (word.equals("the")) collector.emit(new Values(word));
    }
}
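
For context on questions 1 to 3: as far as I understand it, the executor that wraps a BaseBasicBolt acks the input tuple once execute(...) returns, whether or not anything was emitted, and fails it only when execute throws FailedException. The sketch below models that behaviour with hypothetical stand-in classes (SketchTuple, SketchCollector, SketchBolt, AutoAckExecutor). None of them are the real Storm API, so please treat this as an assumption to verify, not as confirmed behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoAckSketch {
    // Hypothetical stand-in for a Storm tuple: carries a single string field.
    static class SketchTuple {
        final String sentence;
        SketchTuple(String sentence) { this.sentence = sentence; }
    }

    // Hypothetical stand-in for BasicOutputCollector: it can emit, but has no ack method.
    static class SketchCollector {
        final List<String> emitted = new ArrayList<>();
        void emit(String value) { emitted.add(value); }
    }

    // Hypothetical stand-in for a BaseBasicBolt.
    interface SketchBolt {
        void execute(SketchTuple tuple, SketchCollector collector);
    }

    // Models the assumed wrapper: ack when execute returns, fail when it throws.
    // RuntimeException stands in for Storm's FailedException here.
    static class AutoAckExecutor {
        final List<SketchTuple> acked = new ArrayList<>();
        final List<SketchTuple> failed = new ArrayList<>();
        final SketchCollector collector = new SketchCollector();
        private final SketchBolt bolt;

        AutoAckExecutor(SketchBolt bolt) { this.bolt = bolt; }

        void handle(SketchTuple tuple) {
            try {
                bolt.execute(tuple, collector);
                acked.add(tuple);   // acked even if the bolt emitted nothing
            } catch (RuntimeException e) {
                failed.add(tuple);  // failed only when execute throws
            }
        }
    }

    // Same conditional emit as the execute method above.
    static final SketchBolt THE_FILTER = (tuple, collector) -> {
        for (String word : tuple.sentence.split(" ")) {
            if (word.equals("the")) collector.emit(word);
        }
    };

    public static void main(String[] args) {
        AutoAckExecutor executor = new AutoAckExecutor(THE_FILTER);
        executor.handle(new SketchTuple("the quick brown fox")); // emits once
        executor.handle(new SketchTuple("no match here"));       // emits nothing
        // Prints: emitted=1 acked=2 failed=0
        System.out.println("emitted=" + executor.collector.emitted.size()
                + " acked=" + executor.acked.size()
                + " failed=" + executor.failed.size());
    }
}
```

If this model is right, a tuple that produces no emit is still acked, which would mean the conditional emit alone should not leave tuples pending.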
Please help!

Regards,
Kashyap