nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeremy Dyer <jdy...@gmail.com>
Subject Re: InferAvroSchema OoM w/ large json
Date Fri, 15 Sep 2017 02:00:25 GMT
Neil - The number of records analyzed property in conjunction with JSON content is a little
hairy. It's pretty clear with something like CSV where each line is considered a record. With
JSON that distinction becomes less clear on what is actually a "record". AKA is the record
the object or each element in the array. Ultimately NiFi invokes the Kite SDK to handle this
task. Kite attempts to parse the JSON and iterate through X number of assumed Avro schemas/records.
I would assume that kite is probably attempting to read in this entire array of objects and
causing your OOM error. 

I would feed in a smaller set of data to the InferAvroScema processor and then use the resulting
output schema from that run on subsequent processors removing the InferAvroScema from the
flow all together. I am making the assumption that this data is always the same. If that is
not the case this will not work and we can look at other options. 

Don't worry we will get it going at some point 👌 

- Jeremy Dyer



Sent from my iPhone
> On Sep 14, 2017, at 5:13 PM, Neil Derraugh <neil.derraugh@intellifylearning.com>
wrote:
> 
> I have a 328MB json file sitting in a queue that feeds into an InferAvroSchema (1.3.0)
processor that is currently stopped.  The json file consists of a single array containing
just over 1.5M small objects.  The heap is set to 4GB.  Before starting the processor the
heap usage on the node that has the file is about 20%. If I start the InferAvroSchema eventually
I run out of heap and start seeing the following in the logs.  
> 
> 2017-09-14 13:01:25,597 WARN [Clustering Tasks Thread-3] o.apache.nifi.controller.FlowController
Failed to send heartbeat due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed
marshalling 'HEARTBEAT' protocol message due to: javax.net.ssl.SSLException: Received fatal
alert: unexpected_message
> 2017-09-14 13:01:25,655 INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager
org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@27ae11db
Connection State changed to SUSPENDED
> 2017-09-14 13:01:25,690 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl
Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> 	at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl
Background retry gave up
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl
Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> 	at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl
Background retry gave up
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 2017-09-14 13:01:25,696 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl
Background operation retry gave up
> org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> 	at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> 	at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 
> The node drops out of the cluster and doesn't seem to rejoin unless I restart nifi on
that node.
> 
> The InferAvroSchema processor is only looking at the first 10 records.  It's an array
of objects so the first ten elements in the array could be pulled out and processed without
processing the whole file as long as we accept that it won't otherwise be validated at this
step.  Is that a possibility?  Is there a better way to do what I'm trying to do?

Mime
View raw message