kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: Kafka Streams - Custom processor "init" method called before state store has data restored into it
Date Mon, 13 Nov 2017 21:02:28 GMT
Rainer,

Thanks for the info.

What version were you using before upgrading to 1.0.0?

Thanks,
Bill

On Sun, Nov 12, 2017 at 7:06 PM, Rainer Guessner <raguessner@gmx.com> wrote:

> Hi Bill,
>
> thanks for the suggestion about StateRestoreListener; however, that does
> not solve my issue, as it's a global listener and doesn't help the processor
> itself.
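In Kafka Streams 1.0.0 a global restore listener is registered once per `KafkaStreams` instance (via `setGlobalStateRestoreListener`) and receives events for every restoring store, so reaching one processor means dispatching on the store name. The following is a minimal, dependency-free sketch of that routing. `RestoreEvents`, `RoutingRestoreListener` and `RecordingHandler` are hypothetical stand-ins loosely modeled on `StateRestoreListener`, with the `TopicPartition` argument omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the callbacks of Kafka's StateRestoreListener;
// the TopicPartition argument is omitted so the sketch has no dependencies.
interface RestoreEvents {
    void onRestoreStart(String storeName, long startingOffset, long endingOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long totalRestored);
}

// One "global" listener that forwards each event to the handler registered
// for that store name and ignores stores nobody asked about.
class RoutingRestoreListener implements RestoreEvents {
    private final Map<String, RestoreEvents> handlers = new HashMap<>();

    void route(String storeName, RestoreEvents handler) {
        handlers.put(storeName, handler);
    }

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
        RestoreEvents h = handlers.get(storeName);
        if (h != null) {
            h.onRestoreStart(storeName, startingOffset, endingOffset);
        }
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        RestoreEvents h = handlers.get(storeName);
        if (h != null) {
            h.onBatchRestored(storeName, batchEndOffset, numRestored);
        }
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        RestoreEvents h = handlers.get(storeName);
        if (h != null) {
            h.onRestoreEnd(storeName, totalRestored);
        }
    }
}

// Example per-store handler that only records which callbacks it received.
class RecordingHandler implements RestoreEvents {
    final List<String> events = new ArrayList<>();

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
        events.add("start");
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        events.add("batch");
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        events.add("end");
    }
}
```

Like the real listener, the router never calls a handler for a store that needs no restore, so on its own it does not cover the no-restore case.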
>
> Please find the simple one-class code for reproducing the issue below.
> Please create two topics (use kafka-topics.cmd on Windows):
> kafka-topics --zookeeper localhost:2181 --create --topic sampleapp-input --partitions 1 --replication-factor 1 --config cleanup.policy=delete
> kafka-topics --zookeeper localhost:2181 --create --topic sampleapp-mystatestore-changelog --partitions 1 --replication-factor 1 --config cleanup.policy=compact
>
> Please run "testStepOne", delete the local Kafka Streams state, and then run
> "testStepTwo".
>
> I have included log output after the code.
>
> Thank you.
> Rainer
>
>
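> <code>
> import java.util.Collections;
> import java.util.Map;
> import java.util.Properties;
>
> import junit.framework.TestCase;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.ProcessorSupplier;
> import org.apache.kafka.streams.processor.StateStore;
> import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
> import org.apache.kafka.streams.processor.internals.RecordCollector;
> import org.apache.kafka.streams.state.StoreBuilder;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class KStreamsTester extends TestCase {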
>     private final static Logger log = LoggerFactory.getLogger(KStreamsTester.class);
>
>     private final static String STATESTORE_NAME = "mystatestore";
>
>     public void testStepOne() throws Exception {
>         log.info("Starting step one");
>         KafkaStreams streams = getKStreams();
>         Thread.sleep(45000);
>         streams.close();
>     }
>
>     /**
>      * Delete state store between running step one and step two.
>      */
>
>     public void testStepTwo() throws Exception {
>         log.info("Starting step two");
>         KafkaStreams streams = getKStreams();
>         Thread.sleep(30000);
>     }
>
>     public KafkaStreams getKStreams() {
>         Topology builder = new Topology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sampleapp");
>         StreamsConfig config = new StreamsConfig(props);
>
>         MyProcessorSupplier supplier = new MyProcessorSupplier();
>         builder.addSource("source", new StringDeserializer(), new StringDeserializer(), "sampleapp-input");
>         builder.addProcessor("processor", supplier, "source");
>         builder.addStateStore(new MyStateStoreBuilder(), "processor");
>
>         KafkaStreams streaming = new KafkaStreams(builder, config);
>         streaming.start();
>         return streaming;
>     }
>
>     public static class MyStateStoreBuilder implements StoreBuilder {
>         public StoreBuilder withCachingEnabled() {
>             return this;
>         }
>
>         public StoreBuilder withLoggingEnabled(Map map) {
>             return this;
>         }
>
>         public StoreBuilder withLoggingDisabled() {
>             return this;
>         }
>
>         public StateStore build() {
>             return new MyStateStore();
>         }
>
>         public Map<String, String> logConfig() {
>             return Collections.emptyMap();
>         }
>
>         public boolean loggingEnabled() {
>             return true;
>         }
>
>         public String name() {
>             return STATESTORE_NAME;
>         }
>     }
>
>     public static class MyStateStore implements StateStore {
>         private final static Logger log = LoggerFactory.getLogger(MyStateStore.class);
>
>         private boolean open;
>         private ProcessorContext processorContext;
>
>         public String name() {
>             return STATESTORE_NAME;
>         }
>
>         public void init(ProcessorContext processorContext, StateStore stateStore) {
>             log.info(".init");
>             this.processorContext = processorContext;
>             open = true;
>             processorContext.register(stateStore, true, new AbstractNotifyingRestoreCallback() {
>                 public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
>                     log.info(".onRestoreStart");
>                 }
>
>                 public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
>                     log.info(".onBatchRestored");
>                 }
>
>                 public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
>                     log.info(".onRestoreEnd");
>                 }
>
>                 public void restore(byte[] bytes, byte[] bytes1) {
>                     log.info(".restore");
>                 }
>             });
>         }
>
>         public void flush() {
>             log.info(".flush");
>             // Write a dummy record straight to the changelog topic so that step two
>             // has something to restore (this uses internal RecordCollector APIs).
>             RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector();
>             int partition = processorContext.taskId().partition;
>             String topic = ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), STATESTORE_NAME);
>             collector.send(topic, "ABC".getBytes(), "DEF".getBytes(), partition, System.currentTimeMillis(), new ByteArraySerializer(), new ByteArraySerializer());
>         }
>
>         public void close() {
>             log.info(".close");
>             open = false;
>         }
>
>         public boolean persistent() {
>             return true;
>         }
>
>         public boolean isOpen() {
>             return open;
>         }
>     }
>
>     public static class MyProcessorSupplier implements ProcessorSupplier {
>         public Processor get() {
>             return new MyProcessor();
>         }
>     }
>
>     public static class MyProcessor extends AbstractProcessor<Object, Object> {
>         private final static Logger log = LoggerFactory.getLogger(MyProcessor.class);
>
>         public void init(ProcessorContext context) {
>             super.init(context);
>             log.info(".init");
>         }
>
>         public void process(Object o, Object o2) {
>             log.info(".process");
>         }
>     }
> }
> </code>
>
>
> 19:04:36,668 INFO  [KStreamsTester$MyStateStore] .init
> 19:04:36,668 INFO  [KStreamsTester$MyProcessor] .init
> 19:04:36,690 INFO  [KStreamsTester$MyStateStore] .onRestoreStart
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .onBatchRestored
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .onRestoreEnd
> 19:04:36,706 INFO  [StreamThread] stream-thread [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> 19:04:36,706 INFO  [KafkaStreams] stream-client [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24]State transition from REBALANCING to RUNNING
> 19:04:36,706 INFO  [KStreamsTester$MyStateStore] .flush
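The log shows the 1.0.0 order: the store's init, then the processor's init, and only then the restore callbacks. A toy, dependency-free simulation of that order makes the consequence concrete: whatever the processor reads during init comes from a still-empty store. `ToyStore`, `ToyProcessor` and `CallOrderSimulation` are hypothetical classes, not Kafka types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory store; restore() stands in for the restore callback.
class ToyStore {
    final Map<String, String> data = new LinkedHashMap<>();

    void restore(String key, String value) {
        data.put(key, value);
    }
}

// Hypothetical processor that records how much state it could see in init().
class ToyProcessor {
    ToyStore store;
    int sizeSeenAtInit = -1;

    void init(ToyStore store) {
        this.store = store;
        this.sizeSeenAtInit = store.data.size();
    }
}

class CallOrderSimulation {
    // Replays the order from the log: store init, processor init, then restore.
    static ToyProcessor run(List<String[]> changelog) {
        ToyStore store = new ToyStore();           // corresponds to MyStateStore.init
        ToyProcessor processor = new ToyProcessor();
        processor.init(store);                     // corresponds to MyProcessor.init, before any restore
        for (String[] record : changelog) {
            store.restore(record[0], record[1]);   // one ".restore" per changelog record
        }
        return processor;
    }
}
```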
>
>
> =============
>
> Sent: Sunday, November 12, 2017 at 1:01 PM
> From: "Bill Bejeck" <bill@confluent.io>
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - Custom processor "init" method called before
> state store has data restored into it
> Rainer,
>
> Thanks for sharing the logs.
>
> With respect to option "c" in your original email: given that you are using
> a custom processor and state store, would setting a StateRestoreListener in
> the custom store suit your needs?
>
> Would you be comfortable sharing your code, so I can see if there is an
> acceptable alternative I can work out for you?
>
> Thanks,
> Bill
>
>
>
>
> On Thu, Nov 9, 2017 at 2:26 PM, Rainer Guessner <raguessner@gmx.com>
> wrote:
>
> > I have a few logs below.
> >
> > Thank you.
> > Rainer
> >
> > 14:21:04,304 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating restore consumer client
> > 14:21:04,393 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating shared producer client
> > 14:21:04,429 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating consumer client
> > 14:21:04,620 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Starting
> > 14:21:04,622 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from CREATED to RUNNING
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition revocation took 0 ms.
> >     suspended active tasks: []
> >     suspended standby tasks: []
> > 14:21:04,748 INFO [RestApplication] Adding listener: http://0.0.0.0:8082
> > 14:21:04,768 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > 14:21:04,783 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition assignment took 15 ms.
> >     current active tasks: [0_0]
> >     current standby tasks: []
> >     previous active tasks: []
> >
> > STATESTORE INIT
> > 14:21:04,873 INFO ... our stuff here
> > PROCESSOR INIT
> > 14:21:05,637 INFO ... our stuff here
> > RESTORE CALLED
> > ...
> > RESTORE CALLED
> > 14:21:05,680 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> >
> > ==================
> > Sent: Thursday, November 09, 2017 at 1:43 PM
> > From: "Bill Bejeck" <bill@confluent.io>
> > To: users@kafka.apache.org
> > Subject: Re: Kafka Streams - Custom processor "init" method called before
> > state store has data restored into it
> > Hi Rainer,
> >
> > Thanks for reporting this issue. Do you have any log data you can share?
> >
> > In the meantime, I'll look into the issue.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Nov 9, 2017 at 1:23 PM, Rainer Guessner <raguessner@gmx.com>
> > wrote:
> >
> > > I have a custom processor that extends AbstractProcessor and a custom
> > > store that implements StateStore.
> > >
> > > Before Kafka 1.0.0, the processor's "init" method was called after the
> > > state store had been restored from the changelog, which is what I need.
> > > With Kafka 1.0.0, the processor's "init" method is called BEFORE the
> > > state store is restored from the changelog, which is a problem for me.
> > >
> > > My processor can only initialize when it has access to the state. However,
> > > at the time Kafka Streams calls "init" on the processor, the state store may
> > > not have any data yet. It is not an option for me to initialize the
> > > processor lazily when a record arrives, or to re-initialize it when
> > > "onRestoreEnd" is called: that callback only fires when a restore actually
> > > happens, whereas the state store's "init" is called before the processor's
> > > "init" whether or not a restore takes place.
> > >
> > > I think I need one of these:
> > > a) a way to know whether or not a state restore will take place (and
> > > when it will not),
> > > b) or a call to the state store regardless of whether a restore took
> > > place or not,
> > > c) or a "ready" method on the processor that gets called once the state
> > > store has finished restoring and is actually usable.
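Options (b) and (c) together could be modeled as a store that fires a one-time "ready" hook either after the restore ends or as soon as it learns that no restore is pending. This is a library-free sketch of the requested behaviour, not Kafka Streams 1.0.0 API: `ReadyAwareStore`, `whenReady`, `restoreStarted`, `restoreEnded` and `noRestoreNeeded` are hypothetical names, and the sketch assumes some caller can deliver the `noRestoreNeeded` signal, which is the piece option (b) says is missing. It also assumes all calls arrive on a single thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical readiness tracker for options (b)/(c); NOT an existing Kafka Streams API.
// Assumes all calls arrive on a single thread, so no locking is done.
class ReadyAwareStore {
    private final List<Runnable> waiters = new ArrayList<>();
    private boolean restoring = false;
    private boolean ready = false;

    // The processor's init registers its real initialization here.
    void whenReady(Runnable hook) {
        if (ready) {
            hook.run();        // store already usable: initialize immediately
        } else {
            waiters.add(hook); // otherwise wait for one of the signals below
        }
    }

    // Would be driven by onRestoreStart.
    void restoreStarted() {
        restoring = true;
    }

    // Would be driven by onRestoreEnd.
    void restoreEnded() {
        restoring = false;
        markReady();
    }

    // The signal option (b) asks for: no restore will happen for this store.
    void noRestoreNeeded() {
        if (!restoring) {
            markReady();
        }
    }

    boolean isReady() {
        return ready;
    }

    // Runs every pending hook exactly once; later calls are no-ops.
    private void markReady() {
        if (ready) {
            return;
        }
        ready = true;
        List<Runnable> pending = new ArrayList<>(waiters);
        waiters.clear();
        for (Runnable hook : pending) {
            hook.run();
        }
    }
}
```

In the repro, MyProcessor.init would pass its initialization to whenReady instead of doing the work directly.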
> > >
> > > Please help, thank you in advance.
> > > Rainer
> > >
> >
>
