kafka-users mailing list archives

From Craig Ching <craigch...@gmail.com>
Subject Re: Debugging message timestamps in Sarama
Date Tue, 24 Jul 2018 00:26:14 GMT
Hi Dmitry,

Are you associated with the Sarama project?  If so, understand that part of
what I want is to learn about Sarama and the Kafka message format ;)

The problem I'm having is that if I turn on:

log.message.timestamp.type=LogAppendTime

in the broker and then produce on topic1 with the console producer, I see
timestamps in the Sarama client.  If I produce on topic2 with telegraf
(incidentally, I think telegraf is a Sarama producer), then I don't see
timestamps in the Sarama client.  In both cases, if I consume using the
console consumer (with --property print.timestamp=true) I *do* see
timestamps.

I'm happy to debug this issue myself and submit a PR to sarama, but I am
missing some fundamentals of how to decode the kafka message format and
would really like some pointers.

Cheers,
Craig

P.S.  Here is the sarama code I'm using to test:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Initialize Sarama logging
	sarama.Logger = log.New(os.Stdout, "[Sarama] ",
		log.Ldate|log.Lmicroseconds|log.Lshortfile)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.ClientID = "consumer-test"
	config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
	config.Metadata.Full = true
	// config.Version = sarama.V0_10_0_0
	config.Version = sarama.V1_1_0_0
	// config.Version = sarama.V0_10_2_1
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	brokers := []string{"localhost:9092"}
	// brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}

	client, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}

	// topic := "topic1"
	topic := "topic2"
	// topic := "metric-influx-measurement"
	// How to decide partition, is it fixed value...?
	consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}

	// Close the partition consumer first, then the parent consumer.
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Println(err)
		}
		if err := client.Close(); err != nil {
			panic(err)
		}
	}()

	// Count how many messages were processed
	msgCount := 0

	// Consume in the main goroutine until interrupted. The break is labeled
	// because a bare break would only exit the select, not the for loop.
loop:
	for {
		select {
		case err := <-consumer.Errors():
			fmt.Println(err)
		case msg := <-consumer.Messages():
			msgCount++
			fmt.Println(msg.Timestamp)
			fmt.Println("Received messages", string(msg.Key), string(msg.Value))
		case <-signals:
			fmt.Println("Interrupt is detected")
			break loop
		}
	}
	fmt.Println("Processed", msgCount, "messages")
}


On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>
wrote:

> Hey Craig,
>
> What exact problem are you having with the Sarama client?
>
> On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigching@gmail.com> wrote:
>
> > Hi!
> >
> > I'm working on debugging a problem with how message timestamps are handled
> > in the sarama client.  In some cases, the sarama client won't associate a
> > timestamp with a message while the kafka console consumer does.  I've found
> > the documentation on the message format here:
> >
> > https://kafka.apache.org/documentation/#messageformat
> >
> > But the information there is very sparse.  For instance, what are
> > 'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
> > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> > timestamp I want.  Is there some state about the message that I need to
> > understand in order to have maxTimestamp be used?  Any further
> > documentation or guidance on this would be very helpful!
> >
> > On another note, I am trying to debug this through the scala/java console
> > consumer, but I'm having a hard time getting IntelliJ set up.  Is there
> > anything special I need to do, or any documentation, for setting this up
> > for debugging?
> >
>
