kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jason Rosenberg <...@squareup.com>
Subject Re: How to consume from a specific topic, as well as a wildcard of topics?
Date Sat, 04 Apr 2015 04:36:41 GMT
Yeah, I think you need to have 2 consumer connectors (I routinely have
multiple consumer connectors co-existing in the same app).

That error message about the ephemeral node is really annoying, by the
way.  It happens under lots of scenarios (at least it did under 0.8.1.1),
where it simply never recovers, until you bounce the app.  In our case it
seemed to happen after a rebalance failure, etc.  The text of the message
is also really annoying (overly flippant), especially when you get it
spewed to the logs continuously....Essentially, the message gets generated
in cases that have nothing to do with it's original intent, and the cute
log message is useless...

Jason

On Fri, Apr 3, 2015 at 8:03 PM, James Cheng <jcheng@tivo.com> wrote:

> Hi,
>
> I want to consume from both a specific topic "a_topic" as well as all
> topics that match a certain prefix "prefix.*".
>
> When I do that using a single instance of a ConsumerConnector, I get a
> hang when creating the 2nd set of message streams.
>
> Code:
>         ConsumerConnector consumer =
> Consumer.createJavaConsumerConnector(consumerConfig);
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put("a_topic", new Integer(1));
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>
>         // do stuff with resulting streams
>
>         TopicFilter whitelist = new Whitelist("prefix\\.*");
>         List<KafkaStream<byte[], byte[]>> wildcardStreams =
> consumer.createMessageStreamsByFilter(whitelist, 1);
>
> It hangs when inside createMessageStreamsByFilter(), within
> createEphemeralPathExpectConflictHandleZKBug(): There is an info() message
> saying:
>         info("I wrote this conflicted ephemeral node [%s] at %s a while
> back in a different session, ".format(data, path)
>                   + "hence I will backoff for this node to be deleted by
> Zookeeper and retry")
>
> Is this expected to work? Can a ConsumerConnector be used like this, or
> should I have 2 ConsumerConnectors; one for the specific topic, and another
> for the wildcarded topics?
>
> It works when I use 2 ConsumerConnectors, but I just wanted to check if
> this is expected or not.
>
> Thanks,
> -James
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message