kafka-users mailing list archives

From Gaspar Muñoz <gmu...@stratio.com>
Subject Kafka Streams: KStream - KTable Left Join
Date Wed, 04 May 2016 14:43:54 GMT
Hi there,

I am not able to perform a left join between a KStream and a KTable in Kafka
Streams. Building the topology fails with:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not joinable
    at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)


My code looks like this:

KStream<String, String> dataStream = builder
        .stream(stringDeserializer, stringDeserializer, conf.getString(INPUT_TOPIC))
        .map(App::keyByAap);

KTable<String, String> aapTable = builder.table("lookup_topic");

KStream<String, String> result = dataStream.leftJoin(aapTable, new ValueJoinerAap());
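
A joiner of the kind passed to leftJoin above could look roughly like the sketch below. The actual ValueJoinerAap is not included in this message, so the null handling is an assumption, and the small ValueJoiner interface is only a local stand-in with the same shape as the Kafka Streams one, declared here so the sketch compiles on its own.

```java
// Local stand-in mirroring the shape of Kafka Streams' ValueJoiner,
// declared here only so the sketch is self-contained.
interface ValueJoiner<V1, V2, R> {
    R apply(V1 value1, V2 value2);
}

// Hypothetical reconstruction; the real ValueJoinerAap is not shown in the message.
final class ValueJoinerAap implements ValueJoiner<String, String, String> {
    @Override
    public String apply(String streamValue, String tableValue) {
        // In a left join the table side is null when the key has no match,
        // so fall back to the stream value alone (assumed behaviour).
        return tableValue == null ? streamValue : streamValue + tableValue;
    }
}
```

With the real library, the stand-in interface would be dropped in favour of org.apache.kafka.streams.kstream.ValueJoiner.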


My ValueJoiner simply concatenates two Strings. I think the problem is in map,
where I change the key so that I can join on that field. In the Kafka Streams
internals we can see that map sets the source nodes to null:

@Override
public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
    String name = topology.newName(MAP_NAME);

    topology.addProcessor(name, new KStreamMap<>(mapper), this.name);

    return new KStreamImpl<>(topology, name, null);
}


After that, in the left join precondition, the call Set<String>
allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); checks
whether either stream's source nodes are null. The KStream to which I
applied map has null source nodes, so the exception is thrown:

if (thisSourceNodes == null || otherSourceNodes == null)
    throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
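
To make the failure easier to follow, here is a small stand-alone model of that bookkeeping. It is not the Kafka Streams code: StreamModel and its methods are hypothetical names, and the sketch reproduces only the two behaviours quoted above (map passing null as the source nodes, and the null check before a join). It also assumes that a key-preserving step simply hands its source nodes on.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the source-node bookkeeping; not Kafka Streams code.
final class StreamModel {
    final String name;
    final Set<String> sourceNodes; // null means "unknown", as after map()

    StreamModel(String name, Set<String> sourceNodes) {
        this.name = name;
        this.sourceNodes = sourceNodes;
    }

    // A stream or table read directly from a topic knows its own source node.
    static StreamModel source(String name) {
        return new StreamModel(name, Collections.singleton(name));
    }

    // Key-preserving step: hands on whatever source nodes it received (assumption).
    StreamModel filter(String newName) {
        return new StreamModel(newName, sourceNodes);
    }

    // Key-changing step: mirrors the quoted map(), which passes null.
    StreamModel map(String newName) {
        return new StreamModel(newName, null);
    }

    // Mirrors the quoted null check that guards the join.
    Set<String> ensureJoinableWith(StreamModel other) {
        if (this.sourceNodes == null || other.sourceNodes == null)
            throw new IllegalStateException(this.name + " and " + other.name + " are not joinable");
        Set<String> all = new HashSet<>(this.sourceNodes);
        all.addAll(other.sourceNodes);
        return all;
    }
}
```

In this model, StreamModel.source("a").ensureJoinableWith(StreamModel.source("b")) returns both source names, while any chain that passes through map(...) fails the check, even if key-preserving steps follow it.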


The question is: how can I change the key without losing the parent (source
node) data, so that I can join with the KTable afterwards?

Thanks.

-- 
Gaspar Muñoz
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
<https://twitter.com/StratioBD>*
