kafka-users mailing list archives

From Gaspar Muñoz <gmu...@stratio.com>
Subject Kafka Streams: KStream - KTable Left Join
Date Wed, 04 May 2016 14:43:54 GMT
Hi there,

I am not able to perform a Left Join between a KStream and KTable in Kafka

Exception in thread "main"
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not

My code looks like this:

KStream<String, String> dataStream = builder
        .stream(stringDeserializer, stringDeserializer,

KTable<String, String> aapTable = builder.table("lookup_topic");

KStream<String, String> result = dataStream.leftJoin(aapTable, new

My ValueJoiner simply concatenates two Strings. I think the problem is in map,
where I change the key in order to join by this field. In the internals of
Kafka Streams we can see that the map function sets the source nodes to null:

public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
    String name = topology.newName(MAP_NAME);

    topology.addProcessor(name, new KStreamMap<>(mapper), this.name);

    return new KStreamImpl<>(topology, name, null);
}

After that, the left join precondition calls Set<String>
allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); which
checks whether either stream's source nodes are null. The KStream to which I
applied map has its source nodes set to null, so the exception is thrown:

if (thisSourceNodes == null || otherSourceNodes == null)
    throw new TopologyBuilderException(this.name + " and " +
            other.name + " are not joinable");
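To make the mechanism concrete, here is a minimal, self-contained sketch of the behaviour described above. It is a toy model, not the Kafka Streams code: the class ToyStream, its methods, the node names and the topic name "data_topic" are all made up for illustration, and it throws IllegalStateException as a stand-in for TopologyBuilderException. It only assumes what the snippets above show, namely that map drops the source nodes and that the join check rejects a null set.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model only: ToyStream is a hypothetical stand-in, not a Kafka Streams class.
public class ToyStream {
    public final String name;
    public final Set<String> sourceNodes; // null means "source nodes unknown"

    public ToyStream(String name, Set<String> sourceNodes) {
        this.name = name;
        this.sourceNodes = sourceNodes;
    }

    // filter() cannot change the key, so in this toy model the source nodes carry over.
    public ToyStream filter(String newName) {
        return new ToyStream(newName, sourceNodes);
    }

    // map() may change the key, so the link to the source nodes is dropped (null).
    public ToyStream map(String newName) {
        return new ToyStream(newName, null);
    }

    // Mirrors the precondition quoted above: both sides must know their source nodes.
    public Set<String> ensureJoinableWith(ToyStream other) {
        if (this.sourceNodes == null || other.sourceNodes == null)
            throw new IllegalStateException(this.name + " and " + other.name + " are not joinable");
        Set<String> all = new HashSet<>(this.sourceNodes);
        all.addAll(other.sourceNodes);
        return all;
    }

    public static void main(String[] args) {
        ToyStream data = new ToyStream("KSTREAM-SOURCE", Collections.singleton("data_topic"));
        ToyStream table = new ToyStream("KTABLE-SOURCE", Collections.singleton("lookup_topic"));

        // Without map(), the check passes and returns both source nodes.
        System.out.println(data.filter("KSTREAM-FILTER").ensureJoinableWith(table).size());

        // After map(), even a later filter() inherits the null source nodes, so the check fails.
        try {
            data.map("KSTREAM-MAP").filter("KSTREAM-FILTER").ensureJoinableWith(table);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // KSTREAM-FILTER and KTABLE-SOURCE are not joinable
        }
    }
}
```

In this model the failure is reported against the filter node rather than the map node, because every operator downstream of map inherits the null source nodes. That would be consistent with the exception above naming KSTREAM-FILTER.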

The question is: how can I change the key without losing the parent data, so
that I can perform a join with the KTable afterwards?


Gaspar Muñoz
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
