kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Emmett Butler <emm...@parsely.com>
Subject Re: [pykafka-user] Re: Timestamp based reset_offset fails with OffsetOutOfRangeError
Date Tue, 19 Jun 2018 21:42:31 GMT
Thanks! I ask because I think it's possible that only having a single log
segment (as your partition does) hamstrings the functionality of
reset_offsets(). I haven't verified this experimentally, but I think it's
possible. Maybe someone on the Kafka users group has insight.

On Tue, Jun 19, 2018 at 1:55 AM, Moe <volland.moritz@gmail.com> wrote:

> Thanks for getting back to me so speedily. To be frank, I do not know how
> log segments differ from logs.
>
> *# This is a view into my logs folder:*
>
>
> *# And this is the view into that folder for my offset_retrieval_test.001
> partition (the topic in question):*
>
>
> Please let me know if this isn't what you were asking for! The topic has
> been auto created so all settings are default (7 days retention if my
> memory serves me right, but I'm sure you know that better than me).
>
> I appreciate the time and effort,
> Moritz
>
> 2018-06-18 18:27 GMT+02:00 Emmett Butler <emmett@parsely.com>:
>
>> How many log segments does this partition have, and what are the
>> retention settings for the topic? You can find the log segments by looking
>> in the directory pointed to by the log.dirs server configuration.
>>
>> On Mon, Jun 18, 2018 at 7:43 AM, Moe <volland.moritz@gmail.com> wrote:
>>
>>> *# Sorry, I should have mentioned that the timestamp I use is  one I
>>> read from the server gui*
>>>
>>>
>>> Am Montag, 18. Juni 2018 16:39:50 UTC+2 schrieb Moe:
>>>>
>>>> *Hi,*
>>>>
>>>>
>>>> *I'm struggling to implement a time(stamp) based reset_offset and am
>>>> hoping someone here can show me the light. In essence when I call
>>>> reset_offset with a timestamp I get *
>>>>
>>>> *"OffsetOutOfRangeError" *
>>>>
>>>>
>>>> *# Setup:*
>>>> - Kafka:
>>>> Kafka==1.1.0 (also tried with 1.0.0) running in Docker
>>>> (Wurstmeister/Kafka image)
>>>>
>>>> - Consumer/Producer:
>>>> Ubuntu17.04
>>>> pykafka2.7.0
>>>> python3.6
>>>>
>>>>
>>>> *# Kafka Connection*
>>>> client = KafkaClient(hosts='10.25.42.127:9092', broker_version="1.0.0")
>>>> topic = client.topics['offset_retrieval_test.001'.encode()]
>>>>
>>>> *# Producer*
>>>> delivery_reports=False
>>>> auto_start=True
>>>> linger_ms=10.0
>>>>
>>>> ksp = topic.get_producer(delivery_reports=delivery_reports,linger_
>>>> ms=linger_ms,auto_start=auto_start)
>>>>
>>>> for i in range(100):
>>>>
>>>>     dict_obj = {
>>>>         'time': time.time(),
>>>>         'value': i
>>>>     }
>>>>
>>>>     data = json.dumps(dict_obj)
>>>>
>>>>     ksp.produce(data.encode())
>>>>     time.sleep(0.25)
>>>>
>>>> *# This results in data being written to my Kafka Server as follows:*
>>>>
>>>>
>>>> *# But when I call reset offset such as:*
>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22',
>>>> '%Y-%m-%d %H:%M:%S'))*1000)
>>>> consumer.reset_offsets([(topic.partitions[0],offset_ts)])
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>
>>>>     print(dict_obj)
>>>>
>>>> *# I get the below error and a reset to 0*
>>>>
>>>>
>>>> *# Potentially related; printing the message timestamp (as below)
>>>> results in 0 being printed*
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>
>>>>     print(message.timestamp)
>>>>
>>>>
>>>> *Thanks a million,*
>>>> Moritz
>>>>
>>>>
>>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "pykafka-user" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to pykafka-user+unsubscribe@googlegroups.com.
>>> To post to this group, send email to pykafka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/pykafka-user.
>>> To view this discussion on the web visit https://groups.google.com/d/ms
>>> gid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com
>>> <https://groups.google.com/d/msgid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Emmett Butler | Senior Software Engineer
>>
>> <http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "pykafka-user" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to pykafka-user+unsubscribe@googlegroups.com.
>> To post to this group, send email to pykafka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/pykafka-user.
>> To view this discussion on the web visit https://groups.google.com/d/ms
>> gid/pykafka-user/CAER3s9S5uQBO-R6bkkvfFWkRggZxa8zh07umd7Hr39
>> %3DEit0yDA%40mail.gmail.com
>> <https://groups.google.com/d/msgid/pykafka-user/CAER3s9S5uQBO-R6bkkvfFWkRggZxa8zh07umd7Hr39%3DEit0yDA%40mail.gmail.com?utm_medium=email&utm_source=footer>
>> .
>>
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>


-- 
Emmett Butler | Senior Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>

Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message