kafka-users mailing list archives

From Michael Noll <mich...@confluent.io>
Subject Re: kafka stream to new topic based on message key
Date Fri, 07 Oct 2016 15:02:52 GMT
Great, happy to hear that, Gary!

On Fri, Oct 7, 2016 at 3:30 PM, Gary Ogden <gogden@gmail.com> wrote:

> Thanks for all the help gents. I really appreciate it. It's exactly what I
> needed.
>
> On 7 October 2016 at 06:56, Michael Noll <michael@confluent.io> wrote:
>
> > Gary,
> >
> > adding to what Guozhang said:  Yes, you can programmatically create a new
> > Kafka topic from within your application.  But how you'd do that varies a
> > bit between current Kafka versions and the upcoming 0.10.1 release.
> >
> > As of today (Kafka versions before the upcoming 0.10.1 release), you
> > would need to create your topic manually.  This can be done on the CLI
> > (which doesn't help you in your scenario) or programmatically.  Right now
> > programmatically means you must talk directly to ZooKeeper, e.g. via
> > zkclient.  If you are looking for an example, the code at [1] may be
> > helpful.  That code creates a topic in ZK by using zkclient and Kafka's
> > `AdminUtils`.
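
[Editor's note: a minimal sketch of that pre-0.10.1, ZooKeeper-based approach. The topic name, partition count, replication factor, and ZooKeeper address below are made-up values; the calls shown are the Scala-backed `AdminUtils`/`ZkUtils` API of that era, so check them against the Kafka version you actually run.]

```java
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class TopicCreator {
    public static void main(String[] args) {
        // Open a ZooKeeper handle (address and timeouts are example values).
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181",
                30000 /* session timeout ms */,
                30000 /* connection timeout ms */,
                false /* isZkSecurityEnabled */);
        try {
            // Per-topic configs (e.g. cleanup.policy, segment.bytes) go here.
            Properties topicConfig = new Properties();
            AdminUtils.createTopic(zkUtils, "123456_lv2",
                    1 /* partitions */, 1 /* replication factor */,
                    topicConfig, RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();
        }
    }
}
```

This requires a running ZooKeeper ensemble, which is why it cannot be exercised standalone.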
> >
> > Looking ahead, Kafka's upcoming 0.10.1 release introduces an admin client
> > for creating/deleting topics (this new functionality is part of the
> > not-yet-fully-completed work on KIP-4).  This would give you a new
> > programmatic way to create a topic without having to communicate with
> > ZooKeeper directly.
> >
> > Lastly, keep in mind that it takes a while for a Kafka topic to be
> > created, so you may run into race-condition-like situations.  You may
> > therefore want to double-check that the newly created topic is actually
> > ready to use before beginning to write to it or read from it.
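
[Editor's note: one way to sketch that readiness check with the same pre-0.10.1 API as above. `AdminUtils.topicExists` consults ZooKeeper; the timeout and poll interval are arbitrary, and the caller is assumed to hold an open `ZkUtils` handle.]

```java
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public final class TopicReadiness {
    // Block until the topic is visible in ZooKeeper, or give up after 30s.
    static void awaitTopic(ZkUtils zkUtils, String topic) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 30_000L;
        while (!AdminUtils.topicExists(zkUtils, topic)) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Topic " + topic + " not ready in time");
            }
            Thread.sleep(100);  // poll interval, arbitrary
        }
    }
}
```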
> >
> > Hope this helps!
> > Michael
> >
> >
> >
> >
> > [1]
> > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
> >
> > On Fri, Oct 7, 2016 at 8:12 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > If you can create a ZK client inside your processor implementation,
> > > then you can definitely create any topics by talking to ZK directly;
> > > it's just that the Kafka Streams public interface does not expose any
> > > more convenient way to do this for now.
> > >
> > > Note that in KIP-4 we are trying to introduce an admin client for tasks
> > > such as creating / deleting topics; the corresponding requests have been
> > > added in the upcoming 0.10.1.0 release, but the full implementation is
> > > yet to be completed.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden <gogden@gmail.com> wrote:
> > >
> > > > Thanks Guozhang. I've gotten an example to work using your tips.
> > > >
> > > > So, is there no other way in streams to create a topic if
> > > > "auto.create.topics.enable" is set to false?  Maybe by creating a
> > > > record in zookeeper for that topic?
> > > >
> > > >
> > > >
> > > > On 5 October 2016 at 17:20, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > > Hello Gary,
> > > > >
> > > > >
> > > > > 1. The InternalTopicManager is only used by the Streams-instantiated
> > > > > PartitionAssignor to create internal topics for auto-repartitioning
> > > > > and changelogs.
> > > > >
> > > > > 2. About "RecordCollector.Supplier": you are right, and as I wrote
> > > > > in the above email you have to force-cast the context to
> > > > > RecordCollector.Supplier; theoretically this is not safe, but the
> > > > > internal Impl is always what is used in practice.
> > > > >
> > > > >
> > > > > If you know beforehand all the possible topics that you would want
> > > > > to send to based on the key-value pair, you can then use
> > > > > KStream.branch() to split the source stream into multiple ones based
> > > > > on the content, with each branched stream going to a different
> > > > > topic.
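
[Editor's note: a sketch of that branching suggestion, assuming string keys/values, a hypothetical source topic "events", and that the destination topics "123456_lv2" and "789_lv2" already exist with the desired configs. The API shown is KStreamBuilder/KStream.branch as of Kafka Streams 0.10.0.]

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class BranchExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("events");

        // Each record goes to the stream of the FIRST predicate it matches;
        // records matching no predicate are dropped.
        KStream<String, String>[] branches = source.branch(
                (key, value) -> key.equals("123456"),
                (key, value) -> key.equals("789"));

        branches[0].to("123456_lv2");
        branches[1].to("789_lv2");

        // ... then build a KafkaStreams instance from `builder` and start it.
    }
}
```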
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gogden@gmail.com> wrote:
> > > > >
> > > > > > Guozhang. I was just looking at the source for this, and it looks
> > > > > > like the RecordCollector.Supplier is part of the internal
> > > > > > ProcessorContextImpl class.  I don't think that's exposed to me,
> > > > > > is it?
> > > > > >
> > > > > > If I create a processor class that extends AbstractProcessor, it
> > > > > > only has access to the ProcessorContext interface, which doesn't
> > > > > > expose the Supplier.
> > > > > >
> > > > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > > > >
> > > > > > > What if we were to use Kafka Connect instead of Streams?  Does
> > > > > > > it have the ability to specify partitions, replication factor,
> > > > > > > segment size, etc.?
> > > > > > >
> > > > > > > On 5 October 2016 at 09:42, Gary Ogden <gogden@gmail.com> wrote:
> > > > > > >
> > > > > > >> Thanks Guozhang.
> > > > > > >>
> > > > > > >> So there's no way we could also use InternalTopicManager to
> > > > > > >> specify the number of partitions and RF?
> > > > > > >>
> > > > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > > > > > >>
> > > > > > >> On 4 October 2016 at 19:34, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >>
> > > > > > >>> Hello Gary,
> > > > > > >>>
> > > > > > >>> This is also doable in the Processor API: you can use the
> > > > > > >>> record collector from ProcessorContext to send data to
> > > > > > >>> arbitrary topics, i.e.:
> > > > > > >>>
> > > > > > >>> RecordCollector collector = ((RecordCollector.Supplier)
> > > > > > >>> context).recordCollector();
> > > > > > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> > > > > > >>> valSerializer, partitioner);
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> But note that if the new topic, e.g. "123456_lv2", is not
> > > > > > >>> created, then the send call will throw an exception unless the
> > > > > > >>> broker-side config "auto.create.topics.enable" is set to true;
> > > > > > >>> and even in this case, the topic will be auto-created with the
> > > > > > >>> pre-defined number of partitions, i.e. you cannot control the
> > > > > > >>> configs the topic is created with, such as compaction policy,
> > > > > > >>> num.partitions, segment sizes, etc.  If that works for you
> > > > > > >>> then I think it should be fine.
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> Guozhang
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gogden@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>> > Is it possible, in a Kafka Streams job, to write to another
> > > > > > >>> > topic based on the key in the messages?
> > > > > > >>> >
> > > > > > >>> > For example, say the message is:
> > > > > > >>> >
> > > > > > >>> > 123456#{"id":56789,"type":1}
> > > > > > >>> >
> > > > > > >>> > where the key is 123456, # is the delimiter, and the {} is
> > > > > > >>> > the json data.
> > > > > > >>> >
> > > > > > >>> > And I want to push the json data to another topic that will
> > > > > > >>> > have the name 123456_lv2.
> > > > > > >>> >
> > > > > > >>> > Is this possible with Kafka Streams?
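
[Editor's note: the routing rule in the question above is just string work, sketched here as a hypothetical helper: split the message on the '#' delimiter and append a "_lv2" suffix to the key, matching the "123456_lv2" example. `TopicNamer` and `targetTopic` are made-up names.]

```java
public class TopicNamer {
    // Derive the destination topic from a "key#json" message.
    static String targetTopic(String message) {
        int sep = message.indexOf('#');
        if (sep < 0) {
            throw new IllegalArgumentException("no '#' delimiter in: " + message);
        }
        String key = message.substring(0, sep);   // e.g. "123456"
        return key + "_lv2";                      // e.g. "123456_lv2"
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("123456#{\"id\":56789,\"type\":1}"));
        // prints 123456_lv2
    }
}
```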
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> --
> > > > > > >>> -- Guozhang
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
