kafka-users mailing list archives

From Gaspar Muñoz <gmu...@stratio.com>
Subject Re: Kafka Streams: KStream - KTable Left Join
Date Thu, 05 May 2016 07:00:25 GMT
Thanks, I'll try it. It would be nice if you could improve the error
message.

2016-05-04 20:13 GMT+02:00 Matthias J. Sax <matthias@confluent.io>:

> +1
>
> I had the same thought and put it on my personal agenda already.
>
> -Matthias
>
> On 05/04/2016 06:37 PM, Jay Kreps wrote:
> > Is it possible to make the error message give more of an explanation?
> >
> > -Jay
> >
> > On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> >> Hi,
> >>
> >> I am still new to Kafka Streams myself, but from my understanding, if
> >> you change the key, your partitioning changes, i.e., it is no longer valid.
> >> Thus, the joins (which assume co-located data) cannot be performed
> >> (this is the reason why the source nodes get set to null). You can write
> >> to an intermediate topic via .through(...) to get a valid partitioning:
> >>
> >> KStream dataStream = builder.stream(...).map(...).through(...);
> >>
> >> Afterward, your join should work.
> >>
> >> -Matthias
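A minimal, dependency-free sketch of the co-partitioning argument above. It does not use the Kafka Streams API: `partitionFor` is a simplified stand-in for Kafka's default partitioner (which hashes the serialized key with murmur2, not `String.hashCode()`), and the keys `k1`..`k4` and join-field values `x`/`y` are hypothetical. It counts how many re-keyed records sit in a partition other than the one that would hold the matching table key, first after a `map()`-style re-key and then after a `through()`-style re-partition.

```java
import java.util.ArrayList;
import java.util.List;

public class CoPartitioningDemo {

    static final int NUM_PARTITIONS = 2;

    // Simplified stand-in for a hash partitioner (assumption: Kafka really uses
    // murmur2 on the serialized key; hashCode() keeps this sketch self-contained).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A record plus the partition it currently lives in.
    record Rec(String key, String joinField, int partition) {}

    // Hypothetical input topic: original key -> value of the field to join on,
    // partitioned by the ORIGINAL key.
    static List<Rec> input() {
        String[][] rows = {{"k1", "x"}, {"k2", "x"}, {"k3", "y"}, {"k4", "y"}};
        List<Rec> out = new ArrayList<>();
        for (String[] row : rows) {
            out.add(new Rec(row[0], row[1], partitionFor(row[0], NUM_PARTITIONS)));
        }
        return out;
    }

    // Like map(): the join field becomes the new key, but every record stays
    // in the partition that was chosen for its old key.
    static List<Rec> rekey(List<Rec> in) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            out.add(new Rec(r.joinField(), r.joinField(), r.partition()));
        }
        return out;
    }

    // Like through(): writing to an intermediate topic and reading it back
    // assigns each record to the partition of its NEW key.
    static List<Rec> repartition(List<Rec> in) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            out.add(new Rec(r.key(), r.joinField(), partitionFor(r.key(), NUM_PARTITIONS)));
        }
        return out;
    }

    // A table keyed by the join field is partitioned by that key, so a
    // per-partition join only sees a match if the stream record is in the same
    // partition. Count the records for which that does not hold.
    static int misplaced(List<Rec> records) {
        int count = 0;
        for (Rec r : records) {
            if (r.partition() != partitionFor(r.key(), NUM_PARTITIONS)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Rec> rekeyed = rekey(input());
        System.out.println("misplaced after map():     " + misplaced(rekeyed));              // 2
        System.out.println("misplaced after through(): " + misplaced(repartition(rekeyed))); // 0
    }
}
```

The sketch assumes the intermediate topic has the same number of partitions as the table's topic; co-partitioning requires that, otherwise the same key can still hash to different partition numbers on the two sides.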
> >>
> >>
> >> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote:
> >>> Hi there,
> >>>
> >>> I am not able to perform a left join between a KStream and a KTable in
> >>> Kafka Streams.
> >>>
> >>> Exception in thread "main"
> >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> >>> building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not
> >>> joinable
> >>>     at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
> >>>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
> >>>
> >>>
> >>> My code looks like this:
> >>>
> >>> KStream<String, String> dataStream = builder
> >>>         .stream(stringDeserializer, stringDeserializer, conf.getString(INPUT_TOPIC))
> >>>         .map(App::keyByAap);
> >>>
> >>> KTable<String, String> aapTable = builder.table("lookup_topic");
> >>>
> >>> KStream<String, String> result = dataStream.leftJoin(aapTable, new ValueJoinerAap());
> >>>
> >>>
> >>> My ValueJoiner simply concatenates two Strings. I think the problem is in
> >>> map, where I change the key in order to join on this field. In the Kafka
> >>> Streams internals we can see that the map function sets the source nodes
> >>> to null:
> >>>
> >>> @Override
> >>> public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
> >>>     String name = topology.newName(MAP_NAME);
> >>>
> >>>     topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
> >>>
> >>>     // third argument (the source nodes) is null
> >>>     return new KStreamImpl<>(topology, name, null);
> >>> }
> >>>
> >>>
> >>> Then, in the left-join precondition
> >>> Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
> >>> the code checks whether either side's source nodes are null. Since the
> >>> KStream to which I applied map has its source nodes set to null, the
> >>> exception is thrown:
> >>>
> >>> if (thisSourceNodes == null || otherSourceNodes == null)
> >>>     throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
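The check quoted above can be illustrated with a small, self-contained model. It is not Kafka Streams code: the `Node` class, its methods, and the use of `IllegalStateException` in place of `TopologyBuilderException` are hypothetical simplifications. In this model `filter` keeps the source nodes, `map` drops them, and `through` starts a fresh source node, so a filter applied after `map` also fails the join check; that is one plausible reading of why the reported error names a `KSTREAM-FILTER` node.

```java
import java.util.HashSet;
import java.util.Set;

public class JoinabilityModel {

    // Hypothetical, simplified stream node: its name plus the set of source
    // nodes it can be traced back to (null = unknown after re-keying).
    static final class Node {
        final String name;
        final Set<String> sourceNodes;

        Node(String name, Set<String> sourceNodes) {
            this.name = name;
            this.sourceNodes = sourceNodes;
        }

        // Like filter(): the key is unchanged, so the source nodes carry over.
        Node filter(String newName) {
            return new Node(newName, sourceNodes);
        }

        // Like map(): the key may change, so the source nodes are dropped.
        Node map(String newName) {
            return new Node(newName, null);
        }

        // Like through(): reading back from an intermediate topic starts a new source node.
        Node through(String newSourceName) {
            return new Node(newSourceName, Set.of(newSourceName));
        }

        // Mirrors the precondition: refuse the join if either side's source
        // nodes are unknown, otherwise return the union of both sets.
        Set<String> ensureJoinableWith(Node other) {
            if (this.sourceNodes == null || other.sourceNodes == null) {
                throw new IllegalStateException(this.name + " and " + other.name + " are not joinable");
            }
            Set<String> all = new HashSet<>(this.sourceNodes);
            all.addAll(other.sourceNodes);
            return all;
        }
    }

    public static void main(String[] args) {
        Node table = new Node("KTABLE-SOURCE", Set.of("KTABLE-SOURCE"));
        Node stream = new Node("KSTREAM-SOURCE", Set.of("KSTREAM-SOURCE"));

        // map() followed by filter(): the filter inherits the null source set.
        Node filtered = stream.map("KSTREAM-MAP").filter("KSTREAM-FILTER");
        try {
            filtered.ensureJoinableWith(table);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // KSTREAM-FILTER and KTABLE-SOURCE are not joinable
        }

        // through() gives the stream a known source again, so the check passes.
        Node repartitioned = filtered.through("KSTREAM-SOURCE-REPARTITIONED");
        System.out.println(repartitioned.ensureJoinableWith(table).size()); // 2
    }
}
```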
> >>>
> >>>
> >>> The question is: how can I change the key without losing the parent data,
> >>> so that I can perform a join with the KTable afterwards?
> >>>
> >>> Thanks.
> >>>
> >>
> >>
> >
>
>


-- 
Gaspar Muñoz
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd <https://twitter.com/StratioBD>
