kafka-users mailing list archives

From Artur Mrozowski <art...@gmail.com>
Subject Re: Joins in Kafka Streams and partitioning of the topics
Date Thu, 30 Nov 2017 22:26:09 GMT
Yes, you are probably right. I was inspired by the KIP 150 blog post, so
the entire statement would be like this:

KTable<Integer, CustomerList> customerGrouped =
        kStreamBuilder.stream(stringSerde, customerMessageSerde, CUSTOMER_TOPIC)
                .groupBy((key, value) ->
                                Integer.parseInt(value.customer.replaceFirst("cust", "")),
                        integerSerde, customerMessageSerde)
                .aggregate(CustomerList::new, (ckey, custMessage, customerList) -> {
                    customerList.lst.add(custMessage);
                    return customerList;
                }, customerListSerde, CUSTOMER_STORE);
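
To make the intent of that statement concrete, here is a minimal sketch in plain Java, without the Kafka Streams dependency, of what the groupBy and aggregate steps compute: the key is derived from the customer field and every message is appended to the list kept for that key. The class name, the use of String payloads in place of CustomerMessage, and the in-memory map standing in for the state store are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the groupBy + aggregate step; this is not Kafka Streams code.
class HistoryAggregationSketch {

    // Mirrors the key selector in the statement above: "cust42" becomes 42.
    static int customerKey(String customer) {
        return Integer.parseInt(customer.replaceFirst("cust", ""));
    }

    // Groups messages by the derived key and appends each payload to that key's list,
    // the way the aggregator adds custMessage to customerList.lst.
    // Each message is a two-element array: {customer field, payload}.
    static Map<Integer, List<String>> aggregate(List<String[]> messages) {
        Map<Integer, List<String>> store = new LinkedHashMap<>();
        for (String[] message : messages) {
            String customer = message[0];
            String payload = message[1];
            store.computeIfAbsent(customerKey(customer), k -> new ArrayList<>()).add(payload);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        messages.add(new String[] {"cust1", "address changed"});
        messages.add(new String[] {"cust2", "created"});
        messages.add(new String[] {"cust1", "phone changed"});
        // Each key ends up with the full history of its messages, in arrival order.
        System.out.println(aggregate(messages));
    }
}
```

Because nothing is ever removed from the list, the value for a key grows with every record, which is the growth concern raised in the quoted reply below.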


The second oddity is because I want to gather history, all the previous
records. It is exactly the same logic as the shopping cart, purchases and
wishList described in KIP 150. The result of the joins will contain all
the history for a particular key.
You mention repartitioning. I also have a feeling that GlobalKTables are more
suitable for lookups and not for what I am trying to do here.
How do I repartition the topics? Is it by using keyed messages or are there
other alternatives? I've noticed that when I run two instances of the
application, each one has only half of the records posted to the
4-partition topic.
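
The "half of the records" observation can be illustrated with a small plain-Java sketch of how a key decides the partition and how partitions are divided between instances. This is a sketch under stated assumptions, not Kafka's implementation: String.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, and the modulo split of partitions over instances is a hypothetical stand-in for the assignment that the consumer group protocol actually performs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of keyed partitioning; this is not Kafka code.
class PartitioningSketch {

    // Stand-in for the producer's default partitioner. Kafka hashes the serialized
    // key with murmur2; String.hashCode is used here only for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical even split of partitions over instances. The real assignment is
    // decided by the consumer group protocol, not by this formula.
    static List<Integer> partitionsOwnedBy(int instance, int numInstances, int numPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numInstances == instance) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int numInstances = 2;

        // With two instances and four partitions, each instance reads two partitions,
        // so it sees only the records written to those two.
        for (int i = 0; i < numInstances; i++) {
            System.out.println("instance " + i + " owns " + partitionsOwnedBy(i, numInstances, numPartitions));
        }

        // A keyed record always maps to the same partition number, so every topic with
        // the same partition count and the same keying places a given key at the same index.
        for (String key : new String[] {"cust1", "cust2", "cust7"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point of the sketch is that keying, not the instance count, determines whether all records for one customer meet in one place. In Kafka Streams, a groupBy that changes the key writes the records through an internal repartition topic keyed by the new key, which is what brings them together again.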

I'd be happy to join the slack, do I need an invite?

/Artur

On Thu, Nov 30, 2017 at 9:30 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

> There are some oddities in your topology that make me wonder if they
> are the true drivers of your question.
>
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L300
> Feels like it should be a KTable to begin with, for example; otherwise it is
> not clear how big this is supposed to grow.
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L325
> Same thing for policies. GlobalKTable might be brought in later, as some sort
> of performance optimisation, if you bloat from too many repartitions, but
> my opinion of it is not too high.
>
>
> Hope that helps, just keep the questions coming. Also check if you might
> want to join confluentcommunity on Slack.
> Could never imagine that something like an insurance can really be modelled
> as 4 streams ;)
>
> Best Jan
>
> On 30.11.2017 21:07, Artur Mrozowski wrote:
>
>> What if I start two instances of that application? Does the state migrate
>> between the applications? Is that when I have to use a global table?
>>
>> BR
>> Artur
>>
>> On Thu, Nov 30, 2017 at 7:40 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Haven't checked your code, but from what you describe you should be fine.
>>> Upgrading the version might help here and there, but it should still work
>>> with 0.10, I guess.
>>>
>>> Best Jan
>>>
>>>
>>>
>>> On 30.11.2017 19:16, Artur Mrozowski wrote:
>>>
>>>> Thank you Damian, it was very helpful.
>>>> I have implemented my solution in version 0.11.0.2 but there is one thing
>>>> I still wonder about.
>>>> What I am trying to do is what is described in KIP 150. Since it didn't
>>>> make it into the 1.0 release, I do it the old-fashioned way.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>> First
>>>> KTable<K, V1> table1 =
>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
>>>> aggregator1, aggValueSerde1, storeName1);
>>>>
>>>>
>>>> for all the four topics and then I join the results.
>>>> And here is the thing: the topics are partitioned and I don't use global
>>>> tables, nor keyed messages, and it seems to work fine.
>>>>
>>>> From Confluent's documentation one could get the impression that when
>>>> reading from partitioned topics you need to use global tables. But is it
>>>> really necessary in this case?
>>>> And if not, then why?
>>>>
>>>> Thanks again
>>>> Artur
>>>>
>>>> Here is the link to my implementation
>>>>
>>>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java
>>>>
>>>> On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy <damian.guy@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Artur,
>>>>>
>>>>> Kafka Streams 0.10.0.0 is quite old and a lot has changed and been fixed
>>>>> since then. If possible I'd recommend upgrading to at least 0.11.0.2 or
>>>>> 1.0.
>>>>> For joins you need to ensure that the topics have the same number of
>>>>> partitions (which they do) and that they are keyed the same.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski <artmro@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am joining 4 different topics with 4 partitions each using the 0.10.0.0
>>>>>> version of Kafka Streams. The joins are KTable to KTable. Is there
>>>>>> anything I should be aware of regarding partitions or the version of
>>>>>> Kafka Streams? In other words, should I be expecting consistent results
>>>>>> or do I need to, for example, use global tables?
>>>>>>
>>>>>> I'd like to run that application on Kubernetes later on. Should I think
>>>>>> of anything, or do different instances of the same Kafka Streams
>>>>>> application take care of management of the state?
>>>>>>
>>>>>> Grateful for any thoughts or a piece of advice
>>>>>>
>>>>>> Best Regards
>>>>>> /Artur
>>>>>>
>>>>>>
>>>>>>
>
