kafka-users mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: Consuming a state store (KTable) basics - 1.0.0
Date Thu, 07 Dec 2017 19:05:10 GMT
Hi Peter,

glad it helped,

these are the preferred ways indeed.
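
For the first way (querying the store from inside the Streams application), a minimal sketch of the lookup side could look like the following. It is written in Scala against the Java API of Kafka Streams 1.0.0. The object name StoreLookup and the helper lookup are made up for illustration, and the REST or RPC layer that would call it is left out.

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Hypothetical helper, not part of Kafka Streams.
object StoreLookup {

  // Reads the current value for `key` from the local state store `storeName`.
  // `streams` must be the started KafkaStreams instance whose topology
  // materialized that store (for example via Materialized.as("my-store")).
  def lookup[K, V](streams: KafkaStreams, storeName: String, key: K): Option[V] = {
    // Ask the running instance for a read-only view of the key-value store.
    val store: ReadOnlyKeyValueStore[K, V] =
      streams.store(storeName, QueryableStoreTypes.keyValueStore[K, V]())

    // get returns null when the key is absent; Option(...) maps that to None.
    Option(store.get(key))
  }
}
```

With the store from this thread, a call would be StoreLookup.lookup[String, GenericRecord](streams, "my-store", someKey). Note that KafkaStreams.store throws InvalidStateStoreException while the instance is still starting or rebalancing, and that with several application instances each one only holds the keys of the partitions assigned to it.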




On 07.12.2017 15:58, Peter Figliozzi wrote:
> Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
> are only two ways for external applications to access data derived from a
> KTable:
>
> 1.  Inside the streams application that builds the KTable, create a
> KafkaStreams.store and expose to the outside via a service.
>
> 2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
> external apps can just consume this feed.  If we only care about the latest
> updates, make the topic log-compacted.
Whether you want the latest value per key or the most recently updated
records might be a different story here. In the end there is a lot of
flexibility that everyone is free to explore.
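
For the second way (writing the table's updates to a topic), a minimal sketch, again in Scala against the 1.0.0 Java API. The object name TableExport, the helper publish, and the output topic name are placeholders, not anything from the original application.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{KTable, Produced}

// Hypothetical helper, not part of Kafka Streams.
object TableExport {

  // Converts the table's changelog into a stream and writes every update
  // to `outputTopic`. This must be called while the topology is being
  // defined, i.e. before the KafkaStreams instance is created and started.
  def publish[K, V](table: KTable[K, V],
                    outputTopic: String,
                    keySerde: Serde[K],
                    valueSerde: Serde[V]): Unit =
    // `with` is a Scala keyword, so the Java method needs backticks.
    table.toStream().to(outputTopic, Produced.`with`(keySerde, valueSerde))
}
```

For the table in this thread that would be TableExport.publish(myTable, "my-output-topic", Serdes.String(), genericValueSerde). Log compaction is a property of the topic itself (cleanup.policy=compact), so it has to be set when the topic is created or altered; the code above does not configure it.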

Best Jan

>
> Thanks,
>
> Pete
>
> On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>
>> Hi,
>>
>> you should be able to retrieve your store with
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021
>>
>> This would give you access to the store from inside your current
>> application. In your Streams application you could then
>> expose this store through, say, a REST or any other RPC interface, to let
>> applications from outside your JVM query it.
>>
>> So I would say the blog post still applies quite well.
>>
>> Hope this helps
>>
>> Best Jan
>>
>>
>> On 07.12.2017 04:59, Peter Figliozzi wrote:
>>
>>> I've written a Streams application which creates a KTable like this:
>>>
>>> val myTable: KTable[String, GenericRecord] = myStream
>>>       .groupByKey()
>>>       .aggregate(myInitializer, myAdder, myStore)
>>>
>>> where myStore was configured like this:
>>>
>>> val myStore
>>>       : Materialized[String, GenericRecord, KeyValueStore[Bytes, Array[Byte]]] =
>>>       Materialized
>>>         .as("my-store")
>>>         .withKeySerde(Serdes.String())
>>>         .withValueSerde(genericValueSerde)
>>>
>>> What I'd like to do now is query (read) this store from a separate
>>> application.  How do I query it in 1.0.0?  With a KTable constructor,
>>> using the store string as the topic, i.e.:
>>>
>>> public <K,V> KTable<K,V> table(
>>>     java.lang.String topic,
>>>     Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
>>>
>>> Or some other way?
>>>
>>> I saw this blog post
>>> <https://blog.codecentric.de/en/2017/03/interactive-queries-in-apache-kafka-streams/>
>>> but it appears to be only applicable to the older version of Kafka (please
>>> correct me if I'm wrong).
>>>
>>> Thanks,
>>>
>>> Pete
>>>
>>>

