kafka-users mailing list archives

From Marco Abitabile <marco.abitab...@gmail.com>
Subject [Kafka Streams] - problem joining 2 different streams
Date Mon, 27 Mar 2017 11:01:48 GMT
Hi all,

I'm struggling with an apparently simple problem.
I'm joining 2 different streams:

Stream1. User activity data, with key, value --> <String, JsonObject>
Stream2. User location data (such as the city name), with key, value --> <String, String>

Keys are homogeneous in content and represent the ID of the user's device.

The error thrown is:
Exception in thread "StreamThread-2" java.lang.ClassCastException: com.mytest.JsonObject cannot be cast to java.lang.String
    at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

This is the code I'm running:

// Other stream: user location, a string with the name of the city the user is in (like "San Francisco")
KStreamBuilder locationStreamBuilder = new KStreamBuilder();
KStream<String, String> userLocationStream =
locationStreamBuilder.stream(stringSerde, stringSerde,
"userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream.
map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");
KafkaStreams userLocationKafkaStream = new
KafkaStreams(locationStreamBuilder, propsLocation);
userLocationKafkaStream.start();

//This Stream: User Activity
KStreamBuilder activityStreamBuilder = new KStreamBuilder();
KStream<String, JsonObject> activity =
activityStreamBuilder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
.map(MyStreamUtils::enhanceWithScoreDetails)
.join(
    locationKstream,
    MyStreamUtils::locationActivityJoiner,
    JoinWindows.of(1000).until(1000 * 60 * 5),
    stringSerde, jsonSerde, stringSerde)
.to("usersWithLocation");

KafkaStreams userActivityStream = new KafkaStreams(builder, propsActivity);
userActivityStream.start();

And MyStreamUtils::locationActivityJoiner does:

public static JsonObject locationActivityJoiner(JsonObject activity, String
loc) {
    JsonObject join = activity.copy();
    join.put("city" , loc);
    return join;
}
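
To make the joiner's expected inputs concrete, here is a minimal, self-contained sketch of the same function outside of Kafka. The JsonObject class below is a hypothetical stand-in (a thin wrapper around a map) for com.mytest.JsonObject, whose real implementation is not shown here, and JoinerSketch is just an illustrative holder class. It shows only the intended contract: the left argument is an activity record, the right argument is a city name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for com.mytest.JsonObject, only so the sketch compiles on its own.
final class JsonObject {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    JsonObject put(String key, Object value) {
        fields.put(key, value);
        return this;
    }

    Object get(String key) {
        return fields.get(key);
    }

    // Shallow copy, so the joiner does not mutate the incoming activity record.
    JsonObject copy() {
        JsonObject c = new JsonObject();
        c.fields.putAll(fields);
        return c;
    }
}

final class JoinerSketch {
    // Same shape as MyStreamUtils::locationActivityJoiner:
    // left = activity (JsonObject), right = location (String).
    static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
        JsonObject join = activity.copy();
        join.put("city", loc);
        return join;
    }

    public static void main(String[] args) {
        JsonObject activity = new JsonObject().put("score", 42);
        JsonObject joined = locationActivityJoiner(activity, "San Francisco");
        System.out.println(joined.get("city"));   // city set on the joined copy
        System.out.println(activity.get("city")); // original activity left untouched
    }
}
```

Called this way, with one value of each type, the joiner behaves as intended; the exception above is thrown before the joiner runs, while the join window store serializes a value.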


Basically, it seems that locationActivityJoiner receives, as both its left and
right arguments, elements that come only from the activity KStream, whereas I
was expecting it to receive an activity element (a JsonObject) and a
userLocation element (a String).

How is this possible? I can't see what I'm doing wrong.
Do you have any clue as to why this is happening?
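
As general Java background, and not a confirmed diagnosis of the topology above, the following sketch shows how a ClassCastException of this kind can surface only at runtime: generic type parameters are erased, so a serializer typed for one class can be handed a value of another class through an unchecked cast, and the failure appears inside the serializer's bridge method. The ValueSerializer interface and the ErasureSketch class are hypothetical and are not part of the Kafka API.

```java
import java.nio.charset.StandardCharsets;

final class ErasureSketch {
    // Hypothetical minimal serializer interface; NOT the Kafka Serializer.
    interface ValueSerializer<T> {
        byte[] serialize(T value);
    }

    // Anonymous class: the compiler adds a bridge method serialize(Object)
    // that casts its argument to String before calling serialize(String).
    static final ValueSerializer<String> STRING_SERIALIZER = new ValueSerializer<String>() {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    };

    // The unchecked cast compiles because generics are erased at runtime;
    // nothing here verifies that the value matches the serializer's type.
    @SuppressWarnings("unchecked")
    static byte[] serializeUnchecked(ValueSerializer<?> serializer, Object value) {
        return ((ValueSerializer<Object>) serializer).serialize(value);
    }

    // Returns true if serializing the value fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            serializeUnchecked(STRING_SERIALIZER, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("San Francisco"));
        System.out.println(failsWithClassCast(new StringBuilder("not a String")));
    }
}
```

The point of the sketch is only that the compiler cannot catch a mismatch between a serde and the values routed to it once the types have been erased, so the error shows up at the first serialization attempt rather than at build time.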

Thanks a lot for your support and work.

Best
Marco
