kafka-users mailing list archives

From Koen Vantomme <koen.vanto...@gmail.com>
Subject Consumer error : This consumer has already been closed
Date Sun, 23 Oct 2016 16:10:21 GMT
Hello,

I'm creating a simple consumer in Java. The first time I run the consumer,
it works fine.
I stop the application. When I rerun the consumer, I get the error
message "This consumer has already been closed".

Any suggestions?
Regards,
Koen

2016-10-23 17:17:34,261 [main] INFO   AppInfoParser - Kafka commitId : 23c69d62a0cabf06
Exception in thread "main" java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1310)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1321)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:844)
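
The trace shows poll() reaching ensureNotClosed(), which suggests that poll() was called on a consumer instance whose close() had already run in the same process. The sketch below is only an illustration of that check, not the Kafka client itself: FakeConsumer, closeInsideLoop and closeAfterLoop are hypothetical names, and the idea that close() is reached before a later poll() is an assumption that the message does not confirm.

```java
import java.util.Collections;
import java.util.List;

public class ClosedConsumerSketch {

    /** Hypothetical stand-in for a consumer; this is NOT the real KafkaConsumer. */
    static class FakeConsumer implements AutoCloseable {
        private boolean closed = false;

        // Mirrors the kind of guard named in the stack trace.
        private void ensureNotClosed() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
        }

        List<String> poll() {
            ensureNotClosed();
            return Collections.emptyList(); // no records in this sketch
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Closes inside the loop, so the second poll() hits the guard. */
    static String closeInsideLoop() {
        FakeConsumer consumer = new FakeConsumer();
        try {
            for (int i = 0; i < 2; i++) {
                consumer.poll();
                consumer.close(); // assumed mistake: close() runs on every iteration
            }
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Closes once, after the loop, via try-with-resources. */
    static String closeAfterLoop() {
        try (FakeConsumer consumer = new FakeConsumer()) {
            for (int i = 0; i < 2; i++) {
                consumer.poll();
            }
            return "no error";
        }
    }

    public static void main(String[] args) {
        System.out.println(closeInsideLoop());
        System.out.println(closeAfterLoop());
    }
}
```

If the real code follows the first pattern, moving the close() call out of the polling loop (for example into a finally block or a try-with-resources statement) would be one thing to try.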


The code:

String topic = "testmetrics";
String group = "cg1";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);


consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
    }
c
