kafka-users mailing list archives

From James Cheng <jch...@tivo.com>
Subject How to consume from a specific topic, as well as a wildcard of topics?
Date Sat, 04 Apr 2015 00:03:15 GMT
Hi,

I want to consume from both a specific topic "a_topic" and all topics that match a
certain prefix "prefix.*".

When I do that using a single instance of a ConsumerConnector, I get a hang when creating
the 2nd set of message streams.

Code:
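        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

        // Streams for the specific topic: one stream for "a_topic".
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("a_topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        // do stuff with resulting streams

        // Streams for the wildcard topics, requested from the same connector.
        // "prefix\\..*" matches "prefix." plus any suffix ("prefix\\.*" would match only "prefix" followed by dots).
        TopicFilter whitelist = new Whitelist("prefix\\..*");
        // This second call is the one that hangs.
        List<KafkaStream<byte[], byte[]>> wildcardStreams = consumer.createMessageStreamsByFilter(whitelist, 1);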
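
A side note on the whitelist pattern, separate from the hang itself. As a Java string, "prefix\\.*" is the regex prefix\.*, which matches "prefix" followed by zero or more literal dots, while "prefix\\..*" matches "prefix." followed by anything. The sketch below checks this with plain java.util.regex only. It assumes the whitelist is applied as a full match against each topic name; the class name, the helper allows() and the topic "prefix.events" are made up for illustration and are not part of Kafka.

```java
import java.util.regex.Pattern;

class WhitelistRegexCheck {
    // Full-match check on a topic name. ASSUMPTION: this approximates how a
    // whitelist filter decides whether a topic is allowed; it does not call Kafka.
    static boolean allows(String regex, String topic) {
        return Pattern.matches(regex, topic);
    }

    public static void main(String[] args) {
        String dotsOnly = "prefix\\.*";   // regex: prefix\.*   ("prefix" plus literal dots)
        String anySuffix = "prefix\\..*"; // regex: prefix\..*  ("prefix." plus anything)

        // Hypothetical topic names used only for this comparison.
        String[] topics = {"prefix", "prefix.", "prefix.events", "a_topic"};
        for (String topic : topics) {
            System.out.println(topic
                    + "  dotsOnly=" + allows(dotsOnly, topic)
                    + "  anySuffix=" + allows(anySuffix, topic));
        }
    }
}
```

If the full-match assumption holds, only the second pattern would pick up a topic such as "prefix.events".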

It hangs inside createMessageStreamsByFilter(), within createEphemeralPathExpectConflictHandleZKBug().
There is an info() message there saying:
        info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path)
                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")

Is this expected to work? Can a ConsumerConnector be used like this, or should I have 2 ConsumerConnectors:
one for the specific topic, and another for the wildcarded topics?

It works when I use 2 ConsumerConnectors, but I just wanted to check if this is expected or
not.

Thanks,
-James

