samza-dev mailing list archives

From Gaurav Agarwal <gauravagarw...@gmail.com>
Subject Re: Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception?
Date Fri, 24 Aug 2018 09:08:46 GMT
Hi All,

I patched the Samza codebase locally and the error goes away. The patch
changes the import for the OffsetOutOfRangeException class in the file
*samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala*
from kafka.common.OffsetOutOfRangeException
to *import org.apache.kafka.common.errors.OffsetOutOfRangeException*
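
To illustrate why the import matters, here is a minimal self-contained sketch
(not the actual Samza code). The objects OldClient and NewClient are
hypothetical stand-ins for the packages kafka.common and
org.apache.kafka.common.errors, and fetch is an assumed placeholder for the
real fetch call. The point is only that a catch clause matches on the class,
not on its simple name:

```scala
// Hypothetical stand-ins for the two identically named exception classes
// (kafka.common vs. org.apache.kafka.common.errors in the real libraries).
object OldClient {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}
object NewClient {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object ImportMismatchDemo {
  // Assumed placeholder for a fetch at a stale offset: it fails with the
  // new-client exception type, which is the type seen in the logs.
  def fetch(offset: Long): Unit =
    throw new NewClient.OffsetOutOfRangeException(s"offset $offset is out of range")

  // Before the patch: the catch clause refers to the old-client class,
  // so the new-client exception does not match and propagates.
  def isValidOffsetBefore(offset: Long): Boolean = {
    import OldClient.OffsetOutOfRangeException
    try { fetch(offset); true }
    catch { case _: OffsetOutOfRangeException => false }
  }

  // After the patch: only the import differs; the exception is caught
  // and converted to false.
  def isValidOffsetAfter(offset: Long): Boolean = {
    import NewClient.OffsetOutOfRangeException
    try { fetch(offset); true }
    catch { case _: OffsetOutOfRangeException => false }
  }

  def main(args: Array[String]): Unit = {
    // Wrap the first call in Try so the uncaught exception is visible
    // as a Failure instead of terminating the program.
    println("old import: " + scala.util.Try(isValidOffsetBefore(56443499L)))
    println("new import: " + isValidOffsetAfter(56443499L))
  }
}
```

With the old import the exception escapes the method, which would match the
behaviour we see (the caller never gets a false value); with the new import
the method returns false.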

Can you please confirm whether this change is good? If so, can a quick patch
release with it be made available?

Independently, does this release need to be verified for other similar
errors (possibly due to changes in class packages, etc.)? I am not trying
to cast aspersions on this release, just asking the next thing
that naturally comes to mind :-)

--
thanks,
gaurav


On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarwal4@gmail.com>
wrote:

> A few more notes (based on reading a similar thread from a few days ago):
> - this exception occurs while initializing the offset for the data topic
> partition (not samza's checkpoint topic/partition)
> - we have manually verified that, due to some issue, the kafka data logs have
> rolled over and the earliest available offset is greater than what samza
> has in its checkpoint - hence when samza queries kafka with the
> offset it last checkpointed, it sees this error.
>
> Please let me know if more logs are required.
>
>
> On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <gauravagarwal4@gmail.com>
> wrote:
>
>> Hi All,
>>
>> We are facing a problem identical to the one described in this thread:
>> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
>>
>> Here, Samza is requesting a Kafka partition offset that is too old
>> (i.e. the Kafka log has moved ahead). We are setting the property *consumer.auto.offset.reset
>> to smallest* and therefore expect Samza to reset its
>> checkpoint to the earliest available partition offset in such a scenario. But
>> that is not happening; we are getting exceptions of this form continually:
>>
>> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
>> kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
>> Disconnecting from vrni-platform-release:9092
>> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
>> system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
>> 56443499 for topic and partition Topic3-0
>> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
>> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
>> refreshing brokers for Topic3-0:
>> org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
>> offset is not within the range of offsets maintained by the server..
>> Retrying.
>>
>> *Version Details:*
>>
>> *Samza: 2.11-0.14.1*
>> *Kafka Client: 1.1.0 *
>> *Kafka Server: 1.1.0 Scala 2.11 *
>>
>>
>> Browsing through the code, it appears that GetOffset::isValidOffset
>> should be able to catch the OffsetOutOfRangeException and
>> convert it to a false value. But it appears that this is not happening. Could
>> there be a mismatch in the package of the exception? This class catches the
>> exception imported as kafka.common.OffsetOutOfRangeException, but from the logs
>> it appears that the package of the thrown class is different. Could this be the
>> reason?
>>
>>   def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>>     try {
>>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>>       if (messages.hasError) {
>>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>>       }
>>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>>       true
>>     } catch {
>>       case e: OffsetOutOfRangeException => false
>>     }
>>   }
>>
>>
>> Also, it appears that the BrokerProxy class - the caller of GetOffset - would
>> print a log ("*It appears that...*") if it gets a false value, but
>> it is not logging this line (indicating that some exception generated in
>> the GetOffset method is going uncaught and being propagated up):
>>
>>
>>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>>     if (nextOffsets.asJava.containsKey(tp)) {
>>       toss("Already consuming TopicPartition %s" format tp)
>>     }
>>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>>       nextOffset
>>         .get
>>         .toLong
>>     } else {
>>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>>       offsetGetter.getResetOffset(simpleConsumer, tp)
>>     }
>>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
>>     nextOffsets += tp -> offset
>>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
>>   }
>>
>>
>> *Could this be due to a mismatch in the Kafka client library version that we
>> are using? Is there a recommended Kafka client version we should use with
>> Samza 0.14.1 (assuming that the Kafka server is 1.x)?*
>> Any help regarding this will be greatly appreciated.
>>
>>
>> - -
>> thanks,
>> gaurav
>>
>>
