kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Doing something wrong
Date Thu, 09 May 2019 13:35:18 GMT
I am not familiar with Spring Boot. But in general, you could query the
store to see if the counts are as expected:
https://kafka.apache.org/22/documentation/streams/developer-guide/interactive-queries.html
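
A minimal sketch of such a query, assuming the count store has been given an explicit name. The name "word-counts" and the class CountStoreDump are made up for illustration; the topology would need a matching `count(Materialized.as(...))` call. The two-argument `store(...)` call is the one from the 2.2 docs linked above; later releases replace it with `StoreQueryParameters`.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class CountStoreDump {

    // Hypothetical store name; the topology must use the same one, e.g.
    //   .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME))
    static final String STORE_NAME = "word-counts";

    /** Prints every (word, count) pair currently held in the local store. */
    static void dump(KafkaStreams streams) {
        // A store can only be queried once the instance is RUNNING.
        if (streams.state() != KafkaStreams.State.RUNNING) {
            System.out.println("Not RUNNING yet (state=" + streams.state() + "), try again later");
            return;
        }
        try {
            ReadOnlyKeyValueStore<String, Long> store =
                streams.store(STORE_NAME, QueryableStoreTypes.<String, Long>keyValueStore());
            // The iterator holds store resources, so close it when done.
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    System.out.println(entry.key + " -> " + entry.value);
                }
            }
        } catch (InvalidStateStoreException e) {
            // Thrown while the store is still restoring or has migrated.
            System.out.println("Store not queryable right now: " + e.getMessage());
        }
    }
}
```

Calling `CountStoreDump.dump(streams)` some time after `streams.start()` (for example from a REST endpoint or a scheduled task) should list the current counts. It only sees the partitions hosted by this instance.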

As an alternative, you could inspect the store's changelog topic,
or get a stream from the table:

 .count().toStream().print(...); // for development

Instead of `print()` you could also use `peek()` or `foreach()`.

You could also write the result stream into an output topic via `to()`.
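
The options above can be sketched on the word-count topology from the quoted message. The class name WordCountTopology and the two topic parameters are placeholders, not part of the original code. The counts are `Long` values while the configured default value serde is `String`, so the sketch passes explicit serdes to `to()`.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.regex.Pattern;

public final class WordCountTopology {

    /** inputTopic and outputTopic are placeholders; the output topic should already exist. */
    static Topology build(String inputTopic, String outputTopic) {
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
        StreamsBuilder builder = new StreamsBuilder();

        // Same word-count logic as in the quoted message.
        KStream<String, String> textLines = builder.stream(inputTopic);
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word)
            .count();

        wordCounts.toStream()
            // Development aid: log each updated count as it flows past.
            .peek((word, count) -> System.out.println(word + " -> " + count))
            // Counts are Long, so override the default String value serde here.
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

With default settings, record caching can hold updates back until the next commit (`commit.interval.ms`, 30 seconds by default), so nothing may show up within a 20-second sleep. For development, setting `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` to 0 is one way to see each update as it happens.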


-Matthias


On 5/9/19 1:23 AM, Pavel Molchanov wrote:
> Hi,
> 
> I created a simple Spring Boot Application with Kafka and added a
> dependency from Kafka Streams. The application can send and receive
> messages and works fine.
> 
> In the same app, I want to use Kafka Streams to calculate statistics about
> the topic.
> 
> I wrote the following code from the word count example:
> 
> @Service
> public class StartupService {
> 
> @Value(value = "${kafka.events.topic.name}")
> private String eventsTopicName;
> 
> @Value(value = "${kafka.bootstrap.servers}")
> private String bootstrapAddress;
> 
> @PostConstruct
> public void init() {
> Properties streamsConfiguration = new Properties();
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
> System.getProperty("java.io.tmpdir")+"\\count");
> StreamsBuilder builder = new StreamsBuilder();
> KStream<String, String> textLines = builder.stream(eventsTopicName);
> Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
> 
> KTable<String, Long> wordCounts = textLines
>   .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
>   .groupBy((key, word) -> word)
>   .count();
> Topology topology = builder.build();
> KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>   System.out.println("Exception in thread:" + thread.getId() + ", Message:" + throwable.getMessage());
> });
> streams.cleanUp();
> streams.start();
> try {
> Thread.sleep(20000);
> } catch (InterruptedException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
>                 // Check the result here:
> 
> }
> 
> }
> 
> How can I check the result of the calculation? The last lines in the log
> file are:
> 
> 2019-05-08 19:19:36.091  INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] (Re-)joining group
> 2019-05-08 19:19:36.163  INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer]
> Assigned tasks to clients as
> {a2d0314e-c872-469e-b34c-c08e2fdf422a=[activeTasks: ([0_0, 1_0])
> standbyTasks: ([]) assignedTasks: ([0_0, 1_0]) prevActiveTasks: ([])
> prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
> 2019-05-08 19:19:36.205  INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] Successfully joined group with generation 21
> 2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] Setting newly assigned partitions [events-0,
> test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0]
> 2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
> from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> 2019-05-08 19:19:36.234  INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] partition
> assignment took 25 ms.
> current active tasks: [0_0, 1_0]
> current standby tasks: []
> previous active tasks: []
> 
> 2019-05-08 19:19:36.635  INFO 18380 --- [-StreamThread-1]
> org.apache.kafka.clients.Metadata        : Cluster ID:
> I6GfqZSORWGKqPHE7zA_cQ
> 2019-05-08 19:19:36.659  INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.i.StoreChangelogReader         : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] Restoring task
> 1_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000003 from beginning
> of the changelog test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0
> 2019-05-08 19:19:36.668  INFO 18380 --- [-StreamThread-1]
> o.a.k.c.consumer.internals.Fetcher       : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-restore-consumer,
> groupId=] Resetting offset for partition
> test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0 to offset 0.
> 2019-05-08 19:19:36.852  INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
> from PARTITIONS_ASSIGNED to RUNNING
> 2019-05-08 19:19:36.853  INFO 18380 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams    : stream-client
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a] State transition from
> REBALANCING to RUNNING
> 
> It stays in the RUNNING state and doesn't move forward. What am I doing
> wrong?
> 
> 
> 
> *Pavel Molchanov*
> 

