nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Uwe Geercken" <uwe.geerc...@web.de>
Subject Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10
Date Thu, 22 Jun 2017 19:30:59 GMT
Hello everyone,
 
I wanted to try the following
- get messages from a kafka topic. these are simple messages in CSV format
- use the PartitionRecord processor to get familiar with the RecordPath concept
 
I started zookeeper and kafka on localhost and added some messages to a topic using the kafka
concole producer. A message looks like this:
 
ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
 
I can retrieve this message using the kafka concole consumer.
 
To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processor. I configured
the ConsumeKafkaRecord_0_10 with a CSVReader controller. It uses the AvroSchemaRegistry 1.3.0
with following schema:
 
{
"type": "record",
"name": "flight_schema",
"fields": [
   { "name": "flight_station", "type": "string" },
   { "name": "flight_orientation", "type": "string" },
   { "name": "flight_carrier", "type": "string" },
   { "name": "flight_number", "type": "string" },
   { "name": "flight_number_suffix", "type": "string" },
   { "name": "flight_scheduled_date", "type": "string" },
   { "name": "flight_scheduled_time", "type": "string" },
   { "name": "flight_actual_date", "type": "string" },
   { "name": "flight_actual_time", "type": "string" },
   { "name": "flight_passengers", "type": "int" }
  ]
}
 
And then I have a CSVRecordWriter which uses the same schema. I also added a line to logback.xml
to debug the ConsumeKafkaRecord_0_10 processor.
 
Now when I run the processors and add a message to the topic in kafka I get following error:
 

2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10
ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing
data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2
due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException:
org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
    at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
    ... 18 common frames omitted
 
 
 
I have played around with the settings of the processors and controllers quite a lot, but
always get this NullPointerException.
 
I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages
and it does work.
 
I hope that someone can point out what the problem is and help me.
 
Greetings,
 
Uwe

Mime
View raw message