kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Printing to stdin from KStreams?
Date Fri, 07 Oct 2016 11:36:12 GMT
Is it possible to have kafka-streams-reset be automatically called during
development? Something like streams.cleanUp() but which also does reset?

On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <michael@confluent.io> wrote:

> Ali,
>
> adding to what Matthias said:
>
> Kafka 0.10 changed the message format to add so-called "embedded
> timestamps" into each Kafka message.  The Java producer included in Kafka
> 0.10 includes such embedded timestamps into any generated message as
> expected.
>
> However, other clients (like the go kafka plugin you are using) may not
> have been updated yet to be compatible with the new 0.10 message format.
> That's the root cause of the "-1" negative timestamps you see.  (The same
> negative timestamp problem also happens if you attempt to read messages
> that were generated with pre-0.10 versions of Kafka's Java producer.)
>
> FYI: Kafka Streams' default timestamp extractor attempts to read those new
> embedded timestamps.  If there are no such embedded timestamps, you run
> into these "negative timestamps" errors.
>
> Now, how to fix your problem?
>
> - Fix the root cause: Check if there's a newer version of your Go kafka
> plugin that generates messages in the new Kafka 0.10 format.  If there is
> no such version, ask the maintainers for an update. :-)
>
> - Work around the problem:  As Matthias said, you can also tell Kafka
> Streams not to use its default timestamp extractor.  You can fall back to
> the WallclockTimestampExtractor, though this means your application will
> not use event-time but processing-time when processing your data, which is
> probably not what you want (but it does prevent the -1 timestamp errors).
> If your data (generated by the go kafka plugin) *does* contain timestamp
> information in the message payload, then the better option is to write a
> custom timestamp extractor that inspects each message, extracts the timestamp
> from the payload, and returns it to Kafka Streams.  The Timestamp Extractor
> section in [1] explains how to write a custom one and how to configure your
> app to use it.
>
> Hope this helps,
> Michael
>
>
>
> [1]
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
>
> On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > -----BEGIN PGP SIGNED MESSAGE-----
> > Hash: SHA512
> >
> > If you restart your application, it will resume where it left off
> > (same as any other Kafka consumer that does use group management and
> > commits offsets).
> >
> > If you want to reprocess data from scratch, you need to reset your
> > application using bin/kafka-streams-application-reset.sh
> >
> > See also
> > http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
> >
> > and
> > http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> >
> >
> > About the timestamp issue: it seems that your Go client does not
> > assign valid timestamps when writing the data. As you already said,
> > you need to provide a custom TimestampExtractor (or use
> > WallclockTimestampExtractor if semantics permit) instead of the default
> > ConsumerRecordTimestampExtractor.
> >
> >
> > - -Matthias
> >
> > On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > > Thanks.
> > >
> > > I'm encountering a strange issue.
> > >
> > > If I create messages thru console-producer.sh on a new topic,
> > > things work fine.
> > >
> > > But on the topic that I need to consume, the messages are being
> > > produced via the go kafka plugin.
> > >
> > > On this topic, at first, nothing happens when the stream starts
> > > (i.e it doesn't process the messages which are already in there)
> > >
> > > Then, if I produce new messages, then my exception handler is
> > > called with the exception that timestamp is negative.
> > >
> > > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> > >
> > > I'm going to write a new timestamp extractor, but any ideas why
> > > nothing happens with the old messages which are in the topic, it
> > > only responds if i push new messages to this topic?
> > >
> > > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > > <matthias@confluent.io> wrote:
> > >
> > > Sure.
> > >
> > > Just use #print() or #writeAsText()
> > >
> > >
> > > -Matthias
> > >
> > > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > >>>> What the subject says. For dev, it would be a lot easier if
> > >>>> debugging info can be printed to stdin instead of another
> > >>>> topic, where it will persist.
> > >>>>
> > >>>> Any ideas if this is possible?
> > >>>>
> > >>
> > >
> > -----BEGIN PGP SIGNATURE-----
> > Comment: GPGTools - https://gpgtools.org
> >
> > iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> > 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> > JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> > qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> > 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> > AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> > Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> > UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> > NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> > OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> > gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> > ZBA0KEjx9U8NNf+eiqN5
> > =UMbs
> > -----END PGP SIGNATURE-----
> >
>
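
The custom-extractor workaround Michael describes (read the timestamp out of the message payload, otherwise fall back to another clock) could be sketched roughly as below. This is only an illustration in plain Java with no Kafka dependency. The class name PayloadTimestamps, the method extractMillis, and the JSON-style "ts" field holding epoch milliseconds are all assumptions made for the example; the thread does not say what the Go producer actually writes. In a real application, logic like this would presumably be called from the extract method of a TimestampExtractor implementation, configured as described in [1].

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTimestamps {
    // Assumption: the payload is JSON-like text with a hypothetical "ts" field
    // that holds epoch milliseconds, e.g. {"id":7,"ts":1475840172000}.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    /**
     * Returns the timestamp found in the payload, or fallbackMillis when the
     * payload is null, has no "ts" field, or the value does not fit in a long.
     */
    public static long extractMillis(String payload, long fallbackMillis) {
        if (payload == null) {
            return fallbackMillis;
        }
        Matcher m = TS_FIELD.matcher(payload);
        if (!m.find()) {
            return fallbackMillis;
        }
        try {
            return Long.parseLong(m.group(1));
        } catch (NumberFormatException e) {
            // The digits overflowed a long; treat the field as unusable.
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        // Wall-clock time as the fallback, i.e. processing-time semantics
        // for records that carry no timestamp of their own.
        long now = System.currentTimeMillis();
        System.out.println(extractMillis("{\"id\":7,\"ts\":1475840172000}", now));
        System.out.println(extractMillis("{\"id\":8}", now));
    }
}
```

Falling back to the wall clock keeps the application from failing on records without a timestamp, at the cost of mixing event-time and processing-time for those records, which is the trade-off mentioned in the thread.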
