kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Conrad Crampton <conrad.cramp...@gmail.com>
Subject Re: ClassCastException in KStreams job for SessionWindow aggregation
Date Wed, 02 May 2018 18:44:29 GMT
Thanks Damian for taking the time to point out my stupidity - that was it
:-)
Case of Occams Razor!!
Thanks again

On Wed, 2 May 2018 at 16:56 Conrad Crampton <conrad.crampton@gmail.com>
wrote:

> Looks like I may be victim of that copy and paste gremlin!!
> Without checking I think you might be onto something - I'll check it out
> later and repost back.
> On one hand I really hope it's this, on the other I can't believe I've
> been so stupid!
> Thanks
>
>
> On Wed, 2 May 2018 at 16:30 Damian Guy <damian.guy@gmail.com> wrote:
>
>> Hi,
>>
>> I think it **might** be  related to this:
>>   final Serializer<HttpSession> httpSessionSerializer = new
>> JsonPOJOSerializer<>();
>>         serdeProps.put("JsonPOJOClass", Http.class);
>>         httpSessionSerializer.configure(serdeProps, false);
>>
>>         final Deserializer<HttpSession> httpSessionDeserializer = new
>> JsonPOJODeserializer<>();
>>         serdeProps.put("JsonPOJOClass", Http.class);
>>         httpSessionDeserializer.configure(serdeProps, false);
>>
>> Shouldn't the class be HttpSession.class ?
>>
>> On Wed, 2 May 2018 at 16:12 Conrad Crampton <conrad.crampton@gmail.com>
>> wrote:
>>
>>> I'm trying to window over http logs and create an HttpSession i.e. a list
>>> of http requests (and some other properties). However when in my
>>> aggregate
>>> Merger part (I think) I'm getting a classcastexception I think in when my
>>> sessions are being merged and cannot for the life of me work out why.
>>> The exception is at the bottom and I think the relevant code is here.
>>> Can anyone give a suggestion as to why Http is trying to be cast to
>>> HttpSession?
>>> Thanks
>>>
>>>
>>> final Serializer<Http> httpSerializer = new JsonPOJOSerializer<>();
>>>         serdeProps.put("JsonPOJOClass", Http.class);
>>>         httpSerializer.configure(serdeProps, false);
>>>
>>>         final Deserializer<Http> httpDeserializer = new
>>> JsonPOJODeserializer<>();
>>>         serdeProps.put("JsonPOJOClass", Http.class);
>>>         httpDeserializer.configure(serdeProps, false);
>>>
>>>         final Serde<Http> httpSerde = Serdes.serdeFrom(httpSerializer,
>>> httpDeserializer);
>>>
>>>         final Serializer<HttpSession> httpSessionSerializer = new
>>> JsonPOJOSerializer<>();
>>>         serdeProps.put("JsonPOJOClass", Http.class);
>>>         httpSessionSerializer.configure(serdeProps, false);
>>>
>>>         final Deserializer<HttpSession> httpSessionDeserializer = new
>>> JsonPOJODeserializer<>();
>>>         serdeProps.put("JsonPOJOClass", Http.class);
>>>         httpSessionDeserializer.configure(serdeProps, false);
>>>
>>>         final Serde<HttpSession> httpSessionSerde =
>>> Serdes.serdeFrom(httpSessionSerializer, httpSessionDeserializer);
>>>
>>>         StreamsBuilder builder = new StreamsBuilder();
>>>
>>>         KStream<String, HttpSession> httpStream = null;
>>>         try {
>>>             httpStream = builder.stream(
>>>                     config.getString(ConfigConstants.HTTP_TOPIC_KEY),
>>>                     Consumed.with(Serdes.String(), httpSerde))
>>>                     .selectKey((s, http) -> http.getClient() +
>>> http.getSourceIp() + http.getUseragent())
>>>                     .groupByKey(Serialized.with(Serdes.String(),
>>> httpSerde))
>>>                     // window by session
>>>
>>> .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(10)))
>>>                     .aggregate(
>>>                             new Initializer<HttpSession>() {
>>>                                 @Override
>>>                                 public HttpSession apply() {
>>>                                     return new HttpSession();
>>>                                 }
>>>                             },
>>>                             new Aggregator<String, Http, HttpSession>()
{
>>>                                 @Override
>>>                                 public HttpSession apply(String s, Http
>>> http, HttpSession session) {
>>>                                     return session.addRequest(http);
>>>                                 }
>>>                             },
>>>                             new Merger<String, HttpSession>() {
>>>                                  @Override
>>>                                  public HttpSession apply(String s,
>>> HttpSession session, HttpSession v1)
>>>                                      log.debug("merging key {}, session
>>> {}
>>> with other {}", s, session, v1);
>>>                                      return session.merge(v1);}
>>> },
>>>                             Materialized.<String, HttpSession,
>>> SessionStore<Bytes,
>>> byte[]>>as(config.getString(StreamsConfig.APPLICATION_ID_CONFIG) +
>>>
>>> "-session-store").withKeySerde(Serdes.String()).withValueSerde(httpSessionSerde)
>>>                     ).toStream((stringWindowed, session) ->
>>> (stringWindowed.key()));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>
>>>         httpStream
>>>                 .filter((key, message) -> message != null)
>>>                 .filter((key, message) -> message.getClient() != null)
>>>                 .filter((key, message) ->
>>> httpClients.stream().anyMatch(message.getClient()::equals))
>>>                 .foreach((key, message) -> {
>>>                     log.info("message {}", message);
>>>                 });
>>>
>>>         final KafkaStreams streams = new KafkaStreams(builder.build(),
>>> props);
>>>         streams.start();
>>>
>>> java.lang.ClassCastException: com.secdata.gi.graph.model.Http cannot be
>>> cast to com.secdata.gi.graph.model.HttpSession
>>> at com.secdata.gi.graph.Process$$Lambda$45/1474607212.apply(Unknown
>>> Source)
>>> at
>>>
>>> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157)
>>> at
>>>
>>> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154)
>>> at
>>>
>>> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>>> at
>>>
>>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>>> at
>>>
>>> org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.CachingSessionStore.close(CachingSessionStore.java:201)
>>> at
>>>
>>> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:275)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:238)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:450)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:532)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:500)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:493)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:212)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1213)
>>> at
>>>
>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:755)
>>>
>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message