kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "namesuperwood"<namesuperw...@gmail.com>
Subject uncontinuous offset for consumer seek
Date Wed, 24 Jan 2018 03:25:02 GMT
Hi all


kafka version : kafka_2.11-0.11.0.2


A topic-partition "adn-tracking,15" in kafka who's earliest offset is1255644602 andlatest
offset is1271253441. 
While starting a spark streaming to process the data from the topic , we got a exception with
"Got wrong record XXXX even after seeking to offset 1266921577”. [    (earliest offset)
1255644602  1266921577   1271253441 ( latest offset ) ]
Iimplemented a simple project to use consumer to seek offset 1266921577. But it return the
offset1266921578. Then while seek to1266921576, it return the1266921576exactly。
Why ? How to fix that ?


There is the code:
public class consumerDemo {
public static void main(String[] argv){ 
Properties props = new Properties();
props.put("bootstrap.servers", "172.31.29.31:9091");
props.put("group.id", "consumer-tutorial-demo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumerString, String consumer = new KafkaConsumerString, String(props);
TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
CollectionTopicPartition collection = new ArrayListTopicPartition();
collection.add(tp); consumer.assign(collection);
consumer.seek(tp, 1266921576); ConsumerRecordsString, String consumerRecords = consumer.poll(10000);
ListConsumerRecordString, String listR = consumerRecords.records(tp);
IteratorConsumerRecordString, String  iter = listR.iterator();
ConsumerRecordString, String record = iter.next();
System.out.println(" the next record " + record.offset() + " recode topic " + record.topic());
 }
}
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message