kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Queryable state
Date Fri, 17 Nov 2017 04:29:24 GMT
Hello Boris,

The reason for this check is to make sure that the cluster metadata has been
updated at least once, meaning that the instance has gone through the
initialization phase of the rebalance and has already received its assignment
information. Before this phase, any metadata returned may be
incorrect.

To get around this scenario, you may need to wait for the state of the
running instance to go from REBALANCING to RUNNING, which means it has
stabilized with the global metadata. This can be done by passing in your
own state change listener:

public void setStateListener(final KafkaStreams.StateListener listener)
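
As a rough sketch of how such a listener could be used to hold off metadata
queries until the instance has stabilized: the helper below is hypothetical
(RunningGate, DemoState, isReady and awaitRunning are names made up for
illustration, not part of Kafka Streams). It assumes that
KafkaStreams.StateListener has a single onChange(newState, oldState) method
and that KafkaStreams.State is an enum with REBALANCING and RUNNING
constants, so a method reference can serve as the listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Kafka Streams): opens once the instance
// has been observed moving from REBALANCING to RUNNING.
final class RunningGate {

    // Stand-in for KafkaStreams.State, only so the sketch can be exercised
    // without a broker; real code would pass KafkaStreams.State values.
    enum DemoState { CREATED, REBALANCING, RUNNING }

    private final CountDownLatch ready = new CountDownLatch(1);

    // Same shape as KafkaStreams.StateListener#onChange(newState, oldState),
    // so it can be registered as: streams.setStateListener(gate::onChange);
    public void onChange(final Enum<?> newState, final Enum<?> oldState) {
        if ("RUNNING".equals(newState.name()) && "REBALANCING".equals(oldState.name())) {
            ready.countDown();
        }
    }

    // True once the REBALANCING -> RUNNING transition has been seen.
    public boolean isReady() {
        return ready.getCount() == 0;
    }

    // Blocks until the transition is seen or the timeout elapses.
    public boolean awaitRunning(final long timeout, final TimeUnit unit)
            throws InterruptedException {
        return ready.await(timeout, unit);
    }
}
```

In an application, you would likely register gate::onChange via
setStateListener before calling streams.start(), then call
gate.awaitRunning(...) with a timeout of your choosing before
streams.allMetadataForStore(store). The timeout and the choice to block the
caller are design assumptions; a non-blocking check via isReady() may suit
a REST endpoint better.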


Guozhang


On Mon, Nov 13, 2017 at 9:43 AM, Boris Lublinsky <
boris.lublinsky@lightbend.com> wrote:

> I have updated my queryable state example, based on
> https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries
> to 1.0.0.
>
> Now, when I try to get the instances for a store while running on
> a local machine, I get an empty array from
>
> public List<HostStoreInfo> streamsMetadataForStore(final String store) {
>     // Get metadata for all of the instances of this Kafka Streams application hosting the store
>     final Collection<StreamsMetadata> metadata = streams.allMetadataForStore(store);
>     return mapInstancesToHostStoreInfo(metadata);
> }
> This is happening because in StreamsMetadataState,
>
> public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
>     Objects.requireNonNull(storeName, "storeName cannot be null");
>
>     if (!isInitialized()) {
>         return Collections.emptyList();
>     }
> the isInitialized method
>
> private boolean isInitialized() {
>     return clusterMetadata != null && !clusterMetadata.topics().isEmpty();
> }
> checks the cluster metadata, which is null.
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com <mailto:boris.lublinsky@lightbend.com>
> https://www.lightbend.com/
>
>


-- 
-- Guozhang
