kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: [Kafka Streams] - problem joining 2 different streams
Date Mon, 27 Mar 2017 14:37:11 GMT
Hi Marco,

It looks like you are creating 2 independent instances of KafkaStreams and
trying to join across those instances. This won't work, and I'm surprised
it let you get that far without some other exception.

You should remove this bit:
>KafkaStreams userLocationKafkaStream = new
>KafkaStreams(locationStreamBuilder, propsLocation);
>userLocationKafkaStream.start();
>
>//This Stream: User Activity
>KStreamBuilder activityStreamBuilder = new KStreamBuilder();

and build the input streams you want to join from the same builder, i.e.,
the original builder that you created. You then just start one instance of
KafkaStreams.
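A minimal sketch of that suggestion, assuming the 0.10.2-era Kafka Streams API used in this thread (KStreamBuilder and the serde-taking stream/join/to overloads). Both input streams and the join come from one builder, and a single KafkaStreams instance runs the whole topology. The class name SingleTopologyJoin, the type parameter J (standing in for com.mytest.JsonObject), the application id and the bootstrap address are placeholders. Marco's filter and map steps (MyStreamUtils) are left out for brevity.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class SingleTopologyJoin {

    // Builds both input streams AND the join from ONE KStreamBuilder, so the
    // whole topology can run inside a single KafkaStreams instance.
    // J is a placeholder for the JSON value type (com.mytest.JsonObject in the thread).
    static <J> KStreamBuilder buildTopology(Serde<J> jsonSerde,
                                            ValueJoiner<J, String, J> joiner) {
        Serde<String> stringSerde = Serdes.String();
        KStreamBuilder builder = new KStreamBuilder();

        // Right side of the join: user location (city name), keyed by device id.
        KStream<String, String> locations =
                builder.stream(stringSerde, stringSerde, "userLocationStreamData");
        // Still forwarded to its own topic, as in the original code.
        locations.to(stringSerde, stringSerde, "user_location");

        // Left side of the join: user activity, keyed by the same device id.
        KStream<String, J> activity =
                builder.stream(stringSerde, jsonSerde, "activityStreamData");

        activity.join(
                        locations,                                  // same builder as activity
                        joiner,                                     // (activity, city) -> result
                        JoinWindows.of(1000).until(1000 * 60 * 5),
                        stringSerde, jsonSerde, stringSerde)        // key, left value, right value
                .to(stringSerde, jsonSerde, "usersWithLocation");

        return builder;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder settings: replace with the application's real values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-location-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // String values stand in for JsonObject so the sketch compiles on its own.
        KStreamBuilder builder =
                buildTopology(Serdes.String(), (activity, city) -> activity + " city=" + city);

        // Exactly one KafkaStreams instance for the whole topology.
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

With the real types, buildTopology would presumably be called with jsonSerde and MyStreamUtils::locationActivityJoiner, and propsActivity would replace the placeholder properties.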

HTH,
Damian




On Mon, 27 Mar 2017 at 14:55 Marco Abitabile <marco.abitabile@gmail.com> wrote:

> Hi all,
>
> I'm struggling with an apparently simple problem.
> I'm joining 2 different streams:
>
> Stream1. User activity data, with key, value --> <String, JsonObject>
> Stream2. User location data (such as the city name), with key, value -->
> <String, String>
>
> Keys are homogeneous in content and represent the id of the user's device.
>
> The error thrown is:
> Exception in thread "StreamThread-2" java.lang.ClassCastException:
> com.mytest.JsonObject cannot be cast to java.lang.String
>     at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
>     at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
>     at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>
> This is the code I'm running:
>
> // Other stream: user location, a string with the name of the city the
> // user is in (like "San Francisco")
> KStreamBuilder locationStreamBuilder = new KStreamBuilder();
> KStream<String, String> userLocationStream =
>     locationStreamBuilder.stream(stringSerde, stringSerde, "userLocationStreamData");
> KStream<String, String> locationKstream =
>     userLocationStream.map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
> KafkaStreams userLocationKafkaStream =
>     new KafkaStreams(locationStreamBuilder, propsLocation);
> userLocationKafkaStream.start();
>
> // This stream: user activity
> KStreamBuilder activityStreamBuilder = new KStreamBuilder();
> KStream<String, JsonObject> activity =
>     activityStreamBuilder.stream(stringSerde, jsonSerde, "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
>     .map(MyStreamUtils::enhanceWithScoreDetails)
>     .join(
>         locationKstream,
>         MyStreamUtils::locationActivityJoiner,
>         JoinWindows.of(1000).until(1000 * 60 * 5),
>         stringSerde, jsonSerde, stringSerde)
>     .to("usersWithLocation");
>
> KafkaStreams userActivityStream = new KafkaStreams(builder, propsActivity);
> userActivityStream.start();
>
> And MyStreamUtils::locationActivityJoiner does:
>
> public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
>     JsonObject join = activity.copy();
>     join.put("city", loc);
>     return join;
> }
>
>
> Basically, it seems that locationActivityJoiner receives, as both its left
> and right arguments, elements that belong only to the activity KStream,
> while I was expecting to receive an activity (a JsonObject) and a user
> location (a String).
>
> How is this possible? I can't see what I'm doing wrong.
> Do you have any clue why this is happening?
>
> Thanks a lot for your support and work.
>
> Best
> Marco
>
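For reference, the pairing Marco expected can be modelled in plain Java. The sketch below is a simplified, in-memory approximation of an inner windowed stream-stream join, not Kafka Streams' implementation: two records match when their keys are equal and their timestamps differ by at most the window size (1000 ms here, roughly mirroring JoinWindows.of(1000)), and the joiner always receives the left (activity) value first and the right (location) value second. The class WindowedJoinModel, the record type Rec, the sample device ids, and the use of a Map in place of com.mytest.JsonObject are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class WindowedJoinModel {

    // One stream record: key (device id), value, and event timestamp in ms.
    static final class Rec<V> {
        final String key;
        final V value;
        final long ts;

        Rec(String key, V value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Simplified model of an inner windowed stream-stream join: a left and a
    // right record match when their keys are equal and their timestamps differ
    // by at most windowMs. The joiner gets the LEFT value first, RIGHT second.
    static <L, R, O> List<O> join(List<Rec<L>> left, List<Rec<R>> right,
                                  long windowMs, BiFunction<L, R, O> joiner) {
        List<O> out = new ArrayList<>();
        for (Rec<L> l : left) {
            for (Rec<R> r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(joiner.apply(l.value, r.value));
                }
            }
        }
        return out;
    }

    // Same shape as locationActivityJoiner, with a Map standing in for JsonObject.
    static Map<String, Object> locationActivityJoiner(Map<String, Object> activity, String loc) {
        Map<String, Object> join = new LinkedHashMap<>(activity);
        join.put("city", loc);
        return join;
    }

    static List<Map<String, Object>> sampleJoin() {
        Map<String, Object> act1 = new LinkedHashMap<>();
        act1.put("score", 7);
        Map<String, Object> act2 = new LinkedHashMap<>();
        act2.put("score", 3);

        List<Rec<Map<String, Object>>> activity = new ArrayList<>();
        activity.add(new Rec<>("device-1", act1, 10_000L));
        activity.add(new Rec<>("device-2", act2, 10_000L));

        List<Rec<String>> location = new ArrayList<>();
        location.add(new Rec<>("device-1", "San Francisco", 10_500L)); // 500 ms apart: joins
        location.add(new Rec<>("device-2", "Rome", 12_500L));          // 2500 ms apart: no match

        return join(activity, location, 1000L, WindowedJoinModel::locationActivityJoiner);
    }

    public static void main(String[] args) {
        System.out.println(sampleJoin()); // prints [{score=7, city=San Francisco}]
    }
}
```

Only device-1 produces output, and the joined value carries the activity fields plus the city taken from the location record.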
