kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Printing to stdin from KStreams?
Date Fri, 07 Oct 2016 15:33:50 GMT
Thank you.

I've resolved this by adding a run configuration in IntelliJ that runs
streams-reset, and by using the same application ID in all applications
during development. The application ID is read transparently from an
environment variable, so in my Kubernetes config I can specify different
app IDs for production.
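
A minimal sketch of the environment-variable approach described above, for
anyone who wants to copy it. The class name, the STREAMS_APP_ID variable, and
the "dev-app" default are assumptions made up for illustration; only the
"application.id" key is the actual Kafka Streams setting. The sketch uses
plain java.util.Properties, so it runs without Kafka on the classpath.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: the class name, the STREAMS_APP_ID variable, and the
// "dev-app" default are illustrative assumptions, not names from this thread.
final class AppIdConfig {
    static final String ENV_VAR = "STREAMS_APP_ID";
    static final String DEV_DEFAULT = "dev-app";

    // Returns the application ID from the given environment, or the shared
    // development default when the variable is unset or blank.
    static String resolveApplicationId(Map<String, String> env) {
        String fromEnv = env.get(ENV_VAR);
        return (fromEnv == null || fromEnv.trim().isEmpty()) ? DEV_DEFAULT : fromEnv.trim();
    }

    // Builds the properties that would be handed to the Streams configuration.
    // "application.id" is the Kafka Streams config key for the application ID.
    static Properties streamsProperties(Map<String, String> env) {
        Properties props = new Properties();
        props.put("application.id", resolveApplicationId(env));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(System.getenv());
        System.out.println("application.id = " + props.getProperty("application.id"));
    }
}
```

In development the variable is simply left unset, so every run shares one
application ID and a single streams-reset run configuration covers all of
them. In production the deployment config would set the variable per app.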

On Fri, Oct 7, 2016 at 8:05 PM, Michael Noll <michael@confluent.io> wrote:

> > Is it possible to have kafka-streams-reset be automatically called during
> > development? Something like streams.cleanUp() but which also does reset?
>
> Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
> plan to add such a feature in the short-term.
>
>
>
> On Fri, Oct 7, 2016 at 1:36 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
>
> > Is it possible to have kafka-streams-reset be automatically called during
> > development? Something like streams.cleanUp() but which also does reset?
> >
> > On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <michael@confluent.io> wrote:
> >
> > > Ali,
> > >
> > > adding to what Matthias said:
> > >
> > > Kafka 0.10 changed the message format to add so-called "embedded
> > > timestamps" to each Kafka message.  As expected, the Java producer
> > > included in Kafka 0.10 embeds such a timestamp in every message it
> > > generates.
> > >
> > > However, other clients (like the Go kafka plugin you are using) may not
> > > have been updated yet to be compatible with the new 0.10 message format.
> > > That's the root cause of the "-1" negative timestamps you see.  (The same
> > > negative timestamp problem also happens if you attempt to read messages
> > > that were generated with pre-0.10 versions of Kafka's Java producer.)
> > >
> > > FYI: Kafka Streams' default timestamp extractor attempts to read those
> > > new embedded timestamps.  If there are no such embedded timestamps, you
> > > run into these "negative timestamps" errors.
> > >
> > > Now, how to fix your problem?
> > >
> > > - Fix the root cause: Check if there's a newer version of your Go kafka
> > > plugin that generates messages in the new Kafka 0.10 format.  If there is
> > > no such version, ask the maintainers for an update. :-)
> > >
> > > - Work around the problem:  As Matthias said, you can also tell Kafka
> > > Streams not to use its default timestamp extractor.  You can fall back to
> > > the WallclockTimestampExtractor, though this means your application will
> > > use processing-time rather than event-time when processing your data,
> > > which is probably not what you want (but it does prevent the -1 timestamp
> > > errors).  If your data (generated by the Go kafka plugin) *does* contain
> > > timestamp information in the message payload, then the better option is to
> > > write a custom timestamp extractor that inspects each message, extracts
> > > the timestamp from the payload, and returns it to Kafka Streams.  The
> > > Timestamp Extractor section in [1] explains how to write a custom one and
> > > how to configure your app to use it.
> > >
> > > Hope this helps,
> > > Michael
> > >
> > >
> > >
> > > [1]
> > > http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> > >
> > > > -----BEGIN PGP SIGNED MESSAGE-----
> > > > Hash: SHA512
> > > >
> > > > If you restart your application, it will resume where it left off
> > > > (same as any other Kafka consumer that uses group management and
> > > > commits offsets).
> > > >
> > > > If you want to reprocess data from scratch, you need to reset your
> > > > application using bin/kafka-streams-application-reset.sh
> > > >
> > > > See also
> > > > http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
> > > >
> > > > and
> > > > http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> > > >
> > > >
> > > > About the timestamp issue: it seems that your Go client does not
> > > > assign valid timestamps when writing the data. As you already said,
> > > > you need to provide a custom TimestampExtractor (or use
> > > > WallclockTimestampExtractor if your semantics permit) instead of the
> > > > default ConsumerRecordTimestampExtractor.
> > > >
> > > >
> > > > - -Matthias
> > > >
> > > > On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > > > > Thanks.
> > > > >
> > > > > I'm encountering a strange issue.
> > > > >
> > > > > If I create messages thru console-producer.sh on a new topic,
> > > > > things work fine.
> > > > >
> > > > > But on the topic that I need to consume, the messages are being
> > > > > produced via the go kafka plugin.
> > > > >
> > > > > On this topic, at first, nothing happens when the stream starts
> > > > > (i.e., it doesn't process the messages which are already in there).
> > > > >
> > > > > If I then produce new messages, my exception handler is called
> > > > > with the exception that the timestamp is negative.
> > > > >
> > > > > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> > > > >
> > > > > I'm going to write a new timestamp extractor, but any ideas why
> > > > > nothing happens with the old messages that are already in the
> > > > > topic, and why it only responds if I push new messages to it?
> > > > >
> > > > > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > > > > <matthias@confluent.io> wrote:
> > > > >
> > > > > Sure.
> > > > >
> > > > > Just use #print() or #writeAsText()
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > > > > >>>> What the subject says. For dev, it would be a lot easier if
> > > > > >>>> debugging info can be printed to stdin instead of another
> > > > > >>>> topic, where it will persist.
> > > > >>>>
> > > > >>>> Any ideas if this is possible?
> > > > >>>>
> > > > >>
> > > > >
> > > > -----BEGIN PGP SIGNATURE-----
> > > > Comment: GPGTools - https://gpgtools.org
> > > >
> > > > iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> > > > 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> > > > JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> > > > qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> > > > 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> > > > AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> > > > Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> > > > UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> > > > NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> > > > OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> > > > gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> > > > ZBA0KEjx9U8NNf+eiqN5
> > > > =UMbs
> > > > -----END PGP SIGNATURE-----
> > > >
> > >
> >
>
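
For the custom timestamp extractor suggested in the quoted thread, here is a
rough sketch of the extraction logic only. It assumes the Go producer writes
JSON payloads with an epoch-milliseconds "timestamp" field; that payload
format, the field name, and the class name are all hypothetical. In a Kafka
Streams 0.10 application this logic would be called from the extract() method
of a class implementing org.apache.kafka.streams.processor.TimestampExtractor,
which is then registered through the "timestamp.extractor" setting. The
sketch itself has no Kafka dependency.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the logic a custom TimestampExtractor could delegate to.
// Assumption: payloads are JSON strings carrying epoch milliseconds in a
// "timestamp" field; the field name and class name are hypothetical.
final class PayloadTimestamps {
    // At most 18 digits, so Long.parseLong cannot overflow.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d{1,18})");

    // Returns the timestamp found in the payload if present, otherwise the
    // supplied fallback (for example, the current wall-clock time), so the
    // result is never the -1 that triggers the negative-timestamp error.
    static long extract(String payload, long fallbackMillis) {
        if (payload != null) {
            Matcher m = TS_FIELD.matcher(payload);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return fallbackMillis;
    }

    public static void main(String[] args) {
        String sample = "{\"id\": 7, \"timestamp\": 1475854430000}";
        System.out.println(extract(sample, System.currentTimeMillis()));
    }
}
```

Falling back to wall-clock time for records without a payload timestamp mixes
event-time and processing-time, so whether that is acceptable depends on the
application's semantics.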
