kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From karan alang <karan.al...@gmail.com>
Subject Re: Kafka Streams - not able to see the text in console consumer
Date Mon, 18 Sep 2017 22:17:20 GMT
Thanks, Vito .. that worked !


On Sun, Sep 17, 2017 at 9:02 PM, 鄭紹志 <vito@is-land.com.tw> wrote:

> Hi, Karan,
>
> It looks like you need to add a property 'value.deserializer' to
> kafka-console-consumer.sh.
>
> For example:
> $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> kstreams4 --from-beginning \
>     --property print.key=true \
>     --property print.value=true \
>     --property
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> \
>     --property
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
>
> Vito
>
>
>
>
>
> ----------
> 鄭紹志 Vito Jeng
> 亦思科技股份有限公司 研究發展處
> TEL: 03-5630345 Ext.16
>
> On Mon, Sep 18, 2017 at 10:58 AM, karan alang <karan.alang@gmail.com>
> wrote:
>
> > Hello All
> > - i've a basic word count Kafka streams code, which reads data from the
> > input topic, splits the data (per the separator) and outputs the data
> into
> > the output topic.
> >
> > I've a console producer, which is putting data into the input topic
> > (kstreams3),
> > and a console consumer which is reading the (split) data from output
> > topic(kstreams4)
> >
> > The code seems to be working fine,
> > but on the console consumer, i'm Not able to see the text.
> > It seems to be printing "blanks", instead of the actual text ..
> > (though, console consumer seems to be consuming the correct number of
> > lines)
> >
> > Any pointers on what the issue might be ?
> >
> > Code ->
> >
> > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > "streamswordcount-application");
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > KStream<String, String> textLines = builder.stream("kstreams3");
> > >
> > > KTable<String, Long> wordCounts = textLines
> > > .flatMapValues(textLine ->
> > > Arrays.asList(textLine.toLowerCase().split("\\W+")))
> > > .groupBy((key, word)-> word)
> > > .count("Counts");
> > >
> > > System.out.println(" Kstreams - wordCount " + wordCounts);
> > >
> > > wordCounts.to(Serdes.String(),Serdes.Long(),"kstreams4");
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > streams.start();
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message