kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
Date Sun, 09 Oct 2016 07:20:54 GMT
Thanks for pointing this out.
I am now doing exactly this, and it is working fine.

Sachin
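Matthias's advice quoted below works because Kafka Streams pairs up the partitions with the same index from both join inputs, so a given key must sit at the same partition index on both sides. The Java sketch here is a toy model of that idea, not Kafka code. It assumes a stand-in partitioner, (hashCode & 0x7fffffff) % numPartitions, in place of Kafka's real default partitioner (murmur2 over the serialized key bytes), and the names CoPartitionDemo, partitionFor, coPartitioned and repartition are hypothetical. It shows that a key maps to the same index on both sides only when the partition counts match, and that records whose key was changed by map() or selectKey() must be redistributed by the new key, which is the job .through() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of co-partitioning; not Kafka code.
// ASSUMPTION: partitionFor() uses String.hashCode() as a stand-in for Kafka's
// default partitioner (murmur2 over serialized key bytes), so concrete
// partition numbers differ from Kafka's; only "hash mod partition count" matters.
final class CoPartitionDemo {

    // Non-negative hash of the key, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // True if every key maps to the same partition index in both topics.
    static boolean coPartitioned(List<String> keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    // Models a repartition step (the role .through() plays): records are
    // redistributed by their NEW key. Returns partition index -> keys.
    static Map<Integer, List<String>> repartition(List<String> newKeys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : newKeys) {
            byPartition.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                       .add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("alice", "bob", "carol", "dave", "erin");
        System.out.println("4 vs 4 partitions, co-partitioned: " + coPartitioned(keys, 4, 4));
        System.out.println("4 vs 3 partitions, co-partitioned: " + coPartitioned(keys, 4, 3));
        System.out.println("keys by partition (4 partitions): " + repartition(keys, 4));
    }
}
```

With these sample keys the 4-vs-3 check reports false (under this stand-in hash, "dave" lands on index 0 of 4 but index 2 of 3), which is the kind of mismatch co-partitioning rules out. It is also why the topic passed to .through() should be created up front with the same partition count as the other join input.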


On Sun, Oct 9, 2016 at 12:32 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> You must ensure that both streams are co-partitioned (i.e., they have the
> same number of partitions and are partitioned by the join key).
>
> (see the "Note" box:
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#joining-streams)
>
> You can enforce co-partitioning by calling .through() before the join
> (on either one or both input streams). You need to insert .through() for
> an input stream if you (potentially) modified its key, e.g., if you
> applied .selectKey(), map(), or flatMap() before the join.
>
> If one stream's key is not modified, it is sufficient to redistribute
> only the other stream via .through(). Also keep in mind that you should
> manually create the topic used in .through(), with the right number of
> partitions, before you start your application.
>
>
> -Matthias
>
> On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> > I don't think that is the issue.
> > The join API says:
> > public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> > In my case V is Map<String, List<X>>
> > V1 is List<X>
> > R is Map<String, List<X>>
> > K is String
> >
> > Note that the final V and V1 are arrived at after transforming the
> > original streams of type <String, Y>.
> >
> > So there are intermediate steps like
> > stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
> > and
> > table.mapValues(new ValueMapper<X, Map<String, X>>())
> >
> > So whenever I modify the structure of a stream or table, do I need to back
> > it with a new Kafka topic by calling through("new-mapped-topic")?
> >
> > Thanks
> > Sachin
> >
> > On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgainty@hotmail.com>
> > wrote:
> >
> >>
> >>> From: sjmittal@gmail.com
> >>> Date: Sat, 8 Oct 2016 15:30:33 +0530
> >>> Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> >>> To: users@kafka.apache.org
> >>>
> >>> Hi,
> >>> I am getting this exception
> >>>
> >>> Exception in thread "main"
> >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> >>> building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
> >>> not joinable
> >>>
> >> MG> Look at the join declaration in
> >> MG> org.apache.kafka.streams.kstream.internals.KTableImpl.java:
> >> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> >> MG> Method join assumes two collections that exactly match the generic
> >> MG> declaration of the join method.
> >>
> >> MG> KTable<String, Map<String, List<V1>>> != KTable<String, List<V1>>
> >> MG> (the 2nd parameter is missing both the V and R declarators)
> >> MG> You can establish a new collection of KTable<String, List<V1>>
> >>
> >> MG> and then *join* KTable<String, Map<String, List<V1>>> into
> >> MG> KTable<String, List<V1>> through a custom join method.
> >>
> >>> What I am trying to do is aggregate a KStream into a KTable of type
> >>> KTable<String, Map<String, List<V>>>
> >>>
> >>> and join it with another KStream, which is aggregated into
> >>> another KTable of type
> >>>  KTable<String, List<V>>
> >>>
> >>> Since the keys of both final KTables are the same, I don't understand why
> >>> I am getting this exception.
> >>>
> >>> Thanks
> >>> Sachin
> >>
> >>
> >
>
>
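On the type question raised in this thread: in the join signature Sachin quotes, V, V1 and R are separate type parameters, so the two tables do not need identical value types. A generic type mismatch would also surface as a compile error rather than as a runtime TopologyBuilderException. The sketch below models a keyed inner join in plain Java to show that shape. It is a stand-in, not Kafka's implementation: java.util.function.BiFunction plays the role of Kafka's ValueJoiner (which likewise takes the two values and returns the joined result), X is assumed to be String, and the names JoinTypesDemo, innerJoin and JOINER, as well as the "joined" map key, are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a keyed inner join; not Kafka's implementation.
// BiFunction<V, V1, R> stands in for Kafka's ValueJoiner<V, V1, R>.
final class JoinTypesDemo {

    // Hypothetical joiner for V = Map<String, List<X>>, V1 = List<X>,
    // R = Map<String, List<X>>, with X = String (assumption). Storing the
    // right-hand list under the key "joined" is an illustrative choice only.
    static final BiFunction<Map<String, List<String>>, List<String>, Map<String, List<String>>> JOINER =
        (v, v1) -> {
            Map<String, List<String>> merged = new HashMap<>(v); // copy; inputs stay unmodified
            merged.put("joined", new ArrayList<>(v1));
            return merged;
        };

    // Inner join of two keyed "tables": V, V1 and R are independent type
    // parameters, mirroring join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner).
    static <K, V, V1, R> Map<K, R> innerJoin(Map<K, V> left, Map<K, V1> right,
                                             BiFunction<V, V1, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            V1 other = right.get(entry.getKey());
            if (other != null) { // keep only keys present on both sides
                result.put(entry.getKey(), joiner.apply(entry.getValue(), other));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<String>>> left = new LinkedHashMap<>();
        left.put("k1", Map.of("a", List.of("x1")));
        left.put("k2", Map.of("b", List.of("x2")));
        Map<String, List<String>> right = Map.of("k1", List.of("y1", "y2"));

        // Compiles even though the two value types differ.
        Map<String, Map<String, List<String>>> joined = innerJoin(left, right, JOINER);
        System.out.println(joined);
    }
}
```

Only "k1" appears in the result, since "k2" has no match on the right-hand side. That a join like this type-checks is consistent with Matthias's diagnosis that the exception came from how the inputs are partitioned after a key change, not from the value types.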
