kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: kafka stream to new topic based on message key
Date Fri, 07 Oct 2016 06:12:09 GMT
If you can create a ZK client inside your processor implementation, then you
can definitely create any topics by talking to ZK directly; it's just that
the Kafka Streams public interface does not expose any efficient way beyond
that for now.

Note that in KIP-4 we are trying to introduce an admin client for tasks such
as creating / deleting topics. The corresponding requests have been added in
the upcoming 0.10.1.0 release, but the full implementation is yet to be
completed.
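
Whichever route is used to create the topics, the per-key topic name from the
original question has to be derived first, and it has to be a legal topic
name. A minimal sketch in plain Java with no Kafka dependency: it assumes the
key#json message layout and the "_lv2" suffix from the example in this thread,
and the TopicRouter and Routed names are made up for illustration; they are
not part of any Kafka API.

```java
import java.util.regex.Pattern;

// Hypothetical helper: splits "key#json" and derives the "<key>_lv2" topic name.
final class TopicRouter {
    // Kafka topic names may contain only ASCII alphanumerics, '.', '_' and '-',
    // and are limited to 249 characters. ("." and ".." are also illegal, but a
    // name ending in "_lv2" can never be either of them.)
    private static final Pattern LEGAL_TOPIC = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    // Assumed suffix, taken from the "123456_lv2" example in this thread.
    private static final String SUFFIX = "_lv2";

    /** Derived topic name plus the JSON payload that should be sent to it. */
    static final class Routed {
        final String topic;
        final String payload;

        Routed(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    static Routed route(String message) {
        // Split on the first '#' only, so a '#' inside the JSON is left alone.
        int sep = message.indexOf('#');
        if (sep <= 0) {
            throw new IllegalArgumentException("no key before '#': " + message);
        }
        String key = message.substring(0, sep);
        String payload = message.substring(sep + 1);
        String topic = key + SUFFIX;
        if (!LEGAL_TOPIC.matcher(topic).matches()) {
            throw new IllegalArgumentException("illegal topic name: " + topic);
        }
        return new Routed(topic, payload);
    }

    public static void main(String[] args) {
        Routed r = route("123456#{\"id\":56789,\"type\":1}");
        // prints: 123456_lv2 <- {"id":56789,"type":1}
        System.out.println(r.topic + " <- " + r.payload);
    }
}
```

Validating the name up front means a malformed key fails in your own code
rather than at topic creation or at send time.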


Guozhang


On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden <gogden@gmail.com> wrote:

> Thanks Guozhang. I've gotten an example to work using your tips.
>
> So, is there no other way in streams to create a topic if
> "auto.create.topics.enable"
> is set to false?  Maybe by creating a record in zookeeper for that topic?
>
>
>
> On 5 October 2016 at 17:20, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello Gary,
> >
> >
> > 1. The InternalTopicManager is only used by the Streams-instantiated
> > PartitionAssignor to create internal topics for auto-repartitioning and
> > changelog.
> >
> > 2. About "RecordCollector.Supplier": you are right, and as I wrote in the
> > earlier email you have to force-cast it to RecordCollector.Supplier;
> > theoretically this is not safe, but the internal Impl is always used.
> >
> >
> > If you know beforehand all the possible topics that you would want to
> > send to based on the key-value pair, you can then use KStream.branch() to
> > branch the source stream into multiple ones based on the content, with
> > each branched stream sent to a different topic.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gogden@gmail.com> wrote:
> >
> > > Guozhang. I was just looking at the source for this, and it looks like
> > > the RecordCollector.Supplier is part of the internal
> > > ProcessorContextImpl class.  I don't think that's exposed to me, is it?
> > >
> > > If I create a processor class that extends AbstractProcessor, it only
> > > has access to the ProcessorContext interface, which doesn't expose the
> > > Supplier.
> > >
> > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > >
> > > > What if we were to use kafka connect instead of streams? Does it have
> > > > the ability to specify partitions, rf, segment size etc?
> > > >
> > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > >
> > > >> Thanks Guozhang.
> > > >>
> > > > >> So there's no way we could also use InternalTopicManager to specify
> > > > >> the number of partitions and RF?
> > > >>
> > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > > >>
> > > > >>> On 4 October 2016 at 19:34, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >>
> > > >>> Hello Gary,
> > > >>>
> > > > >>> This is also doable in the Processor API: you can use the record
> > > > >>> collector from ProcessorContext to send data to arbitrary topics,
> > > > >>> i.e.:
> > > > >>>
> > > > >>> RecordCollector collector =
> > > > >>>     ((RecordCollector.Supplier) context).recordCollector();
> > > > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> > > > >>>     valSerializer, partitioner);
> > > >>>
> > > >>>
> > > > >>> But note that if the new topic, e.g. "123456_lv2", has not been
> > > > >>> created, then the send call will throw an exception unless the
> > > > >>> broker-side config "auto.create.topics.enable" is set to true; and
> > > > >>> even in this case, the topic will be auto-created with the
> > > > >>> pre-defined number of partitions, i.e. you cannot control which
> > > > >>> configs the topics are created with, such as compaction policy,
> > > > >>> num.partitions, segment sizes, etc.
> > > > >>> If that works for you then I think it should be fine.
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>>
> > > > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gogden@gmail.com> wrote:
> > > >>>
> > > > >>> > Is it possible, in a kafka streaming job, to write to another
> > > > >>> > topic based on the key in the messages?
> > > >>> >
> > > >>> > For example, say the message is:
> > > >>> >
> > > >>> > 123456#{"id":56789,"type":1}
> > > >>> >
> > > > >>> > where the key is 123456, # is the delimiter, and the {} is the
> > > > >>> > json data.
> > > >>> >
> > > > >>> > And I want to push the json data to another topic that will have
> > > > >>> > the name 123456_lv2.
> > > >>> >
> > > >>> > Is this possible with kafka streaming?
> > > >>> >
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
