kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: IllegalStateException with custom state store ..
Date Mon, 03 Jul 2017 09:42:58 GMT
Ok, so I make the following change .. Is this the change that u suggested ?

// remove commit from process(). So process now looks as follows:
override def process(dummy: String, record: String): Unit =
LogParseUtil.parseLine(record) match {
  case Success(r) => {
    bfStore + r.host
    bfStore.flush()
  }
  case Failure(ex) => throw ex
}

Still I get the same exception. Just as a test, I removed the flush as well
from process() ..

override def process(dummy: String, record: String): Unit =
LogParseUtil.parseLine(record) match {
  case Success(r) => {
    bfStore + r.host
  }
  case Failure(ex) => throw ex
}

and still get the same exception as it does call flush after commit from
within .. here's the trace ..

Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
to flush state store log-counts
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: This should not happen as
timestamp() should only be called while a record is processed
at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
at
com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
at
com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)

regards.



On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian.guy@gmail.com> wrote:

> `commit` is called by streams, you can see it in your stack trace above:
>
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:280)
>
> `commit` will subsequently call `flush` on any stores. At this point,
> though, there will be no `RecordContext` as there are no records being
> processed. Note, that calling `context.commit()` from your Processor isn't
> actually performing the commit, it is just signalling that a commit is
> necessary after this record has been processed. You may not want to do that
> as it probably will impact throughput.
>
> You should log the change when you write to the store, i.e, i think when
> you do:
> bfStore + r.host
>
>
> Does that help?
>
> Thanks,
> Damian
>
>
> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debasish@gmail.com>
> wrote:
>
>> The only place where I am doing commit is from Processor.process() ..
>> Here it is ..
>>
>> class WeblogProcessor extends AbstractProcessor[String, String] {
>>   private var bfStore: BFStore[String] = _
>>
>>   override def init(context: ProcessorContext): Unit = {
>>     super.init(context)
>>     this.context.schedule(1000)
>>     bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_
>> STORE).asInstanceOf[BFStore[String]]
>>   }
>>
>>   override def process(dummy: String, record: String): Unit =
>> LogParseUtil.parseLine(record) match {
>>     case Success(r) => {
>>       bfStore + r.host
>>       context.commit()
>>       context.forward(dummy, r.host)
>>     }
>>     case Failure(ex) => throw ex
>>   }
>>
>>   override def punctuate(timestamp: Long): Unit =
>> super.punctuate(timestamp)
>>   override def close(): Unit = {}
>> }
>>
>> The commit invokes the flush() of my Store. Here is the flush() method
>> of my store ..
>>
>> override def flush(): Unit = {
>>   if (loggingEnabled) {
>>     changeLogger.logChange(changelogKey, bf
>>   }
>> }
>>
>> which in turn calls logChange that gives the error.
>>
>> Am I missing something ?
>>
>> regards.
>>
>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It is because you are calling `context.timestamp` during `commit`. At
>>> this point there is no `RecordContext` associated with the
>>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only set
>>> when streams is processing a record. You probably want to log the change
>>> when you write to the store.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debasish@gmail.com>
>>> wrote:
>>>
>>>> Just to give some more information, the ProcessorContext that gets
>>>> passed
>>>> to the init method of the custom store has a null RecordContext. Gave
>>>> the
>>>> following debug statement ..
>>>>
>>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>
>>>> and got null.
>>>>
>>>> regards.
>>>>
>>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <
>>>> ghosh.debasish@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi -
>>>> >
>>>> > I have implemented a custom state store named BFStore with a change
>>>> > logger as follows:
>>>> >
>>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>>> >                                 val context: ProcessorContext,
>>>> >                                 val partition: Int,
>>>> >                                 val serialization: StateSerdes[K, V])
>>>> {
>>>> >
>>>> >   private val topic = ProcessorStateManager.
>>>> storeChangelogTopic(context.applicationId,
>>>> > storeName)
>>>> >   private val collector = context.asInstanceOf[
>>>> RecordCollector.Supplier].
>>>> > recordCollector
>>>> >
>>>> >   def this(storeName: String, context: ProcessorContext,
>>>> serialization:
>>>> > StateSerdes[K, V]) {
>>>> >     this(storeName, context, context.taskId.partition, serialization)
>>>> >   }
>>>> >
>>>> >   def logChange(key: K, value: V): Unit = {
>>>> >     if (collector != null) {
>>>> >       val keySerializer = serialization.keySerializer
>>>> >       val valueSerializer = serialization.valueSerializer
>>>> >       collector.send(this.topic, key, value, this.partition,
>>>> > context.timestamp, keySerializer, valueSerializer)  //**//
>>>> >     }
>>>> >   }
>>>> > }
>>>> >
>>>> > In my driver program I build the topology and start the streams as
>>>> follows:
>>>> >
>>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>>> >
>>>> > builder.addSource("Source", config.fromTopic)
>>>> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>>> >        .addStateStore(new BFStoreSupplier[String](LOG_
>>>> COUNT_STATE_STORE,
>>>> > stringSerde, true, changelogConfig), "Process")
>>>> >        .addSink("Sink", "weblog-count-topic", "Process")
>>>> >
>>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>>> > streams.start()
>>>> >
>>>> > When I run the program, immediately I get the following exception ..
>>>> >
>>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.
>>>> ProcessorStateException:
>>>> > task [0_0] Failed to flush state store log-counts
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> > ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> > StreamTask$1.run(StreamTask.java:72)
>>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>>>> > measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> > StreamTask.commit(StreamTask.java:280)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.commitOne(
>>>> > StreamThread.java:807)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.commitAll(
>>>> > StreamThread.java:794)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.maybeCommit(
>>>> > StreamThread.java:769)
>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>> > StreamThread.java:647)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> > StreamThread.run(StreamThread.java:361)
>>>> > *Caused by: java.lang.IllegalStateException: This should not happen
>>>> as
>>>> > timestamp() should only be called while a record is processed*
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> AbstractProcessorContext.
>>>> > timestamp(AbstractProcessorContext.java:150)
>>>> > at com.lightbend.fdp.sample.kstream.processor.
>>>> > BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>> > at com.lightbend.fdp.sample.kstream.processor.BFStore.
>>>> > flush(BFStore.scala:86)
>>>> > at org.apache.kafka.streams.processor.internals.
>>>> > ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>> > ... 8 more
>>>> >
>>>> > Not sure I understand the whole trace but looks like this may be
>>>> related
>>>> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from
>>>> the
>>>> > class BFStoreChangeLogger in the line I marked above with //**//.
>>>> >
>>>> > Any help / workaround will be appreciated ..
>>>> >
>>>> > regards.
>>>> > --
>>>> > Debasish Ghosh
>>>> > http://manning.com/ghosh2
>>>> > http://manning.com/ghosh
>>>> >
>>>> > Twttr: @debasishg
>>>> > Blog: http://debasishg.blogspot.com
>>>> > Code: http://github.com/debasishg
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message