kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brian Krahmer <bkrah...@krahmer.com>
Subject Problem with multiple joins in one topology
Date Wed, 07 Dec 2016 17:25:46 GMT
Hey guys,

   I'm having a hell of a time here.  I've worked for days trying to get 
this joining pipeline working.  I thought I had it working last week, 
but my jubilation was premature.  The point was to take data in from 
five different topics and merge them together to obtain one enriched 
event (output to compacted topic).  Can anybody spot what I'm doing 
wrong?  The ordering makes no difference.  For example, I've switched 
the locationInput and the vehicleReservedInput inputs in the leftJoin 
calls below, and I get the same results.  The location part of the 
enrichment works while the vehicleReserved part does not.  I can't even 
think of how to restructure the topology without resorting to building 
my own lower-level topology.

thanks,
brian


KTable<String, VehicleFinderData> fleetInput = 
builder.table(Serdes.String(),
                 vehicleFinderDataSerde, FLEET_TOPIC, 
VEHICLE_ENRICHER_FLEET_STORE);
...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne = 
fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
mergeStepOne.print("mergeStepOne");
KTable<String, VehicleFinderData> mergeStepTwo = 
mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
mergeStepTwo.print("mergeStepTwo");
KTable<String, VehicleFinderData> mergeStepThree = 
mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
mergeStepThree.print("mergeStepThree");
KTable<String, VehicleFinderData> mergeStepFour = 
mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)
Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)
Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)
Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)
Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

** New event correctly serialized **

-------------------------------------------------------

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped 
json value}<-null)
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message