kafka-users mailing list archives

From Michael Noll <mich...@confluent.io>
Subject Re: kafka stream to new topic based on message key
Date Fri, 07 Oct 2016 09:56:04 GMT
Gary,

adding to what Guozhang said:  Yes, you can programmatically create a new
Kafka topic from within your application.  But how you'd do that varies a
bit between current Kafka versions and the upcoming 0.10.1 release.

As of today (Kafka versions before the upcoming 0.10.1 release), you would
need to create your topic manually.  This can be done on the CLI (which
doesn't help you in your scenario) or programmatically.  Right now, doing it
programmatically means talking directly to ZooKeeper, e.g. via zkclient.  If
you are looking for an example, the code at [1] may be helpful.  That code
creates a topic in ZK by using zkclient and Kafka's `AdminUtils`.
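To make that concrete, here is a rough, untested sketch of the ZooKeeper-based
approach in Java, assuming the 0.10.0-era `ZkUtils`/`AdminUtils` APIs; the
ZooKeeper address, topic name, partition/replication counts, and topic configs
below are all made-up example values:

```java
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class TopicCreator {

    // Topic-level configs to apply at creation time (example values only).
    public static Properties topicConfig() {
        Properties config = new Properties();
        config.put("cleanup.policy", "delete");
        config.put("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)); // 7 days
        return config;
    }

    public static void createTopic(String zkConnect, String topic,
                                   int partitions, int replication) {
        // AdminUtils writes the topic metadata directly into ZooKeeper;
        // the brokers then pick it up and create the partitions.
        ZkUtils zkUtils = ZkUtils.apply(zkConnect, 30_000, 30_000, false);
        try {
            AdminUtils.createTopic(zkUtils, topic, partitions, replication,
                    topicConfig(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();
        }
    }

    public static void main(String[] args) {
        createTopic("localhost:2181", "123456_lv2", 4, 1);
    }
}
```

The `RackAwareMode` argument only matters on rack-aware clusters; this is
essentially the same AdminUtils call that the code at [1] makes.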

Looking ahead, Kafka's upcoming 0.10.1 release introduces an admin client
for creating/deleting topics (this new functionality is part of the
not-yet-fully-completed work on KIP-4).  This would give you a new
programmatic approach to create a topic without having to communicate with
ZooKeeper directly.

Lastly, keep in mind that it takes a while for a Kafka topic to be created,
so you may run into race-condition-like situations.  You may therefore want
to double-check that the newly created topic is actually ready to use before
you begin writing to it or reading from it.
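One way to handle that is to poll for readiness with a timeout.  Here is a
minimal generic sketch in plain Java; for Kafka you would pass in a check
such as `() -> AdminUtils.topicExists(zkUtils, topic)` (and note that mere
existence in ZK does not yet guarantee that partition leaders have been
elected):

```java
import java.util.function.BooleanSupplier;

public class TopicReadiness {

    // Poll `check` until it returns true or `timeoutMs` elapses.
    // Returns true if the condition was met within the timeout.
    public static boolean waitUntil(BooleanSupplier check, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (check.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMs); // back off briefly between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

For example, `TopicReadiness.waitUntil(() -> AdminUtils.topicExists(zkUtils,
"123456_lv2"), 30_000, 100)` before the first write.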

Hope this helps!
Michael




[1]
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160

On Fri, Oct 7, 2016 at 8:12 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> If you can create a ZK client inside your processor implementation then you
> can definitely create topics by talking to ZK directly; it's just that the
> Kafka Streams public interface does not expose any more convenient way for
> now.
>
> Note that in KIP-4 we are trying to introduce an admin client for tasks
> such as creating / deleting topics; the corresponding requests have been
> added in the upcoming 0.10.1.0 release, but the full implementation is yet
> to be completed.
>
>
> Guozhang
>
>
> On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden <gogden@gmail.com> wrote:
>
> > Thanks Guozhang. I've gotten an example to work using your tips.
> >
> > So, is there no other way in Streams to create a topic if
> > "auto.create.topics.enable" is set to false?  Maybe by creating a record
> > in ZooKeeper for that topic?
> >
> >
> >
> > On 5 October 2016 at 17:20, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Hello Gary,
> > >
> > >
> > > 1. The InternalTopicManager is only used by the Streams-instantiated
> > > PartitionAssignor to create internal topics for auto-repartitioning and
> > > changelog.
> > >
> > > 2. About "RecordCollector.Supplier": you are right, and as I wrote in
> > > the above email you have to force-cast it to RecordCollector.Supplier;
> > > theoretically this is not safe, but the internal Impl is always used.
> > >
> > >
> > > If you know beforehand all the possible topics that you would want to
> > > send to based on the key-value pair, you can then use KStream.branch()
> > > to split the source stream into multiple ones based on the content,
> > > with each branched stream writing to a different topic.
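A hedged sketch of that branch() pattern, assuming the 0.10.0 Streams API;
the source topic, key prefixes, and destination topic names here are
invented for illustration:

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class BranchExample {

    // Derive the destination topic from the record key, e.g. "123456" -> "123456_lv2".
    public static String targetTopic(String key) {
        return key + "_lv2";
    }

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("events");

        // branch() routes each record to the first predicate that matches;
        // records matching no predicate are dropped.
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches = source.branch(
                (key, value) -> key.startsWith("123456"),
                (key, value) -> key.startsWith("789"));

        branches[0].to(targetTopic("123456"));
        branches[1].to(targetTopic("789"));
    }
}
```

The caveat discussed in this thread still applies: each destination topic
must already exist unless broker-side auto-creation is enabled.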
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gogden@gmail.com> wrote:
> > >
> > > > Guozhang. I was just looking at the source for this, and it looks
> > > > like the RecordCollector.Supplier is part of the internal
> > > > ProcessorContextImpl class.  I don't think that's exposed to me, is it?
> > > >
> > > > If I create a processor class that extends AbstractProcessor, it only
> > > > has access to the ProcessorContext interface, which doesn't expose the
> > > > Supplier.
> > > >
> > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > >
> > > > > What if we were to use Kafka Connect instead of Streams? Does it
> > > > > have the ability to specify partitions, RF, segment size, etc.?
> > > > >
> > > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > > >
> > > > >> Thanks Guozhang.
> > > > >>
> > > > >> So there's no way we could also use InternalTopicManager to
> > > > >> specify the number of partitions and RF?
> > > > >>
> > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > > > >>
> > > > >> On 4 October 2016 at 19:34, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > >>
> > > > >>> Hello Gary,
> > > > >>>
> > > > >>> This is also doable in the Processor API: you can use the record
> > > > >>> collector from ProcessorContext to send data to arbitrary topics,
> > > > >>> i.e.:
> > > > >>>
> > > > >>> RecordCollector collector = ((RecordCollector.Supplier)
> > > > >>> context).recordCollector();
> > > > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> > > > >>> valSerializer, partitioner);
> > > > >>>
> > > > >>>
> > > > >>> But note that if the new topic, e.g. "123456_lv2", is not created,
> > > > >>> then the send call will throw an exception unless the broker-side
> > > > >>> config "auto.create.topics.enable" is set to true; and even in
> > > > >>> this case, the topic will be auto-created with the pre-defined
> > > > >>> number of partitions, i.e. you cannot control how the topics are
> > > > >>> created or with what configs such as compaction policy,
> > > > >>> num.partitions, segment sizes, etc.  If that works for you then I
> > > > >>> think it should be fine.
> > > > >>>
> > > > >>>
> > > > >>> Guozhang
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gogden@gmail.com> wrote:
> > > > >>>
> > > > >>> > Is it possible, in a Kafka Streams job, to write to another
> > > > >>> > topic based on the key in the messages?
> > > > >>> >
> > > > >>> > For example, say the message is:
> > > > >>> >
> > > > >>> > 123456#{"id":56789,"type":1}
> > > > >>> >
> > > > >>> > where the key is 123456, # is the delimiter, and the {} is the
> > > > >>> > json data.
> > > > >>> >
> > > > >>> > And I want to push the json data to another topic that will
> > > > >>> > have the name 123456_lv2.
> > > > >>> >
> > > > >>> > Is this possible with Kafka Streams?
> > > > >>> >
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> -- Guozhang
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
