kafka-users mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: Get after put in stateStore returns null
Date Wed, 01 Apr 2020 21:52:34 GMT
That is expected behavior.

And yes, there is a `Transformer` instance per partition, each with its
own store that holds one shard of the overall state. The reason is that
you could run one KafkaStreams instance per partition on different
hosts/servers, and thus we need a `Transformer` and state store per
partition.

It's also by design that `transform()` does not do auto-repartitioning,
because it's a Processor API integration, and when using the Processor
API it's the developer's responsibility to reason about correct data
partitioning.
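To illustrate why a per-partition store can miss a key after an upstream key change, here is a Kafka-free sketch. It uses `hashCode() % numPartitions` as a simplified stand-in for Kafka's default partitioner (the real one applies murmur2 to the serialized key bytes), and plain maps as stand-ins for the per-partition state stores; the single-letter keys are chosen so the two partitions are guaranteed to differ:

```kotlin
// Simplified stand-in for Kafka's default partitioner.
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

fun main() {
    val numPartitions = 4
    // One "state store" shard per partition, like one Transformer per task.
    val stores = List(numPartitions) { mutableMapOf<String, Int>() }

    val originalKey = "A"  // key the record arrived with
    val newKey = "B"       // key after an upstream map() changed it

    // Without repartitioning, the record is still processed on the task of
    // the ORIGINAL key's partition, so the put lands in that shard:
    val processingPartition = partitionFor(originalKey, numPartitions)
    stores[processingPartition][newKey] = 1

    // A record that is genuinely keyed by newKey is processed on newKey's
    // partition -- a different shard, where the key was never written:
    val ownerPartition = partitionFor(newKey, numPartitions)
    val value = stores[ownerPartition][newKey]

    println("processed on partition $processingPartition, owner is $ownerPartition")
    println("get(newKey) on owner partition -> $value")  // prints null
}
```

Inserting a repartition step (e.g. `through()` on a topic partitioned by the new key) before `transform()` is what moves each record to the shard that owns its key.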


-Matthias

On 4/1/20 2:05 PM, Jan Bols wrote:
> Ok, Matthias,
> 
> thanks for the hint:
> *Even if any upstream operation was key-changing, no auto-repartition is
> triggered. If repartitioning is required, a call to through()
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
> should be performed before flatTransformValues(). *
> 
> Of course, I didn't call *through* before calling the transformer. As a
> result, some calls were being processed by another instance of the
> transformer running on a different partition. Calling *store.get(key)* on
> one instance would then not return any value even though another instance
> did a *store.put(key, value)* before.  Is this expected behaviour? Is there
> a transformer for each partition, and does each get its own state store?
> 
> Best regards
> 
> Jan
> 
> On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax <mjsax@apache.org> wrote:
> 
>> Your code looks correct to me. If you write into the store, you should
>> also be able to read it back from the store.
>>
>> Can you reproduce the issue using `TopologyTestDriver`? How many
>> partitions does your input topic have? Is your stream partitioned by
>> key? Note that `transform()` does not do auto-repartitioning in contrast
>> to `groupByKey()`.
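As a sketch of what such a `TopologyTestDriver` reproduction could look like — the topic names and String serdes are assumptions, and the topology-building step is elided because it should be the exact production topology:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import java.util.Properties

fun main() {
    val builder = StreamsBuilder()
    // ... build the topology under test here (addStateStore(...),
    // flatTransformValues(...)) exactly as in the production code ...

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "repro-test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
    }

    TopologyTestDriver(builder.build(), props).use { driver ->
        val input = driver.createInputTopic(
            "input-topic",                  // assumed topic name
            Serdes.String().serializer(),
            Serdes.String().serializer())
        val output = driver.createOutputTopic(
            "output-topic",                 // assumed topic name
            Serdes.String().deserializer(),
            Serdes.String().deserializer())

        // Two records with the same key: the second transform() call
        // should see the value the first one put into the store.
        input.pipeInput("key", "value-1")
        input.pipeInput("key", "value-2")
        println(output.readValuesToList())
    }
}
```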
>>
>>
>> -Matthias
>>
>> On 3/25/20 3:49 AM, Jan Bols wrote:
>>> Hi all,
>>> I'm trying to aggregate a stream of messages and return a stream of
>>> aggregated results using kafka streams.
>>> At some point, depending on the incoming message, the old aggregate needs
>>> to be closed and a new aggregate needs to be created, just like a session
>>> that is closed due to some close event and at the same time a new session
>>> is started.
>>>
>>> For this I'm using transformValues where I store the result of an
>>> aggregation, similar to how a groupByKey().aggregate() is done. When the
>>> old session needs to be closed, it's forwarded first, before the new value.
>>>
>>> The state store returns null for a given key at first retrieval, and the
>>> new aggregation result is stored under the same key.
>>> However, at the second pass, the value for the same key is still null
>>> even though it has just been stored before.
>>>
>>> How can this be possible?
>>>
>>>
>>>
>>> I'm using transformValues in the following way:
>>>
>>> val storeName = "aggregateOverflow_binReportAgg"
>>> val store = Stores.keyValueStoreBuilder<K, V>(
>>>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
>>> streamsBuilder.addStateStore(store)
>>>
>>> ...
>>>
>>> stream
>>>   .flatTransformValues(ValueTransformerWithKeySupplier {
>>>     AggregateOverflow(storeName, transformation) }, storeName)
>>>
>>>
>>> where AggregateOverflow gets the previous value from the state store and
>>> transforms the result into an AggregateOverflowResult.
>>> AggregateOverflowResult is a data class containing the current value and
>>> an optional overflow value, like this:
>>>
>>> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
>>>
>>> When the overflow value is not null, it's sent downstream first, before
>>> the current value. In each case, the current result is stored in the
>>> state store for later retrieval, like the following:
>>>
>>> class AggregateOverflow<K, V, VR : Any>(
>>>   private val storeName: String,
>>>   private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
>>> ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
>>>   private val logger = KotlinLogging.logger {}
>>>   private lateinit var state: KeyValueStore<K, VR>
>>>
>>>   init {
>>>     logger.debug { "$storeName: created" }
>>>   }
>>>
>>>   override fun init(context: ProcessorContext) {
>>>     logger.debug { "$storeName: init called" }
>>>     this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
>>>   }
>>>
>>>   override fun transform(key: K, value: V): Iterable<VR> {
>>>     val acc = state.get(key)
>>>     if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
>>>     val result = transformation(key, value, acc)
>>>     state.put(key, result?.current)
>>>     logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
>>>     // the old aggregate (overflow) is forwarded first if not null
>>>     return listOfNotNull(result?.overflow, result?.current)
>>>   }
>>>
>>>   override fun close() {
>>>     logger.debug { "$storeName: close called" }
>>>   }
>>> }
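As a Kafka-free sketch of the session-overflow idea above: a plain map stands in for the per-partition KeyValueStore, and the "CLOSE" marker convention and the comma-joined session aggregate are hypothetical, chosen only to make the forwarding order visible:

```kotlin
data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

// Hypothetical transformation: events are appended to a comma-joined session
// string; an event equal to "CLOSE" ends the old session and starts a new one,
// returning the old aggregate as overflow so it can be forwarded first.
fun step(value: String, acc: String?): AggregateOverflowResult<String> =
    if (value == "CLOSE" && acc != null)
        AggregateOverflowResult(current = "", overflow = acc)
    else
        AggregateOverflowResult(
            current = if (acc.isNullOrEmpty()) value else "$acc,$value",
            overflow = null)

// A plain map stands in for the per-partition KeyValueStore.
fun transform(store: MutableMap<String, String>, key: String, value: String): List<String> {
    val acc = store[key]
    val result = step(value, acc)
    store[key] = result.current
    // As in the mail: the overflow (the closed session) is forwarded first.
    return listOfNotNull(result.overflow, result.current.ifEmpty { null })
}

fun main() {
    val store = mutableMapOf<String, String>()
    println(transform(store, "k", "a"))      // [a]
    println(transform(store, "k", "b"))      // [a,b]
    println(transform(store, "k", "CLOSE"))  // [a,b] -- old session emitted, new one empty
    println(transform(store, "k", "c"))      // [c]
}
```

This only works because `transform` reads back what it previously stored for the same key, which is exactly the assumption that breaks when records for one key are processed against different shards.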
>>>
>>> In the log file you can see that the first invocation is returning an
>>> empty value for the given key; you can also see that the new value is
>>> being serialized in the store.
>>> At the second invocation a few seconds later, the value for the same key
>>> is still null.
>>>
>>> Any ideas why this is?
>>> Best regards
>>> Jan
>>>
>>
>>
> 

