kafka-users mailing list archives

From Gary Ogden <gog...@gmail.com>
Subject Re: kafka stream to new topic based on message key
Date Fri, 07 Oct 2016 13:30:50 GMT
Thanks for all the help gents. I really appreciate it. It's exactly what I
needed.

On 7 October 2016 at 06:56, Michael Noll <michael@confluent.io> wrote:

> Gary,
>
> adding to what Guozhang said:  Yes, you can programmatically create a new
> Kafka topic from within your application.  But how you'd do that varies a
> bit between current Kafka versions and the upcoming 0.10.1 release.
>
> As of today (Kafka versions before the upcoming 0.10.1 release), you would
> need to create your topic manually.  This can be on the CLI (which doesn't
> help you in your scenario) or programmatically.  Right now programmatically
> means you must directly talk to ZooKeeper, e.g. via zkclient.  If you are
> looking for an example, the code at [1] may be helpful.  That code creates
> a topic in ZK by using zkclient and Kafka's `AdminUtils`.
>
> Looking ahead, Kafka's upcoming 0.10.1 release introduces an admin client
> for creating/deleting topics (this new functionality is part of the
> not-yet-fully-completed work on KIP-4).  This would give you a new
> programmatic approach to create a topic without having to communicate with
> ZooKeeper directly.
>
> Lastly, keep in mind that it takes a while for a Kafka topic to be
> created, so you may run into race-condition-like situations.  You may
> therefore want to double-check that the newly created topic is actually
> ready to use before beginning to write to it or read from it.
>
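The readiness check described above could be wrapped in a small polling helper. This is only a sketch using the Java standard library, not Kafka API: `TopicReadiness`, `awaitReady`, and the `isReady` callback are hypothetical names, and how the callback decides that a topic is ready (for example, by asking a client for the topic's metadata) is left to the caller.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class TopicReadiness {
    private TopicReadiness() {}

    /**
     * Polls isReady until it returns true or the timeout elapses.
     * Returns true if the check succeeded in time, false on timeout
     * or if the waiting thread was interrupted.
     */
    static boolean awaitReady(BooleanSupplier isReady, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (isReady.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: topic never became visible
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }
}
```

A caller would create the topic, call `awaitReady(...)` with its own check, and only start producing or consuming once it returns true.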
> Hope this helps!
> Michael
>
>
>
>
> [1]
> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
>
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 8:12 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > If you can create a ZK client inside your processor implementation, then
> > you can definitely create any topics by talking to ZK directly; it's just
> > that the Kafka Streams public interface does not expose any efficient way
> > beyond that for now.
> >
> > Note that in KIP-4 we are trying to introduce an admin client for tasks
> > such as creating / deleting topics. The corresponding requests have been
> > added in the upcoming 0.10.1.0 release, but the full implementation is
> > yet to be completed.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden <gogden@gmail.com> wrote:
> >
> > > Thanks Guozhang. I've gotten an example to work using your tips.
> > >
> > > So, is there no other way in Streams to create a topic if
> > > "auto.create.topics.enable" is set to false?  Maybe by creating a
> > > record in ZooKeeper for that topic?
> > >
> > >
> > >
> > > On 5 October 2016 at 17:20, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Hello Gary,
> > > >
> > > >
> > > > 1. The InternalTopicManager is only used by the Streams-instantiated
> > > > PartitionAssignor to create internal topics for auto-repartitioning
> > > > and changelog.
> > > >
> > > > 2. About "RecordCollector.Supplier": you are right, and as I wrote in
> > > > the email above, you have to force-cast the context to
> > > > RecordCollector.Supplier. Theoretically this is not safe, but the
> > > > internal Impl is always used.
> > > >
> > > >
> > > > If you know beforehand all the possible topics that you would want to
> > > > send to based on the key-value pair, you can use KStream.branch() to
> > > > branch the source stream into multiple streams based on the content,
> > > > and send each branched stream to a different topic.
> > > >
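To make the branching idea concrete: `KStream.branch()` evaluates its predicates in order, places each record in the stream of the first predicate that matches, and drops records that match none. The sketch below models only that first-match rule with the Java standard library; it does not use Kafka Streams, and `FirstMatchRouter`, `route`, `topicFor`, and the topic name `other_lv2` are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class FirstMatchRouter {
    // Topic name -> predicate on (key, value). LinkedHashMap keeps insertion
    // order, which is the order in which predicates are evaluated.
    private final Map<String, BiPredicate<String, String>> routes = new LinkedHashMap<>();

    FirstMatchRouter route(String topic, BiPredicate<String, String> predicate) {
        routes.put(topic, predicate);
        return this;
    }

    /** Returns the topic of the first matching predicate, or null if none match. */
    String topicFor(String key, String value) {
        for (Map.Entry<String, BiPredicate<String, String>> entry : routes.entrySet()) {
            if (entry.getValue().test(key, value)) {
                return entry.getKey();
            }
        }
        return null; // no predicate matched: the record would be dropped
    }
}
```

For example, `new FirstMatchRouter().route("123456_lv2", (k, v) -> k.equals("123456")).route("other_lv2", (k, v) -> true)` sends key 123456 to `123456_lv2` and everything else to the catch-all topic. This only works when the set of target topics is known up front, which is the limitation noted above.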
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gogden@gmail.com> wrote:
> > > >
> > > > > Guozhang. I was just looking at the source for this, and it looks
> > > > > like the RecordCollector.Supplier is part of the internal
> > > > > ProcessorContextImpl class.  I don't think that's exposed to me, is it?
> > > > >
> > > > > If I create a processor class that extends AbstractProcessor, it only
> > > > > has access to the ProcessorContext interface, which doesn't expose the
> > > > > Supplier.
> > > > >
> > > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > > >
> > > > > > What if we were to use Kafka Connect instead of Streams? Does it
> > > > > > have the ability to specify partitions, RF, segment size, etc.?
> > > > > >
> > > > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > > > >
> > > > > >> Thanks Guozhang.
> > > > > >>
> > > > > >> So there's no way we could also use InternalTopicManager to
> > > > > >> specify the number of partitions and RF?
> > > > > >>
> > > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > > > > >>
> > > > > >> On 4 October 2016 at 19:34, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hello Gary,
> > > > > >>>
> > > > > >>> This is also doable in the Processor API: you can use the record
> > > > > >>> collector from ProcessorContext to send data to arbitrary topics,
> > > > > >>> e.g.:
> > > > > >>>
> > > > > >>> RecordCollector collector =
> > > > > >>>     ((RecordCollector.Supplier) context).recordCollector();
> > > > > >>> collector.send(new ProducerRecord<>(topic, ...),
> > > > > >>>     keySerializer, valSerializer, partitioner);
> > > > > >>>
> > > > > >>>
> > > > > >>> But note that if the new topic, e.g. "123456_lv2", has not been
> > > > > >>> created, then the send call will throw an exception unless the
> > > > > >>> broker-side config "auto.create.topics.enable" is set to true. Even
> > > > > >>> in that case, the topic will be auto-created with the pre-defined
> > > > > >>> number of partitions, i.e. you cannot control how the topic is
> > > > > >>> created or with which configs (compaction policy, num.partitions,
> > > > > >>> segment sizes, etc.).
> > > > > >>> If that works for you then I think it should be fine.
> > > > > >>>
> > > > > >>>
> > > > > >>> Guozhang
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gogden@gmail.com> wrote:
> > > > > >>>
> > > > > >>> > Is it possible, in a kafka streaming job, to write to another
> > > > > >>> > topic based on the key in the messages?
> > > > > >>> >
> > > > > >>> > For example, say the message is:
> > > > > >>> >
> > > > > >>> > 123456#{"id":56789,"type":1}
> > > > > >>> >
> > > > > >>> > where the key is 123456, # is the delimiter, and the {} is the
> > > > > >>> > JSON data.
> > > > > >>> >
> > > > > >>> > And I want to push the JSON data to another topic that will have
> > > > > >>> > the name 123456_lv2.
> > > > > >>> >
> > > > > >>> > Is this possible with kafka streaming?
> > > > > >>> >
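For the message format in the question, the key/payload split and the target topic name could be derived roughly as follows. This is a minimal stdlib-only sketch, assuming the first `#` is always the delimiter; `KeyedMessage`, `parse`, and `targetTopic` are hypothetical names, not part of Kafka. The name check reflects Kafka's topic naming rules (ASCII letters, digits, `.`, `_`, `-`, at most 249 characters) and is only a basic guard.

```java
import java.util.regex.Pattern;

final class KeyedMessage {
    // Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL_TOPIC_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_TOPIC_LENGTH = 249;
    private static final String TOPIC_SUFFIX = "_lv2"; // suffix taken from the question

    final String key;
    final String json;

    private KeyedMessage(String key, String json) {
        this.key = key;
        this.json = json;
    }

    /** Splits "<key>#<json>" on the first '#', so a '#' inside the JSON is preserved. */
    static KeyedMessage parse(String raw) {
        int idx = raw.indexOf('#');
        if (idx <= 0) {
            throw new IllegalArgumentException("expected <key>#<json> but got: " + raw);
        }
        return new KeyedMessage(raw.substring(0, idx), raw.substring(idx + 1));
    }

    /** Derives the output topic name from the key, rejecting names Kafka would refuse. */
    String targetTopic() {
        String topic = key + TOPIC_SUFFIX;
        if (topic.length() > MAX_TOPIC_LENGTH || !LEGAL_TOPIC_CHARS.matcher(topic).matches()) {
            throw new IllegalArgumentException("illegal topic name: " + topic);
        }
        return topic;
    }
}
```

With the sample message, `KeyedMessage.parse("123456#{\"id\":56789,\"type\":1}")` yields the key `123456`, the JSON payload unchanged, and the target topic `123456_lv2`.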
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> -- Guozhang
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
