kafka-users mailing list archives

From James Yu <cyu...@gmail.com>
Subject org.apache.kafka.common.errors.UnsupportedVersionException in Java
Date Wed, 18 Apr 2018 23:06:22 GMT
Hi,

I can use "kafka-console-consumer.sh" to subscribe to a topic on my Kafka
broker, and the consumer receives messages from the broker just fine.

However, when I try to subscribe to the same topic in Java code with
spring-kafka, I get an UnsupportedVersionException:
2018-04-19 06:42:30.124 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run[719] - Container exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,2]. The supported range is [0,1].
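
As I read the error message above, the client wants to send LIST_OFFSETS at version 2 only, while the broker advertises versions 0 through 1, so the two ranges have no version in common. The sketch below only illustrates that range comparison. The class and method names are hypothetical and this is not Kafka's actual implementation; the second call uses a made-up broker range for contrast.

```java
// Illustrative sketch only: VersionRangeCheck and hasUsableVersion are
// hypothetical names, not part of the Kafka client library.
public class VersionRangeCheck {

    /**
     * Returns true if the client's range [clientMin, clientMax] and the
     * broker's range [brokerMin, brokerMax] share at least one version.
     */
    public static boolean hasUsableVersion(int clientMin, int clientMax,
                                           int brokerMin, int brokerMax) {
        // Two closed ranges overlap when the larger lower bound
        // does not exceed the smaller upper bound.
        return Math.max(clientMin, brokerMin) <= Math.min(clientMax, brokerMax);
    }

    public static void main(String[] args) {
        // Ranges from the error message: client [2,2], broker [0,1].
        System.out.println(hasUsableVersion(2, 2, 0, 1));

        // Hypothetical broker range [0,2], shown only for contrast.
        System.out.println(hasUsableVersion(2, 2, 0, 2));
    }
}
```

With the ranges from my log the check fails, which matches the exception being raised.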

The "kafka-console-consumer.sh" script is from "kafka_2.12-1.0.0", and
spring-kafka is version "2.1.4.RELEASE".

According to the client compatibility table at
http://projects.spring.io/spring-kafka/, my spring-kafka version should be
equivalent to the client used in "kafka_2.12-1.0.0".

The following is how I configure the Kafka consumer in Spring Boot. How can
I get past this UnsupportedVersionException?

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
            IsolationLevel.READ_COMMITTED.toString().toLowerCase());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

Thank you.


-----------------------------------------------
James C.-C.Yu
+886988713275
+8615618429976
