kafka-users mailing list archives

From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: IllegalStateException with custom state store ..
Date Mon, 03 Jul 2017 10:14:24 GMT
That exception is gone. Thanks for the suggestion.

I followed the example from
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala#L258
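
For readers who hit the same error, here is a minimal sketch of the fix discussed in this thread: log the change in the store's `+` method, while a record is being processed, instead of in `flush()`. It deliberately avoids the Kafka Streams API so that it runs on its own. `FakeRecordContext`, `LoggedSetStore` and the in-memory `changelog` buffer are hypothetical stand-ins, and the exception only imitates the one in the quoted stack traces.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ProcessorContext: a timestamp exists only while
// a record is being processed, imitating the IllegalStateException in the thread.
final class FakeRecordContext {
  private var current: Option[Long] = None

  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))

  // Runs `body` as if Streams were processing a record stamped `ts`.
  def processing[A](ts: Long)(body: => A): A = {
    current = Some(ts)
    try body finally { current = None }
  }
}

// Hypothetical store: each write is logged immediately, so flush() never
// needs a record timestamp.
final class LoggedSetStore(context: FakeRecordContext) {
  private val items = mutable.Set.empty[String]
  // (key, value, timestamp) entries that would be sent to the changelog topic
  val changelog = mutable.ArrayBuffer.empty[(String, Set[String], Long)]

  def +(item: String): Unit = {
    items += item
    // Safe: `+` is only called from process(), so a timestamp is available.
    changelog += (("store-key", items.toSet, context.timestamp))
  }

  // Invoked on the commit path, outside record processing: nothing to log here.
  def flush(): Unit = ()

  def contains(item: String): Boolean = items.contains(item)
}

object LoggedSetStoreDemo {
  def run(): LoggedSetStore = {
    val ctx = new FakeRecordContext
    val store = new LoggedSetStore(ctx)
    ctx.processing(100L) { store + "host-a" }
    ctx.processing(200L) { store + "host-b" }
    store.flush() // commit-time flush with no record in flight: no exception
    store
  }
}
```

In a real store, `+` would hand the key and the updated value to the change logger's `logChange` rather than appending to a local buffer.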

regards.

On Mon, Jul 3, 2017 at 3:23 PM, Damian Guy <damian.guy@gmail.com> wrote:

> Remove the `logChange` call from `flush` and do it when you write to the store,
>
> i.e., in the BFStore `+` function.
>
>
> On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh <ghosh.debasish@gmail.com>
> wrote:
>
>> OK, so I made the following change. Is this the change you suggested?
>>
>> // remove commit from process(). So process now looks as follows:
>> override def process(dummy: String, record: String): Unit =
>>   LogParseUtil.parseLine(record) match {
>>     case Success(r) => {
>>       bfStore + r.host
>>       bfStore.flush()
>>     }
>>     case Failure(ex) => throw ex
>>   }
>>
>> I still get the same exception. Just as a test, I also removed the flush
>> from process():
>>
>> override def process(dummy: String, record: String): Unit =
>>   LogParseUtil.parseLine(record) match {
>>     case Success(r) => {
>>       bfStore + r.host
>>     }
>>     case Failure(ex) => throw ex
>>   }
>>
>> and I still get the same exception, because Streams itself calls flush as
>> part of its commit. Here's the trace:
>>
>> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
>>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>     at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
>>     at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>
>> regards.
>>
>>
>>
>> On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>
>>> `commit` is called by Streams; you can see it in your stack trace above:
>>>
>>> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>
>>> `commit` will subsequently call `flush` on any stores. At this point,
>>> though, there will be no `RecordContext`, as no record is being
>>> processed. Note that calling `context.commit()` from your Processor doesn't
>>> actually perform the commit; it just signals that a commit is
>>> necessary after this record has been processed. You may not want to do that,
>>> as it will probably hurt throughput.
>>>
>>> You should log the change when you write to the store, i.e., I think, when
>>> you do:
>>> bfStore + r.host
>>>
>>>
>>> Does that help?
>>>
>>> Thanks,
>>> Damian
>>>
>>>
>>> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debasish@gmail.com>
>>> wrote:
>>>
>>>> The only place where I call commit is from Processor.process().
>>>> Here it is:
>>>>
>>>> class WeblogProcessor extends AbstractProcessor[String, String] {
>>>>   private var bfStore: BFStore[String] = _
>>>>
>>>>   override def init(context: ProcessorContext): Unit = {
>>>>     super.init(context)
>>>>     this.context.schedule(1000)
>>>>     bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE).asInstanceOf[BFStore[String]]
>>>>   }
>>>>
>>>>   override def process(dummy: String, record: String): Unit =
>>>>     LogParseUtil.parseLine(record) match {
>>>>       case Success(r) => {
>>>>         bfStore + r.host
>>>>         context.commit()
>>>>         context.forward(dummy, r.host)
>>>>       }
>>>>       case Failure(ex) => throw ex
>>>>     }
>>>>
>>>>   override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)
>>>>   override def close(): Unit = {}
>>>> }
>>>>
>>>> The commit invokes the flush() of my store. Here is the flush() method
>>>> of my store:
>>>>
>>>> override def flush(): Unit = {
>>>>   if (loggingEnabled) {
>>>>     changeLogger.logChange(changelogKey, bf)
>>>>   }
>>>> }
>>>>
>>>> which in turn calls logChange, and that throws the error.
>>>>
>>>> Am I missing something?
>>>>
>>>> regards.
>>>>
>>>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian.guy@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It is because you are calling `context.timestamp` during `commit`. At
>>>>> this point there is no `RecordContext` associated with the
>>>>> `ProcessorContext` (it is null), hence the `IllegalStateException`. The
>>>>> `RecordContext` is only set while Streams is processing a record. You
>>>>> probably want to log the change when you write to the store.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debasish@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Just to give some more information, the ProcessorContext that gets
>>>>>> passed to the init method of the custom store has a null RecordContext.
>>>>>> I added the following debug statement:
>>>>>>
>>>>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>>>
>>>>>> and got null.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi -
>>>>>> >
>>>>>> > I have implemented a custom state store named BFStore with a change
>>>>>> > logger as follows:
>>>>>> >
>>>>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>>>>> >                                 val context: ProcessorContext,
>>>>>> >                                 val partition: Int,
>>>>>> >                                 val serialization: StateSerdes[K, V]) {
>>>>>> >
>>>>>> >   private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>>>>> >   private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>>>>> >
>>>>>> >   def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
>>>>>> >     this(storeName, context, context.taskId.partition, serialization)
>>>>>> >   }
>>>>>> >
>>>>>> >   def logChange(key: K, value: V): Unit = {
>>>>>> >     if (collector != null) {
>>>>>> >       val keySerializer = serialization.keySerializer
>>>>>> >       val valueSerializer = serialization.valueSerializer
>>>>>> >       collector.send(this.topic, key, value, this.partition,
>>>>>> >         context.timestamp, keySerializer, valueSerializer)  //**//
>>>>>> >     }
>>>>>> >   }
>>>>>> > }
>>>>>> >
>>>>>> > In my driver program I build the topology and start the streams as
>>>>>> > follows:
>>>>>> >
>>>>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>>>>> >
>>>>>> > builder.addSource("Source", config.fromTopic)
>>>>>> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>>>>> >        .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>>>>> >          stringSerde, true, changelogConfig), "Process")
>>>>>> >        .addSink("Sink", "weblog-count-topic", "Process")
>>>>>> >
>>>>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>>>>> > streams.start()
>>>>>> >
>>>>>> > When I run the program, I immediately get the following exception:
>>>>>> >
>>>>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>>>>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>>>>>> >     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>>>>> >     at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>>>> >     at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>>>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>>> >     ... 8 more
>>>>>> >
>>>>>> > I'm not sure I understand the whole trace, but it looks like this may be
>>>>>> > related to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from
>>>>>> > the class BFStoreChangeLogger, at the line I marked above with //**//.
>>>>>> >
>>>>>> > Any help or workaround would be appreciated.
>>>>>> >
>>>>>> > regards.
>>>>>> > --
>>>>>> > Debasish Ghosh
>>>>>> > http://manning.com/ghosh2
>>>>>> > http://manning.com/ghosh
>>>>>> >
>>>>>> > Twttr: @debasishg
>>>>>> > Blog: http://debasishg.blogspot.com
>>>>>> > Code: http://github.com/debasishg
>>>>>> >
>>>>>>
>>>>>
>>>>
>>>
>>
>
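
Damian's explanation of why the original `flush()` failed can be illustrated with a small model. This is a simplified, hypothetical sketch of the control flow described in the thread, not Kafka Streams' real `StreamThread`/`StreamTask` code: `ModelContext`, `FlushLoggingStore` and `CommitModel` are made-up names. `commit()` only sets a flag, and the commit (and therefore `flush()`) runs after the record has been processed, when no timestamp is available.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the commit path discussed in this thread.
final class ModelContext {
  private var current: Option[Long] = None
  var commitRequested: Boolean = false

  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))

  // Like context.commit(): it only requests a commit, it does not perform one.
  def commit(): Unit = { commitRequested = true }

  def processing(ts: Long)(body: => Unit): Unit = {
    current = Some(ts)
    try body finally { current = None }
  }
}

// Store that logs its change in flush(), as the original BFStore did.
final class FlushLoggingStore(ctx: ModelContext) {
  private val items = mutable.Set.empty[String]
  var lastLoggedAt: Option[Long] = None

  def +(item: String): Unit = { items += item }
  def flush(): Unit = { lastLoggedAt = Some(ctx.timestamp) } // needs a record in flight
}

object CommitModel {
  // One iteration of the modelled loop: process a record, then commit if asked.
  def step(ctx: ModelContext, store: FlushLoggingStore, ts: Long, host: String): Unit = {
    ctx.processing(ts) {
      store + host
      ctx.commit()
    }
    if (ctx.commitRequested) {
      ctx.commitRequested = false
      store.flush() // the record is done by now, so ctx.timestamp throws
    }
  }
}
```

In the real system, dropping the `context.commit()` call does not avoid the failure, because Streams also commits on its own schedule (controlled by `commit.interval.ms`), which matches what happened when the commit was removed from `process()`.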

