kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Furkan KAMACI <furkankam...@gmail.com>
Subject Re: Kafka Streams Error
Date Thu, 03 Nov 2016 11:36:09 GMT
Hi Matthias,

Thanks for the response. I stream output as follows:

        longCounts.toStream((wk, v) -> wk.key())
                .to(Serdes.String(),
                        Serdes.Long(),
                        "qps-aggregated");

I want to read last value from that topic at another application. I've
tried that:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "qps-consumer"); *//I'dont know the real
purpose of this setting*
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
"org.apache.kafka.common.serialization.LongDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("qps-aggregated"));
        ConsumerRecords<String, String> records = consumer.poll(1);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Connected! offset = %d, key = %s, value =
%s", record.offset(), record.key(), record.value());
        }

I can see that there is data when I check the streamed topic
(qps-aggregated) from command line. However, I cannot get any result from
that subscription via my application. What can be the reason?

Kind Regards,
Furkan KAMACI

On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Hi,
>
> first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up you
> application for the first time. If you start it a second time, it will
> resume from where is left off.
>
> About getting numbers starting from zero: this is expected behavior
> because streams **updates** the window computation each time an input
> record is added to the window. So you see each intermediate result.
>
> Furthermore, each time a new window is created, you will see a "1"
> again in the output as this is the current count of the new window. If
> you want do distinguish windows in the output, you need to look at the
> key. It encode the original record-key as well as a window ID.
>
>
> - -Matthias
>
> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
> > I use Kafka 0.10.0.1. I count the messages of a topic as follows:
> >
> > ...
> > streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > "earliest"); ... KStream<String, String> longs =
> > builder.stream(Serdes.String(), Serdes.String(), "qps-input"); ...
> > KTable<Windowed<String>, Long> longCounts =
> > longs.countByKey(TimeWindows.of("qps", 3600 * 1000),
> > Serdes.String()); ...
> >
> > and then I write output to another topic. Result is that:
> >
> > Numbers which starts from 1 and increase whenever I add something
> > to qps-input.
> >
> > My questions:
> >
> > 1) Does it calculate really last hour or everything from the
> > beginning due you I've set it as earliest?
> >
> > 2) Sometimes it's been reset and numbers starts from 1. What can be
> > the reason for that?
> >
> > Kind Regards, Furkan KAMACI
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJYGlN3AAoJECnhiMLycopPzesP/jo9pX7hM03WeXEMvsGLpUgz
> N0/vqH9roEQOT/LoZacwV62CYZ7UvITU/G7hLymp9s8Q1g3+7phdc9OPI2Vy2WFT
> RgpK3WYVYK7lKOZiE8i/n/Ibu9H2SJAYBdkyse1RsuMGACLEuOoASV6P67QZKIGI
> Cw9Eq5IQDLBPpWoeUfofWIJtFEFF4DtT52zY7CFryKsRngWDZtBcGcqt0mqUrVM6
> vvlCuRsxB/1/n/IzmCF3JqmSL7TSsNrSu2ULKgG0K/+71SxPpzNhLZSlAs92zQH+
> APPWgu4s0Kq4IIzje6eQiny82354zg0E3xbVTC+Ra3o0PEX/skKUdlcj1GA1Yvf8
> sFaGDzXjrhQa9ZmCPYSDyveZRlUKmP6QGdPJro+EIKnOv4VTxsF9LPiiQzDds/sc
> bMjCRP+kZdFpow9IcjsLGo39Cu2mVCg7ChbaGVnvVaZ8pZuPdASTbLhWeUPXNhjv
> XPEkxqPFexdRL38idWh0CcWv++Dr2Dvbu2lRBDc9SPqRcgzF51pmAmau/TW3WV+J
> 8iVL+OH0TRhRx+L3Ie3tiahInXvf7Fwwwmc1fJASeN54zhhJnU8vSVYA0JDX0+N8
> BPVnSoIdHEnCmlFNm1vxxcCk65Fjug+AZQpHCmZzepHTg6LcdNHR9TH9iaTrvjr1
> 6gi7YNmGkeE+jzTf/YC9
> =Vq3G
> -----END PGP SIGNATURE-----
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message