kafka-users mailing list archives

From João Peixoto <joao.harti...@gmail.com>
Subject Re: Can't re-process topic
Date Wed, 17 May 2017 23:12:03 GMT
I'm not too familiar with Spark but the "earliest"/"latest" configuration
is only relevant if your consumer does not hold a valid offset.

If you have read (and committed) up to offset N, you'll resume from N when
you restart. A new consumer has no offset, and that is when the above
configuration takes effect.

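To reprocess a topic you need to either reset your consumer's offset to the
beginning of the topic (offset 0, or the earliest offset still retained) or
change your consumer group name to one that has not been used before. The
former is preferable, I believe.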
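
As a rough sketch of the rule above, here is a toy in-memory model in plain
Java. It is not the Kafka client API: the Partition class, the startPosition
method and the group names "spark" and "spark-reprocess" are all made up for
illustration, and it leaves out details of the real consumer such as the
"none" reset policy.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetResetModel {
    enum ResetPolicy { EARLIEST, LATEST }

    /** Toy stand-in for one partition plus the offsets committed per consumer group. */
    static final class Partition {
        final long logStart; // earliest offset still retained
        final long logEnd;   // offset the next produced record would receive
        final Map<String, Long> committed = new HashMap<>();

        Partition(long logStart, long logEnd) {
            this.logStart = logStart;
            this.logEnd = logEnd;
        }
    }

    /** Where a consumer in the given group starts reading after a (re)start. */
    static long startPosition(Partition p, String groupId, ResetPolicy policy) {
        Long c = p.committed.get(groupId);
        boolean valid = c != null && c >= p.logStart && c <= p.logEnd;
        if (valid) {
            return c; // a valid committed offset wins; the reset policy is ignored
        }
        // No valid offset (new group, or offset out of range): the policy decides.
        return policy == ResetPolicy.EARLIEST ? p.logStart : p.logEnd;
    }

    public static void main(String[] args) {
        Partition p = new Partition(0L, 100L);
        p.committed.put("spark", 100L); // the existing group has read everything

        // Existing group: resumes from its committed offset even with EARLIEST.
        System.out.println(startPosition(p, "spark", ResetPolicy.EARLIEST)); // 100

        // Option 2 from above: a group name never used before has no offset.
        System.out.println(startPosition(p, "spark-reprocess", ResetPolicy.EARLIEST)); // 0

        // Option 1 from above: move the existing group's offset back to the start.
        p.committed.put("spark", p.logStart);
        System.out.println(startPosition(p, "spark", ResetPolicy.LATEST)); // 0
    }
}
```

The first call shows why asking for "earliest" seems to have no effect when
the group already holds an offset at the end of the log.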


On Wed, May 17, 2017 at 1:38 PM Marcelo Oikawa <marcelo.oikawa@webradar.com>
wrote:

> Hi, list.
>
> I'm trying to re-process a topic in Kafka, but when I request the earliest
> offsets, the code below always returns the same value as the latest offsets
> (what I get if I replace OffsetRequest.EarliestTime() with
> OffsetRequest.LatestTime()).
>
> Is there something that I'm missing? I'm pretty sure that this code worked
> for me at some point in our project. Today, we're using Kafka 0.10 but our
> library is spark-streaming-kafka_2.10:1.6.3 and that depends on
> kafka-clients:0.8.2.1. And also, our application is running on Spark 1.6.3.
>
> Any thoughts are welcome.
>
> // Get the partitions and offsets
> TopicAndPartition topicAndPartition =
>     new TopicAndPartition(topic, partMetadata.partitionId());
> PartitionOffsetRequestInfo partitionRequestInfo =
>     new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
>
> Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>     Collections.singletonMap(topicAndPartition, partitionRequestInfo);
>
> OffsetRequest offsetRequest = new OffsetRequest(
>     requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "spark");
> OffsetResponse offsetResponse =
>     getConsumer(partMetadata.leader().host(), partMetadata.leader().port())
>         .getOffsetsBefore(offsetRequest);
>
> System.out.println(offsetResponse.hasError());
> long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId());
> System.out.println("offsets.size: " + offsets.length);
> if (offsets.length > 0) {
>     StringBuilder result = new StringBuilder();
>     result.append("topic: ").append(topic).append("; ");
>     result.append("partitionId: ").append(partMetadata.partitionId()).append("; ");
>     result.append("offset: ").append(offsets[0]).append("; ");
>     result.append("offsetSize: ").append(offsets.length).append(";");
>
>     System.out.println(result.toString() + "\n");
> }
>
