kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: IllegalStateException with custom state store ..
Date Mon, 03 Jul 2017 09:53:24 GMT
Remove the` logChange` from `flush` and do it when you write to the store.

i.e, in the BFStore + function

On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Ok, so I make the following change .. Is this the change that u suggested ?
>
> // remove commit from process(). So process now looks as follows:
> override def process(dummy: String, record: String): Unit =
> LogParseUtil.parseLine(record) match {
>   case Success(r) => {
>     bfStore + r.host
>     bfStore.flush()
>   }
>   case Failure(ex) => throw ex
> }
>
> Still I get the same exception. Just as a test, I removed the flush as
> well from process() ..
>
> override def process(dummy: String, record: String): Unit =
> LogParseUtil.parseLine(record) match {
>   case Success(r) => {
>     bfStore + r.host
>   }
>   case Failure(ex) => throw ex
> }
>
> and still get the same exception as it does call flush after commit from
> within .. here's the trace ..
>
> Exception in thread "StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to flush state store log-counts
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
> at
> com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
> at
> com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>
> regards.
>
>
>
> On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> `commit` is called by streams, you can see it in your stack trace above:
>>
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>
>> `commit` will subsequently call `flush` on any stores. At this point,
>> though, there will be no `RecordContext` as there are no records being
>> processed. Note, that calling `context.commit()` from your Processor isn't
>> actually performing the commit, it is just signalling that a commit is
>> necessary after this record has been processed. You may not want to do that
>> as it probably will impact throughput.
>>
>> You should log the change when you write to the store, i.e, i think when
>> you do:
>> bfStore + r.host
>>
>>
>> Does that help?
>>
>> Thanks,
>> Damian
>>
>>
>> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debasish@gmail.com>
>> wrote:
>>
>>> The only place where I am doing commit is from Processor.process() ..
>>> Here it is ..
>>>
>>> class WeblogProcessor extends AbstractProcessor[String, String] {
>>>   private var bfStore: BFStore[String] = _
>>>
>>>   override def init(context: ProcessorContext): Unit = {
>>>     super.init(context)
>>>     this.context.schedule(1000)
>>>     bfStore =
>>> this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE).asInstanceOf[BFStore[String]]
>>>   }
>>>
>>>   override def process(dummy: String, record: String): Unit =
>>> LogParseUtil.parseLine(record) match {
>>>     case Success(r) => {
>>>       bfStore + r.host
>>>       context.commit()
>>>       context.forward(dummy, r.host)
>>>     }
>>>     case Failure(ex) => throw ex
>>>   }
>>>
>>>   override def punctuate(timestamp: Long): Unit =
>>> super.punctuate(timestamp)
>>>   override def close(): Unit = {}
>>> }
>>>
>>> The commit invokes the flush() of my Store. Here is the flush() method
>>> of my store ..
>>>
>>> override def flush(): Unit = {
>>>   if (loggingEnabled) {
>>>     changeLogger.logChange(changelogKey, bf
>>>   }
>>> }
>>>
>>> which in turn calls logChange that gives the error.
>>>
>>> Am I missing something ?
>>>
>>> regards.
>>>
>>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is because you are calling `context.timestamp` during `commit`. At
>>>> this point there is no `RecordContext` associated with the
>>>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only set
>>>> when streams is processing a record. You probably want to log the change
>>>> when you write to the store.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debasish@gmail.com>
>>>> wrote:
>>>>
>>>>> Just to give some more information, the ProcessorContext that gets
>>>>> passed
>>>>> to the init method of the custom store has a null RecordContext. Gave
>>>>> the
>>>>> following debug statement ..
>>>>>
>>>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>>
>>>>> and got null.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <
>>>>> ghosh.debasish@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi -
>>>>> >
>>>>> > I have implemented a custom state store named BFStore with a change
>>>>> > logger as follows:
>>>>> >
>>>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>>>> >                                 val context: ProcessorContext,
>>>>> >                                 val partition: Int,
>>>>> >                                 val serialization: StateSerdes[K,
>>>>> V]) {
>>>>> >
>>>>> >   private val topic =
>>>>> ProcessorStateManager.storeChangelogTopic(context.applicationId,
>>>>> > storeName)
>>>>> >   private val collector =
>>>>> context.asInstanceOf[RecordCollector.Supplier].
>>>>> > recordCollector
>>>>> >
>>>>> >   def this(storeName: String, context: ProcessorContext,
>>>>> serialization:
>>>>> > StateSerdes[K, V]) {
>>>>> >     this(storeName, context, context.taskId.partition, serialization)
>>>>> >   }
>>>>> >
>>>>> >   def logChange(key: K, value: V): Unit = {
>>>>> >     if (collector != null) {
>>>>> >       val keySerializer = serialization.keySerializer
>>>>> >       val valueSerializer = serialization.valueSerializer
>>>>> >       collector.send(this.topic, key, value, this.partition,
>>>>> > context.timestamp, keySerializer, valueSerializer)  //**//
>>>>> >     }
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > In my driver program I build the topology and start the streams
as
>>>>> follows:
>>>>> >
>>>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>>>> >
>>>>> > builder.addSource("Source", config.fromTopic)
>>>>> >        .addProcessor("Process", () => new WeblogProcessor(),
>>>>> "Source")
>>>>> >        .addStateStore(new
>>>>> BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>>>> > stringSerde, true, changelogConfig), "Process")
>>>>> >        .addSink("Sink", "weblog-count-topic", "Process")
>>>>> >
>>>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>>>> > streams.start()
>>>>> >
>>>>> > When I run the program, immediately I get the following exception
..
>>>>> >
>>>>> > Exception in thread "StreamThread-1"
>>>>> org.apache.kafka.streams.errors.ProcessorStateException:
>>>>> > task [0_0] Failed to flush state store log-counts
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> > ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> > StreamTask$1.run(StreamTask.java:72)
>>>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>>>>> > measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> > StreamTask.commit(StreamTask.java:280)
>>>>> > at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
>>>>> > StreamThread.java:807)
>>>>> > at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
>>>>> > StreamThread.java:794)
>>>>> > at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
>>>>> > StreamThread.java:769)
>>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>>> > StreamThread.java:647)
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> > StreamThread.run(StreamThread.java:361)
>>>>> > *Caused by: java.lang.IllegalStateException: This should not happen
>>>>> as
>>>>> > timestamp() should only be called while a record is processed*
>>>>> > at
>>>>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
>>>>> > timestamp(AbstractProcessorContext.java:150)
>>>>> > at com.lightbend.fdp.sample.kstream.processor.
>>>>> > BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>>> > at com.lightbend.fdp.sample.kstream.processor.BFStore.
>>>>> > flush(BFStore.scala:86)
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> > ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>> > ... 8 more
>>>>> >
>>>>> > Not sure I understand the whole trace but looks like this may be
>>>>> related
>>>>> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from
>>>>> the
>>>>> > class BFStoreChangeLogger in the line I marked above with //**//.
>>>>> >
>>>>> > Any help / workaround will be appreciated ..
>>>>> >
>>>>> > regards.
>>>>> > --
>>>>> > Debasish Ghosh
>>>>> > http://manning.com/ghosh2
>>>>> > http://manning.com/ghosh
>>>>> >
>>>>> > Twttr: @debasishg
>>>>> > Blog: http://debasishg.blogspot.com
>>>>> > Code: http://github.com/debasishg
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>>
>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message