kafka-users mailing list archives

From Jason Rosenberg <...@squareup.com>
Subject Re: problem with high-level consumer stream filter regex....
Date Tue, 17 Dec 2013 06:16:26 GMT
Ping....

Any thoughts on this?

This looks like a bug, but then again, we're not sure what the expected
behavior for regexes is here. For example, is there a way to whitelist topics
with a filter that matches a leading substring but rejects certain subsequent
substrings? In other words, apply a blacklist to a whitelist :)

Jason


On Thu, Dec 12, 2013 at 1:01 PM, Jason Rosenberg <jbr@squareup.com> wrote:

> All, I've filed:  https://issues.apache.org/jira/browse/KAFKA-1180
>
> We need to create a stream selector that essentially combines the
> logic of the BlackList and WhiteList classes.  That is, we want to select a
> topic that starts with a certain prefix, as long as it doesn't also contain a
> secondary string.
>
> This should be easy to do with ordinary Java regexes, but we're running
> into some issues trying to do this with the WhiteList class only.
>
> We have a pattern that uses negative lookahead, like this:
>
> "test-(?!bad\\b)[\\w]+"
>
> So this should select a topic like: "test-good", but exclude a topic like
> "test-bad", and also exclude a topic without the "test" prefix, like
> "foo-bar".
>
> Instead, what we see is a NullPointerException in the ConsumerIterator,
> and the consumer just hangs, after we send to a topic named 'test-topic'
> and then to one named 'test-bad':
>
> 21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
> kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.utils.Utils$.inLock(Utils.scala:565)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.NullPointerException
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
>     ... 9 more
>
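
For reference, the pattern quoted above can be checked against plain java.util.regex, independent of Kafka. This is only a sketch: the class name TopicRegexCheck and the helper isAllowed are made up for illustration, and it assumes the filter is applied as a full-string match, which may or may not be what the consumer's whitelist does internally.

```java
import java.util.regex.Pattern;

class TopicRegexCheck {
    // The pattern from the quoted message, written as a Java string literal.
    static final Pattern WHITELIST = Pattern.compile("test-(?!bad\\b)[\\w]+");

    // Hypothetical helper (not a Kafka API): true if the whole topic name
    // matches the pattern. Full-string matching is an assumption here.
    static boolean isAllowed(String topic) {
        return WHITELIST.matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {"test-good", "test-topic", "test-bad", "foo-bar"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + isAllowed(topic));
        }
    }
}
```

With plain Java regex semantics, "test-good" and "test-topic" are accepted while "test-bad" and "foo-bar" are rejected, so the pattern itself appears to do what the message describes. That would point at how the consumer handles a topic the filter rejects, rather than at the regex.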
