kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Why do I get an IllegalStateException when accessing record metadata?
Date Thu, 03 Jan 2019 20:50:40 GMT
I see.

When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.


On 1/3/19 9:13 PM, Eric Lalonde wrote:
>> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matthias@confluent.io> wrote:
>> Thanks for reporting this. Feel free to edit the Wiki with the FAQ
>> directly.
>> What is unclear to me: what do you mean by "the state store [...] was
>> errantly scoped to the TransformerProvider, not the Transformer" ?
>> I would like to understand the actual issue.
> See this
> gist: https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
> Because MyStore is declared in the parent supplier, it will be shared
> across tasks, even though the .get() function is instantiating a new
> MyTransformer() for each task. It should have been declared in the
> MyTransformer sub-class (say, around line 15).
>> -Matthias
>> On 12/31/18 2:36 AM, Eric Lalonde wrote:
>>> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
>>> application was exhibiting all of the behavior discussed in the FAQ
>>> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the
>>> section:
>>> Why do I get an IllegalStateException when accessing record metadata? 
>>> However, the root cause of my problem was not due to the lack of
>>> instantiating a new transformer for each task. The root cause of my
>>> mistake was a bit more insidious: the state store that I instantiated
>>> was errantly scoped to the TransformerProvider, not the Transformer. Of
>>> course when I finally realized my error, the problem was obvious, but it
>>> was not immediately obvious. May I suggest extending the FAQ to help
>>> others as well? Perhaps it would be helpful to extend the aforementioned
>>> FAQ section in the following way (highlighted text is my addition):
>>>    Why do I get an IllegalStateException when accessing record metadata? 
>>>    If you attach a new |Processor/Transformer/ValueTransformer| to your
>>>    topology using a corresponding supplier, you need to make sure that
>>>    the supplier returns a /new/ instance each time |get()| is called.
>>>    If you return the same object, a
>>>    single |Processor/Transformer/ValueTransformer| would be shared over
>>>    multiple tasks resulting in an |IllegalStateException| with error
>>>    message |"This should not happen as topic() should only be called
>>>    while a record is processed"| (depending on the method you are
>>>    calling it could also be |partition()|, |offset()|,
>>>    or |timestamp()| instead of |topic()|). Additionally, all
>>>    instantiated state stores must be scoped to the inner
>>>    Processor/Transformer/ValueTransformer class, and not to the parent
>>>    Provider class. Scoping state stores to the parent class will result
>>>    in state store re-use across tasks, which will also result
>>>    in IllegalStateExceptions.
>>> Hope this saves someone else from making the same mistake :)
>>> - Eric
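
To make the scoping difference described above concrete, here is a minimal sketch in plain Java. It deliberately does not use the Kafka Streams API: MyStore and MyTransformer are simplified stand-ins named after the classes mentioned in the thread, and SharedStoreSupplier and PerInstanceStoreSupplier are hypothetical names introduced only for illustration. The sketch does not reproduce the IllegalStateException; it only shows that a store declared as a field of the supplier is seen by every transformer that get() returns, while a store declared inside the transformer is not.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class StoreScopeSketch {

    // Stand-in for a state store (assumption: NOT a Kafka Streams type).
    static class MyStore {
        private final Map<String, Long> counts = new HashMap<>();

        // Increments the counter for the key and returns the new value.
        long increment(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    // Stand-in for a transformer (assumption: NOT the Kafka Streams interface).
    interface MyTransformer {
        long transform(String key);
    }

    // The mistake: the store is a field of the supplier, so every
    // transformer returned by get() reads and writes the same store.
    static class SharedStoreSupplier implements Supplier<MyTransformer> {
        private final MyStore store = new MyStore();

        @Override
        public MyTransformer get() {
            return new MyTransformer() {
                @Override
                public long transform(String key) {
                    return store.increment(key);
                }
            };
        }
    }

    // The fix: the store is a field of the transformer, so each call
    // to get() yields a transformer with its own store.
    static class PerInstanceStoreSupplier implements Supplier<MyTransformer> {
        @Override
        public MyTransformer get() {
            return new MyTransformer() {
                private final MyStore store = new MyStore();

                @Override
                public long transform(String key) {
                    return store.increment(key);
                }
            };
        }
    }

    public static void main(String[] args) {
        Supplier<MyTransformer> shared = new SharedStoreSupplier();
        MyTransformer s1 = shared.get();
        MyTransformer s2 = shared.get();
        s1.transform("k");
        // The second instance observes the first instance's update.
        System.out.println("supplier-scoped store: " + s2.transform("k")); // prints 2

        Supplier<MyTransformer> perInstance = new PerInstanceStoreSupplier();
        MyTransformer p1 = perInstance.get();
        MyTransformer p2 = perInstance.get();
        p1.transform("k");
        // The second instance starts from its own empty store.
        System.out.println("transformer-scoped store: " + p2.transform("k")); // prints 1
    }
}
```

In the supplier-scoped variant, get() does return a new transformer each time, which is why the existing FAQ advice alone would not catch this mistake.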
