kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: kafka stream to new topic based on message key
Date Tue, 04 Oct 2016 22:34:51 GMT
Hello Gary,

This is also doable in the Processor API: you can use the record collector
from the ProcessorContext to send data to arbitrary topics, e.g.:

// context is the ProcessorContext passed to Processor#init()
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
collector.send(new ProducerRecord<>(topic, ...), keySerializer,
               valSerializer, partitioner);

But note that if the new topic, e.g. "123456_lv2", has not been created,
then the send call will throw an exception unless the broker-side config
"auto.create.topics.enable" is set to true. Even in that case, the topic
will be auto-created with the broker's default settings (for example the
default num.partitions), i.e. you cannot control which configs the topic
is created with, such as compaction policy, number of partitions, segment
sizes, etc. If that works for you then I think it should be fine.
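
For the example message in your question, deriving the target topic name and
the JSON payload could be sketched roughly as below. This is only an
illustration: the class KeyedMessage and its methods are hypothetical helpers
(not part of Kafka), and it assumes the whole "123456#{...}" string arrives as
a single value with "#" separating key and payload.

```java
import java.util.Objects;

// Hypothetical helper, not part of Kafka: splits "<key>#<json>" and
// derives the target topic name from the key.
final class KeyedMessage {
    final String key;
    final String payload;

    private KeyedMessage(String key, String payload) {
        this.key = key;
        this.payload = payload;
    }

    // Split on the first '#' only, so a '#' inside the JSON payload is kept.
    static KeyedMessage parse(String raw) {
        Objects.requireNonNull(raw, "raw");
        int idx = raw.indexOf('#');
        if (idx <= 0) {
            throw new IllegalArgumentException("expected <key>#<json>, got: " + raw);
        }
        return new KeyedMessage(raw.substring(0, idx), raw.substring(idx + 1));
    }

    // Naming convention taken from the question: "<key>_lv2".
    String targetTopic() {
        return key + "_lv2";
    }
}
```

The topic name from targetTopic() and the payload would then go into the
ProducerRecord handed to collector.send.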


On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gogden@gmail.com> wrote:

> Is it possible, in a kafka streaming job, to write to another topic based
> on the key in the messages?
> For example, say the message is:
> 123456#{"id":56789,"type":1}
> where the key is 123456, # is the delimiter, and the {} is the json data.
> And I want to push the json data to another topic that will have the name
> 123456_lv2.
> Is this possible with kafka streaming?

-- Guozhang
