kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
Date Sat, 08 Oct 2016 15:54:48 GMT
I don't think that is the issue.
The join api says:
public <V1, R> KTable<K, R> join(KTable<K, V1> other,
                                 ValueJoiner<V, V1, R> joiner)
In my case V is Map<String, List<X>>
V1 is List<X>
R is Map<String, List<X>>
K is String
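
To make the type mapping above concrete, here is a minimal sketch in plain Java with no Kafka dependency. The nested ValueJoiner interface is a hypothetical stand-in with the same shape as the Kafka Streams one, and the "other" slot name and the merge logic are assumptions made purely for illustration. It only shows that V, V1 and R as listed above fit the generic signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinTypesSketch {

    // Hypothetical stand-in mirroring the shape of ValueJoiner<V, V1, R>.
    interface ValueJoiner<V, V1, R> {
        R apply(V value1, V1 value2);
    }

    // V = Map<String, List<X>>, V1 = List<X>, R = Map<String, List<X>>.
    // The joiner copies the left-hand map and stores the right-hand list
    // under the given slot name (an assumed merge rule, not from the thread).
    static <X> ValueJoiner<Map<String, List<X>>, List<X>, Map<String, List<X>>> joiner(String slot) {
        return (grouped, extra) -> {
            Map<String, List<X>> result = new HashMap<>(grouped);
            result.put(slot, new ArrayList<>(extra));
            return result;
        };
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> left = new HashMap<>();
        left.put("a", Arrays.asList(1, 2));
        List<Integer> right = Arrays.asList(3, 4);

        Map<String, List<Integer>> joined =
                JoinTypesSketch.<Integer>joiner("other").apply(left, right);
        System.out.println(joined);
    }
}
```

Since this compiles with exactly those type arguments, the generic declaration by itself does not seem to rule out the join.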

Note that the final V and V1 are arrived at after transforming the original
streams of type <String, Y>.

So there are intermediate steps like
stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
and
table.mapValues(new ValueMapper<X, Map<String, X>>())

So whenever I modify the structure of a stream or table, do I need to back
it with a new Kafka topic by calling through("new-mapped-topic")?

Thanks
Sachin




On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgainty@hotmail.com> wrote:

>
>
>
> > From: sjmittal@gmail.com
> > Date: Sat, 8 Oct 2016 15:30:33 +0530
> > Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> > To: users@kafka.apache.org
> >
> > Hi,
> > I am getting this exception
> >
> > Exception in thread "main"
> > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> > building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
> > not joinable
> >
> MG>look at join declaration for org.apache.kafka.streams.kstream.internals.KTableImpl.java
> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> MG>method join assumes 2 collections that exactly match the generic
> declaration of join method
>
> MG>KTable<String, Map<String, List<V1>>> !=  KTable<String, List<V1>> (2nd
> parameter is missing both V and R declarators)
> MG>you can establish a new collection of KTable<String, List<V1>>
>
> MG>and then *join* KTable<String, Map<String, List<V1>>>  into
> KTable<String, List<V1>>  thru custom join method
>
> > What I am trying to do is I aggregate a KStream into a KTable of type
> > KTable<String, Map<String, List<V>>>
> >
> > and I am trying to join it to another KStream which is aggregated into
> > another KTable of type
> >  KTable<String, List<V>>
> >
> > Since keys of both the final KTable are same, I don't understand why it is
> > giving this exception.
> >
> > Thanks
> > Sachin
>
>
