nifi-users mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: ConsumeKafkaRecord won't pull new events from Kafka
Date Tue, 13 Nov 2018 18:15:18 GMT
Mike,

Is there new data coming into the Kafka topic? By default, when the Processor is started, it uses
an offset reset of 'latest'. That means that if you started the Processor with that setting, the
committed offset points to the end of the topic, so if no more data is coming into the topic you
won't see anything come out of the processor. Was the ConsumeKafka processor, on the other hand,
started with the offset reset at 'earliest'?
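
(For reference outside of NiFi, here is a minimal sketch with the plain Kafka consumer client
showing the behavior described above; the broker address, group id, and topic name are
placeholder values, not configuration from this thread. With a fresh group id, 'earliest'
replays the topic from the beginning, while 'latest' only sees records produced after the
first poll.)

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OffsetResetDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("group.id", "offset-reset-demo");        // a new group has no committed offset yet
            props.put("auto.offset.reset", "earliest");        // "latest" would skip records already on the topic
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));  // placeholder topic
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                System.out.println("Fetched " + records.count() + " records");
            }
        }
    }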

Thanks
-Mark

On Nov 13, 2018, at 12:54 PM, Mike Thomsen <mikerthomsen@gmail.com> wrote:

That would appear to be the case. So here's what I was doing:

1. Used this sort of code to serialize the Avro:

    private <T> byte[] serialize(T obj, Class<T> clz) throws Exception {
        // Writes only the bare binary datum: no Avro container header, so no embedded schema
        SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(clz);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(obj, encoder);
        encoder.flush();
        out.close();

        return out.toByteArray();
    }

So that got me a byte array with just the bare binary representation, no embedded schema (contrast with the embedded-schema sketch below).

2. Produced that to the Kafka topic.

3. Had an AvroReader variously configured either with schema.name hard-set to an entry in the
AvroSchemaRegistry or with Embedded Schema.

Didn't see any flowfiles get emitted to parse.failure or success.
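
Given where this thread ends up (the producer wasn't embedding the schema), here is a hedged
sketch, not code from this thread, of the alternative serialization path using Avro's
DataFileWriter, which writes the container format with the schema embedded so a reader
configured for an embedded schema has a header to parse:

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecordBase;

    public class AvroWithEmbeddedSchema {
        static byte[] serializeWithEmbeddedSchema(SpecificRecordBase obj) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            SpecificDatumWriter<SpecificRecordBase> datumWriter = new SpecificDatumWriter<>();
            try (DataFileWriter<SpecificRecordBase> fileWriter = new DataFileWriter<>(datumWriter)) {
                fileWriter.create(obj.getSchema(), out);  // writes the container header, schema included
                fileWriter.append(obj);
            }
            return out.toByteArray();
        }
    }

With a payload like that, the AvroReader's embedded-schema access strategy has the schema right
in the bytes and no registry lookup is needed.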

On Tue, Nov 13, 2018 at 12:50 PM Joe Witt <joe.witt@gmail.com> wrote:
Mike - so does this mean the parse.failure relationship wasn't working
though? We should try to dig into this more if you're up for it or for
sharing more details. We don't want the behavior you ran into, for
sure...
On Tue, Nov 13, 2018 at 12:49 PM Mike Thomsen <mikerthomsen@gmail.com> wrote:
>
> So after a lot of diving into 1.9.X, it **appears** that there was some sort of combination
> w/ the AvroReader + ConsumeKafkaRecord that was suppressing errors from being reported. Haven't
> been able to fully figure out what was going on, but I know one of the contributing factors
> was that my producer (a Spring Boot service, not NiFi) was incorrectly building the binary form
> of the Avro (it didn't have an embedded schema).
>
> Mike
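
For contrast, a bare binary datum like the one the producer was emitting can only be decoded if
the schema is supplied from the outside, which is essentially what pointing the AvroReader at an
AvroSchemaRegistry entry does. A minimal sketch with the plain Avro API, assuming the schema
object is available separately:

    import java.io.ByteArrayInputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class BareAvroDecode {
        // Decodes a bare (schemaless) binary Avro datum using an externally supplied schema.
        static GenericRecord decode(byte[] bareBinary, Schema schema) throws Exception {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(new ByteArrayInputStream(bareBinary), null);
            return reader.read(null, decoder);
        }
    }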
>
> On Tue, Nov 13, 2018 at 12:33 PM Joe Witt <joe.witt@gmail.com> wrote:
>>
>> Viking,
>>
>> Just to clarify, it isn't that it's trying to merge records to create
>> larger flow files; rather, it's 'playing the Kafka interaction as
>> it lies', so to speak. When polling from Kafka we can get one or more
>> records, and we're just taking advantage of that without trading off
>> speed.
>>
>> Thanks
>> On Tue, Nov 13, 2018 at 12:24 PM Viking K <cyber_voyk@hotmail.com> wrote:
>> >
>> > Mike, are we talking about complete Avro messages or bare records (i.e., is the schema
>> > contained inside the Avro file, or do they use a schema registry)?
>> >
>> > From my own testing, ConsumeKafkaRecord tries to bundle the incoming messages
>> > to create larger flow files.
>> > Do you use any Kafka headers in the processor?
>> >
>> > Also, what happens if you try to replicate the behavior of ConsumeKafkaRecord
>> > like this? I don't know if you need the ConvertRecord, but it might be needed to
>> > pick out a schema name to use as the merge strategy.
>> > ConsumeKafka -> (ConvertRecord) -> MergeContent
>> >
>> > /Viking
>> > ________________________________
>> > From: Mike Thomsen <mikerthomsen@gmail.com>
>> > Sent: Tuesday, November 13, 2018 3:02 PM
>> > To: users@nifi.apache.org
>> > Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka
>> >
>> > The closest thing I see to anything implying something might be awry is this in nifi-app.log:
>> >
>> > javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>> >     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>> >     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>> >     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>> >     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.createKafkaConsumer(ConsumerPool.java:143)
>> >     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.obtainConsumer(ConsumerPool.java:107)
>> >     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(ConsumeKafka.java:359)
>> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
>> >     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
>> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >     at java.lang.Thread.run(Thread.java:748)
>> >
>> >
>> >
>> > On Tue, Nov 13, 2018 at 10:00 AM Pierre Villard <pierre.villard.fr@gmail.com> wrote:
>> >
>> > Hey Mike,
>> >
>> > Anything in the logs?
>> >
>> > Pierre
>> >
>> > On Tue, Nov 13, 2018 at 3:56 PM, Mike Thomsen <mikerthomsen@gmail.com> wrote:
>> >
>> > I have an odd situation where I have ConsumeKafkaRecord and ConsumeKafka pulling
from the same topic under different consumer groups, but only the latter will pull new events.
I ran into a situation where the reader didn't like the Avro data being pulled from the queue
and so I created new topics and configured both processors to use the new ones. However, only
the non-record version of the processor will read.
>> >
>> > Anyone got suggestions on how to debug this? I'm reasonably familiar with Kafka,
but can't figure out why ConsumeKafka and the console consumer can read the topic, but ConsumeKafkaRecord
is acting like there's nothing there at all.
>> >
>> > Thanks,
>> >
>> > Mike
