kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Noll <mich...@confluent.io>
Subject Re: Printing to stdin from KStreams?
Date Fri, 07 Oct 2016 15:05:25 GMT
> Is it possible to have kafka-streams-reset be automatically called during
> development? Something like streams.cleanUp() but which also does reset?

Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
plan to add such a feature in the short-term.



On Fri, Oct 7, 2016 at 1:36 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:

> Is it possible to have kafka-streams-reset be automatically called during
> development? Something like streams.cleanUp() but which also does reset?
>
> On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <michael@confluent.io> wrote:
>
> > Ali,
> >
> > adding to what Matthias said:
> >
> > Kafka 0.10 changed the message format to add so-called "embedded
> > timestamps" into each Kafka message.  The Java producer included in Kafka
> > 0.10 includes such embedded timestamps into any generated message as
> > expected.
> >
> > However, other clients (like the go kafka plugin you are using) may not
> > have been updated yet to be compatible with the new 0.10 message format.
> > That's the root cause why see these "-1" negative timestamps.   (The same
> > negative timestamp problem also happens if you attempt to read messages
> > that were generated with pre-0.10 versions of Kafka's Java producer.)
> >
> > FYI: Kafka Streams' default timestamp extractor attempts to read those
> new
> > embedded timestamps.  If there are no such embedded timestamps, you run
> > into these "negative timestamps" errors.
> >
> > Now, how to fix your problem?
> >
> > - Fix the root cause: Check if there's a newer version of your Go kafka
> > plugin that generates messages in the new Kafka 0.10 format.  If there is
> > no such version, ask the maintainers for an update. :-)
> >
> > - Work around the problem:  As Matthias said, you can also tell Kafka
> > Streams to not use its default timestamp extractor.  You can fallback to
> > the WallclockTimestampExtractor, though this means your application will
> > not use event-time but processing-time when processing your data, which
> is
> > probably not what you want (but it does prevent the -1 timestamp errors).
> > If your data (generated by the go kafka plugin) *does* contain timestamp
> > information in the message payload, then the better option is to write a
> > custom timestamp extract that inspects each message, extracts the
> timestamp
> > from the payload, and returns it to Kafka Streams.  The Timestamp
> Extractor
> > section in [1] explains how to write a custom one and how to configure
> your
> > app to use it.
> >
> > Hope this helps,
> > Michael
> >
> >
> >
> > [1]
> > http://docs.confluent.io/3.0.1/streams/developer-guide.
> > html#optional-configuration-parameters
> >
> >
> >
> >
> >
> >
> > On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > -----BEGIN PGP SIGNED MESSAGE-----
> > > Hash: SHA512
> > >
> > > If you restart your application, it will resume where is left off
> > > (same as any other Kafka consumer that does use group management and
> > > commits offsets).
> > >
> > > If you want to reprocess data from scratch, you need to reset your
> > > application using bin/kafka-streams-application-reset.sh
> > >
> > > See also
> > > http://docs.confluent.io/3.0.1/streams/developer-guide.
> html#application-
> > > reset-tool
> > >
> > > and
> > > http://www.confluent.io/blog/data-reprocessing-with-kafka-
> streams-resett
> > > ing-a-streams-application/
> > >
> > >
> > > About the timestamp issue: it seems that your Go client does not
> > > assign valid timestamps when writing the data. As you already said,
> > > you need to provide a custom TimestampExtractor (or you
> > > WallclockTimestampExtractor if semantic permit) instead of default
> > > ConsumerRecordTimestampExtractor)
> > >
> > >
> > > - -Matthias
> > >
> > > On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > > > Thanks.
> > > >
> > > > I'm encountering a strange issue.
> > > >
> > > > If I create messages thru console-producer.sh on a new topic,
> > > > things work fine.
> > > >
> > > > But on the topic that I need to consume, the messages are being
> > > > produced via the go kafka plugin.
> > > >
> > > > On this topic, at first, nothing happens when the stream starts
> > > > (i.e it doesn't process the messages which are already in there)
> > > >
> > > > Then, if I produce new messages, then my exception handler is
> > > > called with the exception that timestamp is negative.
> > > >
> > > > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> > > >
> > > > I'm going to write a new timestamp extractor, but any ideas why
> > > > nothing happens with the old messages which are in the topic, it
> > > > only responds if i push new messages to this topic?
> > > >
> > > > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > > > <matthias@confluent.io> wrote:
> > > >
> > > > Sure.
> > > >
> > > > Just use #print() or #writeAsText()
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > > >>>> What the subject says. For dev, it would be a lot easier if
> > > >>>> debugging info can be printed to stdin instead of another
> > > >>>> topic, where it will persist.
> > > >>>>
> > > >>>> Any ideas if this is possible?
> > > >>>>
> > > >>
> > > >
> > > -----BEGIN PGP SIGNATURE-----
> > > Comment: GPGTools - https://gpgtools.org
> > >
> > > iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> > > 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> > > JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> > > qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> > > 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> > > AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> > > Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> > > UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> > > NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> > > OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> > > gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> > > ZBA0KEjx9U8NNf+eiqN5
> > > =UMbs
> > > -----END PGP SIGNATURE-----
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message