nifi-users mailing list archives

From "Uwe Geercken" <uwe.geerc...@web.de>
Subject Aw: Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10
Date Thu, 22 Jun 2017 20:11:45 GMT
Matt,
 
Maybe it is the schema.
 
It's not an empty Kafka message, because the ConsumeKafka_0_10 processor on the same flow
works correctly and without problems. Maybe that is another hint that it could be the schema
itself.
 
I already spent half a day on it, but I will try to do some further tests.
 
Rgds,
 
Uwe
 

Sent: Thursday, 22 June 2017 at 22:01
From: "Matt Burgess" <mattyb149@apache.org>
To: users@nifi.apache.org
Subject: Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10
Uwe,

Actually I spoke too soon; although it might end up being a schema
issue, the NPE looks like it is caused when the record itself is null,
which can happen in the code (there is no null check before trying to
get the schema from the record). However this might still be a schema
issue if the Reader can be instantiated but then it retrieves no
records. Or it might be a bug in the processor's handling of the
Kafka input which might be empty? That's all the guessing I'll do on
the Kafka stuff, I'll leave it to the folks that know much more about
it :)

Regards,
Matt
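
The failure mode described above (asking a record for its schema when the record itself is null) can be sketched in a few lines. This is only an illustration in Python, where None stands in for Java's null and AttributeError plays the role of the NullPointerException. Every name here (Record, next_record, write_unguarded, write_guarded) is hypothetical and is not NiFi's API or the actual ConsumerLease code.

```python
from typing import Iterator, Optional


class Record:
    """Hypothetical stand-in for a parsed record that carries its schema."""

    def __init__(self, schema: dict, values: dict):
        self.schema = schema
        self.values = values


def next_record(records: Iterator[Record]) -> Optional[Record]:
    """Return the next record, or None when the reader yields nothing."""
    return next(records, None)


def write_unguarded(records: Iterator[Record]) -> dict:
    """Access the schema without checking the record first."""
    record = next_record(records)
    # Fails with AttributeError when record is None (the analogue of the NPE).
    return record.schema


def write_guarded(records: Iterator[Record]) -> dict:
    """Check for a missing record and fail with a descriptive message."""
    record = next_record(records)
    if record is None:
        raise ValueError(
            "reader returned no record; check the reader and schema configuration"
        )
    return record.schema
```

With an empty reader, write_unguarded fails with a bare AttributeError that hides the cause, while write_guarded reports that no record was produced.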

On Thu, Jun 22, 2017 at 3:54 PM, Matt Burgess <mattyb149@apache.org> wrote:
> Uwe,
>
> It looks like this error is directly related to your other question.
> This line from the stack trace:
>
> Caused by: java.lang.NullPointerException: null
> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>
> Is when it calls record.getSchema(). Not sure if the "real" error
> message is getting lost or perhaps further down in your stack trace,
> but it looks like an error with finding or reading the schema.
>
> Regards,
> Matt
>
>
> On Thu, Jun 22, 2017 at 3:30 PM, Uwe Geercken <uwe.geercken@web.de> wrote:
>> Hello everyone,
>>
>> I wanted to try the following
>> - get messages from a kafka topic. these are simple messages in CSV format
>> - use the PartitionRecord processor to get familiar with the RecordPath concept
>>
>> I started zookeeper and kafka on localhost and added some messages to a topic using the kafka console producer. A message looks like this:
>>
>> ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
>>
>> I can retrieve this message using the kafka console consumer.
>>
>> To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processor. I configured the ConsumeKafkaRecord_0_10 with a CSVReader controller. It uses the AvroSchemaRegistry 1.3.0 with the following schema:
>>
>> {
>>   "type": "record",
>>   "name": "flight_schema",
>>   "fields": [
>>     { "name": "flight_station", "type": "string" },
>>     { "name": "flight_orientation", "type": "string" },
>>     { "name": "flight_carrier", "type": "string" },
>>     { "name": "flight_number", "type": "string" },
>>     { "name": "flight_number_suffix", "type": "string" },
>>     { "name": "flight_scheduled_date", "type": "string" },
>>     { "name": "flight_scheduled_time", "type": "string" },
>>     { "name": "flight_actual_date", "type": "string" },
>>     { "name": "flight_actual_time", "type": "string" },
>>     { "name": "flight_passengers", "type": "int" }
>>   ]
>> }
>>
>> And then I have a CSVRecordWriter which uses the same schema. I also added a line to logback.xml to debug the ConsumeKafkaRecord_0_10 processor.
>>
>> Now when I run the processors and add a message to the topic in kafka I get the following error:
>>
>>
>> 2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2 due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException: org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>> org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
>> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
>> at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
>> at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
>> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
>> at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
>> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
>> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NullPointerException: null
>> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>> ... 18 common frames omitted
>>
>>
>>
>> I have played around with the settings of the processors and controllers quite a lot, but always get this NullPointerException.
>>
>> I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages and it does work.
>>
>> I hope that someone can point out what the problem is and help me.
>>
>> Greetings,
>>
>> Uwe
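
Since the thread keeps returning to whether the schema is at fault, one quick sanity check is to compare the sample CSV message against the quoted schema outside of NiFi. The sketch below uses only the Python standard library. The helper name check_line is hypothetical, and it only handles the "string" and "int" types that appear in this schema. It does not use NiFi's CSVReader or AvroSchemaRegistry and says nothing about how they resolve a schema.

```python
import csv
import io
import json

# Schema and sample message copied from the thread.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "flight_schema",
  "fields": [
    { "name": "flight_station", "type": "string" },
    { "name": "flight_orientation", "type": "string" },
    { "name": "flight_carrier", "type": "string" },
    { "name": "flight_number", "type": "string" },
    { "name": "flight_number_suffix", "type": "string" },
    { "name": "flight_scheduled_date", "type": "string" },
    { "name": "flight_scheduled_time", "type": "string" },
    { "name": "flight_actual_date", "type": "string" },
    { "name": "flight_actual_time", "type": "string" },
    { "name": "flight_passengers", "type": "int" }
  ]
}
"""
SAMPLE = "ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200"


def check_line(schema: dict, line: str) -> list:
    """Return a list of problems found when matching one CSV line to the schema."""
    fields = schema["fields"]
    # An empty line yields no row at all, so fall back to an empty list.
    values = next(csv.reader(io.StringIO(line)), [])
    problems = []
    if len(values) != len(fields):
        problems.append(f"expected {len(fields)} columns, got {len(values)}")
    for field, value in zip(fields, values):
        if field["type"] == "int":
            try:
                int(value)
            except ValueError:
                problems.append(f"{field['name']}: {value!r} is not an int")
    return problems


if __name__ == "__main__":
    print(check_line(json.loads(SCHEMA_JSON), SAMPLE))
```

For the sample message this returns an empty list: the line has ten columns for ten fields and the last value parses as an int. That only shows the data and schema agree in shape. It does not rule out a problem in how the reader looks up or applies the schema.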
