nifi-users mailing list archives

From Viking K <>
Subject Re: ConsumeKafkaRecord won't pull new events from Kafka
Date Tue, 13 Nov 2018 17:50:25 GMT

Yeah, I was thinking along these lines:
Maybe it's a broken Avro record or something similar inside the Kafka topic that's causing
ConsumeKafkaRecord to lock up, since ConsumeKafkaRecord uses the newer record readers while
ConsumeKafka doesn't.
If that is the case, it should show up in the ConvertRecord processor, and it may be
possible to grab the error from there instead.
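
One quick way to narrow this down is to look at the first bytes of a message value dumped from the topic (for example, the content of a flow file produced by ConsumeKafka, saved to disk). Below is a minimal Python sketch, not something from NiFi itself. The function name classify_payload is made up for illustration, and the schema-registry branch assumes Confluent-style framing (a zero byte followed by a 4-byte schema id), which may not match the registry in use here. It is only a heuristic: a bare Avro record can also start with a zero byte.

```python
import struct

# An Avro object container file starts with these four bytes.
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def classify_payload(payload: bytes) -> str:
    """Guess how a Kafka message value is framed from its leading bytes.

    Hypothetical helper for debugging; the result is a hint, not proof.
    """
    if payload.startswith(AVRO_CONTAINER_MAGIC):
        # Schema travels with the data, so an embedded-schema reader applies.
        return "avro-container (schema embedded)"
    if len(payload) >= 5 and payload[0] == 0:
        # Assumed Confluent-style framing: magic byte 0, then a
        # 4-byte big-endian schema id, then the bare Avro record.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"confluent-framed (schema id {schema_id})"
    return "unknown (bare record or not Avro)"


if __name__ == "__main__":
    # Synthetic sample values, not data from the topic in this thread.
    print(classify_payload(b"Obj\x01" + b"\x00" * 8))
    print(classify_payload(b"\x00\x00\x00\x00\x2a\x02"))
    print(classify_payload(b"\x02hi"))
```

If the result does not match what the configured record reader expects (embedded schema versus a registry lookup), that mismatch is a plausible reason for the reader to choke where plain ConsumeKafka does not.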

From: Joe Witt <>
Sent: Tuesday, November 13, 2018 5:32 PM
Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka


Just to clarify, it isn't that it is trying to merge records to create
larger flow files but rather it is 'playing the kafka interaction as
it lies' so to speak.  When polling from Kafka we can get one or more
records.  We're just taking advantage of that without trading off

On Tue, Nov 13, 2018 at 12:24 PM Viking K <> wrote:
> Mike, are we talking about complete Avro messages or bare records (is the schema contained
> inside the Avro file, or do they use a schema registry)?
> From my own testing, ConsumeKafkaRecord tries to bundle the incoming messages to
> create larger flow files.
> Do you use any Kafka headers in the processor?
> Also, what happens if you try to replicate the behavior of ConsumeKafkaRecord like this?
> I don't know if you need the ConvertRecord, but it might be needed to pick out a schema name
> to use as the merge strategy.
> ConsumeKafka -> (ConvertRecord) -> MergeContent
> /Viking
> ________________________________
> From: Mike Thomsen <>
> Sent: Tuesday, November 13, 2018 3:02 PM
> To:
> Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka
> The closest thing I see to a sign that something might be awry is this in nifi-app.log:
> kafka.consumer:type=app-info,id=consumer-1
>     at com.sun.jmx.mbeanserver.Repository.addMBean(
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.createKafkaConsumer(
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.obtainConsumer(
>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$
>     at java.util.concurrent.Executors$
>     at java.util.concurrent.FutureTask.runAndReset(
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(
>     at java.util.concurrent.ScheduledThreadPoolExecutor$
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>     at java.util.concurrent.ThreadPoolExecutor$
>     at
> On Tue, Nov 13, 2018 at 10:00 AM Pierre Villard <> wrote:
> Hey Mike,
> Anything in the logs?
> Pierre
> Le mar. 13 nov. 2018 à 15:56, Mike Thomsen <> a écrit :
> I have an odd situation where I have ConsumeKafkaRecord and ConsumeKafka pulling from
> the same topic under different consumer groups, but only the latter will pull new events.
> I ran into a situation where the reader didn't like the Avro data being pulled from the queue
> and so I created new topics and configured both processors to use the new ones. However, only
> the non-record version of the processor will read.
> Anyone got suggestions on how to debug this? I'm reasonably familiar with Kafka, but
> can't figure out why ConsumeKafka and the console consumer can read the topic, but ConsumeKafkaRecord
> is acting like there's nothing there at all.
> Thanks,
> Mike
