kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Offsets/Lags for global state stores not shown
Date Sun, 18 Nov 2018 19:21:26 GMT
Because each instance needs to consume all data, it's limited by what a
single instance can consume -- a hard bound is the network. Note,
network is shared, so don't take the maximum network speed into account.
Also, it's not the number of unique messages, but the number of updates
that is important for this.
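
As a rough illustration of why the update rate matters here, a
back-of-the-envelope sketch follows. The class and method names are made
up for this example, and the daily update count and average record size
are assumed inputs, not numbers measured in this thread. Because every
instance reads every update, the result applies to each instance
separately and does not shrink when more instances are added.

```java
// Hypothetical helper, not part of Kafka: estimates the sustained load a
// global store puts on EACH instance, since every instance reads all updates.
final class GlobalStoreLoadEstimate {

    private static final double SECONDS_PER_DAY = 86_400.0;

    // Average number of updates per second for a given daily update count.
    static double updatesPerSecond(long updatesPerDay) {
        return updatesPerDay / SECONDS_PER_DAY;
    }

    // Average inbound bytes per second per instance.
    // avgRecordBytes is an assumption the caller has to supply.
    static double bytesPerSecondPerInstance(long updatesPerDay, int avgRecordBytes) {
        return updatesPerSecond(updatesPerDay) * avgRecordBytes;
    }

    public static void main(String[] args) {
        long updatesPerDay = 5_000_000L; // assumed: "single-digit million" per day
        int avgRecordBytes = 1_024;      // assumed average record size
        System.out.printf("updates/s: %.1f, bytes/s per instance: %.0f%n",
                updatesPerSecond(updatesPerDay),
                bytesPerSecondPerInstance(updatesPerDay, avgRecordBytes));
    }
}
```

This is only an average: it hides bursts, and it ignores the bootstrap
phase on startup, when the whole topic has to be read.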


> Just to verify, for this IQ setup (streams app which only builds a single
> table to be queried) we have tried the alternative approach to use a normal
> KTable in combination with a unique application ID per application instance.
> This seemed to work quite well, including faster (parallel) startup etc.
> Is this approach valid or would you expect some pitfalls?


I guess, for your use case, this might be ok. There is one difference on
startup: if no local state has been built up yet, then in the GlobalKTable
case the table is fully populated from the topic before you can start
querying. In the KTable case, you can query from the very beginning,
while data is still being put into the table.

Also, for this approach, if you add other processing, this processing
would not be parallelized but duplicated.
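
For reference, the "unique application ID per instance" setup could be
sketched roughly as below. This is a minimal sketch, not a recommended
implementation: the class name, the helper method, and the choice of
instance identifier are assumptions made for illustration. Only the
config keys "application.id" and "bootstrap.servers" are actual Kafka
Streams settings. Since Kafka Streams uses the application ID as the
consumer group ID, each instance ends up in its own group and is assigned
all partitions of the input topic.

```java
import java.util.Properties;

// Hypothetical helper: builds a Kafka Streams configuration in which every
// instance gets its own application.id and therefore its own consumer group.
final class PerInstanceStreamsConfig {

    // instanceId should be stable across restarts (for example a host name).
    // A random value would look like a brand-new application on every restart,
    // so the instance would not find its previous local state again.
    static Properties build(String baseAppId, String instanceId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", baseAppId + "-" + instanceId);
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // "iq-app" and "host-a" are placeholder values for this example.
        Properties props = build("iq-app", "host-a", "broker:9092");
        System.out.println(props.getProperty("application.id"));
    }
}
```

The resulting Properties would then be passed to the KafkaStreams
constructor together with the topology that builds the KTable.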


-Matthias



On 11/7/18 1:32 AM, Patrik Kleindl wrote:
> Thanks for the response.
> How "low" is the expected low throughput? We are using GlobalKTables
> for IQ on several topics, but with single-digit million unique messages and
> usually fewer changes per day.
> 
> Just to verify, for this IQ setup (streams app which only builds a single
> table to be queried) we have tried the alternative approach to use a normal
> KTable in combination with a unique application ID per application instance.
> This seemed to work quite well, including faster (parallel) startup etc.
> Is this approach valid or would you expect some pitfalls?
> 
> We have not used this approach more because it does not work for global
> stores inside a streams application, but it might be beneficial to split
> that up again.
> 
> best regards
> 
> Patrik
> 
> On Tue, 6 Nov 2018 at 20:07, Matthias J. Sax <matthias@confluent.io> wrote:
> 
>> The topics of global stores are not included by design.
>>
> >> The "problem" is that each instance needs to consume *all*
> >> partitions of these topics, and thus they cannot be included
> >> in the consumer group, which would assign each partition to exactly one
> >> instance. Hence, an additional consumer is used that uses partition
> >> assignment (instead of subscription), and this consumer does not commit
> >> any offsets to Kafka.
>>
> >> Note that global stores are bootstrapped before processing begins
> >> though, and their topics are expected to be low-throughput anyway.
>>
>>
>> -Matthias
>>
>> On 11/6/18 2:03 AM, Patrik Kleindl wrote:
>>> Hello
>>>
> >>> Am I doing something wrong or is it by design that global state stores and
> >>> their consumers do not show up under the consumer-groups?
> >>> With the consumer group command (and in control center as well) I don't get
> >>> any output for the group:
> >>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe
>>> Note: This will not show information about old Zookeeper-based consumers.
>>>
>>> If I query for the state I get a response that members are present:
> >>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe --state
> >>> Note: This will not show information about old Zookeeper-based consumers.
> >>>
> >>> COORDINATOR (ID)   ASSIGNMENT-STRATEGY   STATE    #MEMBERS
> >>> broker:9092 (1)    stream                Stable   2
>>>
>>> This is quite irritating as we cannot see if a global state store has
>>> caught up with a backlog of messages.
>>>
>>> Code to reproduce:
>>>         builder.globalTable(TOPIC_NAME, Materialized
>>>                 .<String, String, KeyValueStore<Bytes,
>> byte[]>>as(STORENAME)
>>>                 .withKeySerde(Serdes.String())
>>>                 .withValueSerde(Serdes.String()));
>>>
>>> Nothing fancy.
>>>
>>> Logs:
>>> 2018-11-05 21:25:56 INFO  AbstractCoordinator:442 - (Re-)joining group
>>> 2018-11-05 21:25:56 INFO  StreamPartitionAssignor:481 - Assigned tasks to
>>> clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([])
>>> standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
>>> prevAssignedTasks: ([]) capacity: 1]}.
>>> 2018-11-05 21:25:56 WARN  ConsumerCoordinator:376 - The following
>>> subscribed topics are not assigned to any members: [storetopic]
>>> 2018-11-05 21:25:56 INFO  AbstractCoordinator:409 - Successfully joined
>>> group with generation 3
>>> 2018-11-05 21:25:56 INFO  ConsumerCoordinator:256 - Setting newly
>> assigned
>>> partitions []
>>>
>>> The store works after this, but it is not shown.
>>>
>>> Any input is appreciated
>>>
>>> best regards
>>>
>>> Patrik
>>>
>>> PS: The customer will forward this to the Confluent support too, but I'm
>>> asking here for public visibility
>>>
>>
>>
> 

