kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: KafkaStreams metadata - enum keys?
Date Thu, 08 Dec 2016 18:09:34 GMT
Yes it could be an issue when you initially startup. If it is the first
time you run the app and there are internal topics created by Kafka
Streams, there will be rebalances. However it depends on your topology.

How are you trying to access the state store?

Thanks,
Damian

On Thu, 8 Dec 2016 at 17:49 Jon Yeargers <jon.yeargers@cedexis.com> wrote:

> Im only running one consumer-instance so would rebalancing / wrong host be
> an issue?
>
>
>
> On Thu, Dec 8, 2016 at 7:31 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Jon,
> >
> > How are you trying to access the store?
> >
> > That exception is thrown in a few circumstances:
> > 1. KakfaStreams hasn't initialized or is re-initializing due to a
> > rebalance. This can occur for a number of reasons, i.e., new
> > topics/partitions being added to the broker (including streams internal
> > topics), broker going down, StreamThreads starting or stopping etc
> > 2. The StateStore has just been closed, which would usually mean that 1.
> is
> > about to happen
> > 3. The StateStore with that name and type doesn't exist on the local
> > KakfaStreams instance.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 8 Dec 2016 at 11:57 Jon Yeargers <jon.yeargers@cedexis.com>
> wrote:
> >
> > > Tried calling that - got this exception (FWIW - there isn't any other
> > > instance)
> > >
> > > State store value comes from
> > >
> > > groupByKey().aggregate(LogLine::new,
> > >     new aggregate(),
> > >     TimeWindows.of(60 * 60 * 1000L),
> > >     collectorSerde, "minute_agg_stream");
> > >
> > > 2016-12-08 11:33:50,924 [qtp1318180415-18] DEBUG
> > > o.eclipse.jetty.server.HttpChannel - Could not send response error 500,
> > > already committed
> > >
> > > javax.servlet.ServletException:
> > > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> > > store, minute_agg_stream, may have migrated to another instance.
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.WebComponent.serviceImpl(
> > WebComponent.java:489)
> > >
> > > at org.glassfish.jersey.servlet.WebComponent.service(
> > WebComponent.java:427)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:388)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:341)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:228)
> > >
> > > at org.eclipse.jetty.servlet.ServletHolder.handle(
> > ServletHolder.java:845)
> > >
> > > at
> > > org.eclipse.jetty.servlet.ServletHandler.doHandle(
> > ServletHandler.java:584)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.session.SessionHandler.
> > doHandle(SessionHandler.java:224)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ContextHandler.
> > doHandle(ContextHandler.java:1180)
> > >
> > > at
> > > org.eclipse.jetty.servlet.ServletHandler.doScope(
> > ServletHandler.java:512)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.session.SessionHandler.
> > doScope(SessionHandler.java:185)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ContextHandler.
> > doScope(ContextHandler.java:1112)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ScopedHandler.handle(
> > ScopedHandler.java:141)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.HandlerWrapper.handle(
> > HandlerWrapper.java:134)
> > >
> > > at org.eclipse.jetty.server.Server.handle(Server.java:534)
> > >
> > > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
> > >
> > > at
> > > org.eclipse.jetty.server.HttpConnection.onFillable(
> > HttpConnection.java:251)
> > >
> > > at
> > > org.eclipse.jetty.io
> > > .AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
> > >
> > > at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
> > >
> > > at
> > > org.eclipse.jetty.io
> > > .SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> > executeProduceConsume(ExecuteProduceConsume.java:303)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> > produceConsume(ExecuteProduceConsume.java:148)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(
> > ExecuteProduceConsume.java:136)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(
> > QueuedThreadPool.java:671)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(
> > QueuedThreadPool.java:589)
> > >
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> > the
> > > state store, minute_agg_stream, may have migrated to another instance.
> > >
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.QueryableStoreProvider.
> > getStore(QueryableStoreProvider.java:49)
> > >
> > > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> > >
> > > at
> > >
> > > com.cedexis.prtminuteagg.RestService.rangeForKeyValueStore(
> > RestService.java:190)
> > >
> > > at
> > > com.cedexis.prtminuteagg.RestService.keyRangeForStore(
> > RestService.java:99)
> > >
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >
> > > at
> > >
> > > sun.reflect.NativeMethodAccessorImpl.invoke(
> > NativeMethodAccessorImpl.java:62)
> > >
> > > at
> > >
> > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> > >
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.internal.
> > ResourceMethodInvocationHandlerFactory$1.invoke(
> > ResourceMethodInvocationHandlerFactory.java:81)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.internal.
> > AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDisp
> > atcher.java:144)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.internal.
> >
> AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDisp
> > atcher.java:161)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.internal.
> > JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(
> > JavaResourceMethodDispatcherProvider.java:205)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.internal.
> > AbstractJavaResourceMethodDispatcher.dispatch(
> > AbstractJavaResourceMethodDispatcher.java:99)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.ResourceMethodInvoker.
> > invoke(ResourceMethodInvoker.java:389)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.ResourceMethodInvoker.
> > apply(ResourceMethodInvoker.java:347)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.model.ResourceMethodInvoker.
> > apply(ResourceMethodInvoker.java:102)
> > >
> > > at org.glassfish.jersey.server.ServerRuntime$2.run(
> > ServerRuntime.java:326)
> > >
> > > at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
> > >
> > > at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
> > >
> > > at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
> > >
> > > at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
> > >
> > > at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
> > >
> > > at
> > >
> > > org.glassfish.jersey.process.internal.RequestScope.
> > runInScope(RequestScope.java:317)
> > >
> > > at
> > > org.glassfish.jersey.server.ServerRuntime.process(
> > ServerRuntime.java:305)
> > >
> > > at
> > >
> > > org.glassfish.jersey.server.ApplicationHandler.handle(
> > ApplicationHandler.java:1154)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.WebComponent.serviceImpl(
> > WebComponent.java:473)
> > >
> > > ... 25 common frames omitted
> > >
> > > On Thu, Dec 8, 2016 at 3:19 AM, Jon Yeargers <jon.yeargers@cedexis.com
> >
> > > wrote:
> > >
> > > > Maybe the 'rangeForKeyValueStore' function from the sample?
> > > >
> > > > On Thu, Dec 8, 2016 at 2:55 AM, Jon Yeargers <
> jon.yeargers@cedexis.com
> > >
> > > > wrote:
> > > >
> > > >> I see functions that require knowing a key name but in the interests
> > of
> > > >> partitioning we're using fairly complex key structures (IE
> non-obvious
> > > to
> > > >> an external function).
> > > >>
> > > >> Is there a method / process for enumerating keys?
> > > >>
> > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message