nifi-users mailing list archives

From Aldrin Piri <aldrinp...@gmail.com>
Subject Re: ConsumeMQTT processor causing resource issues when deployed in MiNiFi
Date Wed, 03 Oct 2018 16:42:49 GMT
Hi Patricia,

Thank you for the additional information.  I have spent some time
evaluating this issue and have a few ideas about how this might be
occurring.

Could you please specify how you configured your MiNiFi instance to provide
the MQTT extensions?  These are not bundled with 0.5.0 in the released
convenience binary.

A listing of your lib directory from the instance and the app log would be
supremely helpful if available.
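
If thread dumps can be captured before the OutOfMemoryError appears (the trace quoted below shows the failure when ConsumeMQTT.reconnect asks the Paho client to start a new thread), counting threads by name across a few dumps is a quick way to see whether one group keeps growing. The following is only a minimal sketch under stated assumptions: it assumes each thread entry in the dump file starts with the thread name in double quotes, as jstack output does, and the class and method names (ThreadDumpSummary, normalize, countByName) are hypothetical and not part of NiFi or MiNiFi.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of NiFi or MiNiFi: summarises a thread dump
// by counting threads whose names differ only in their numeric parts.
public class ThreadDumpSummary {

    // Collapse digit runs so that "Timer-Driven Process Thread-3" and
    // "Timer-Driven Process Thread-9" fall into the same group.
    static String normalize(String threadName) {
        return threadName.replaceAll("[0-9]+", "#");
    }

    // Count thread entries per normalized name.
    // Assumption: a thread entry is a line starting with "<name>" in double quotes.
    static Map<String, Integer> countByName(List<String> dumpLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : dumpLines) {
            if (!line.startsWith("\"")) {
                continue; // stack frame or blank line, not a thread header
            }
            int end = line.indexOf('"', 1);
            if (end < 0) {
                continue; // no closing quote, skip the malformed line
            }
            counts.merge(normalize(line.substring(1, end)), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ThreadDumpSummary <dump file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        // Print one line per group: count, tab, normalized thread name.
        countByName(lines).forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Running this against dumps taken shortly after launch and again later, then comparing the counts, should show whether any one group of threads grows between the two.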

Thanks,
Aldrin

On Mon, Oct 1, 2018 at 10:51 AM Patricia Quill <pquill01@yahoo.co.uk> wrote:

> Thanks for the response.
> Am using version 1.7.0 NiFi & version 0.5.0 MiNiFi. Have NiFi running as a
> Docker image.
> The workflow is basically the sample that is on the "Getting Started with
> NiFi" web site but have substituted the "TailFile" processor on the remote
> MiNiFi with "ConsumeMQTT".
>
> The change I made to JVM settings was to increase the initial & max heap
> size in file bootstrap.conf on the MiNiFi server. The VM is configured with
> 16GB RAM.
>
>
> # JVM memory settings
> #java.arg.2=-Xms256m
> #java.arg.3=-Xmx256m
> java.arg.2=-Xms2048m
> java.arg.3=-Xmx2048m
>
> Was unable to obtain the thread dump as the OS refuses to do anything
> after the OutOfMemory error is received.
> Think I have narrowed down the problem. Have set up a mosquitto broker on
> the MiNiFi server. If I configure ConsumeMQTT with a URI of
> tcp://localhost:1883 there are no issues. The problem seems to happen
> if I replace localhost with the IP address of the MiNiFi server. Repeated
> messages like the one below are generated until eventually the OutOfMemory
> error occurs.
>
> 2018-10-01 10:25:07,421 ERROR [Timer-Driven Process Thread-3]
> o.a.nifi.processors.mqtt.ConsumeMQTT
> ConsumeMQTT[id=3f3ecc40-41ca-319d-0000-000000000000] Connection to
> tcp://xxxxxxxxxx:1883 lost (or was never connected) and ontrigger connect
> failed. Yielding processor: Connection lost (32109) - java.io.EOFException
> org.eclipse.paho.client.mqttv3.MqttException: Connection lost
> at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:181)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException: null
> at java.io.DataInputStream.readByte(DataInputStream.java:267)
> at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
> at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:133)
> ... 7 common frames omitted
>
> The firewall on the VM is disabled and the ConsumeMQTT processor works on
> the NiFi server when configured with an IP address.
> Wondering if I am missing a dependency? I copied the following to the lib
> directory on the MiNiFi server
> -nifi-mqtt-nar-1.7.0.nar
> -nifi-standard-services-api-nar-1.7.0.nar
>
> The other possibility is there is some security setting in the lab that I
> am using that is causing this but then am curious why it works in NiFi.
>
>
>
>
>
> On Friday, 28 September 2018, 14:38, Joe Witt <joe.witt@gmail.com> wrote:
>
>
> Certainly could be that memory space on the machine is constrained
> enough that creation of a new thread isn't possible.  Could be a
> resource leak.  To Aldrin's point though I'd say the thread dump would
> be really helpful.  In fact, I'd recommend grabbing a few at different
> time intervals such as shortly after launch, after a 'while' when
> things are working well, and 'when it hits this point'.
>
> How much memory is on the system and how much is configured for the JVM?
>
> On Fri, Sep 28, 2018 at 9:30 AM Aldrin Piri <aldrinpiri@gmail.com> wrote:
> >
> > Hello Patricia,
> >
> > Could you provide more details about the JVM properties that you tried
> > configuring?  Please also let us know the relevant versions of MiNiFi and
> > NiFi you are using.  If possible, providing the flow you are attempting to
> > run would be helpful.  Otherwise, specifics about how the consume
> > processor is configured would give us a few more data points.
> >
> > Additionally, I would like to request that you grab us a thread dump
> > from MiNiFi when this issue is exhibited.  This can be accomplished by
> > running bin/minifi.sh dump <dump file name>.
> >
> > At its core, and beyond the initial configuration startup, MiNiFi is
> > running primarily the same core libraries as NiFi, and I would expect
> > similar JVM configurations to have similar success in operating.  The
> > inability to create new threads is a curious one that I have not seen
> > before, especially when the same flow is not at issue in NiFi.
> >
> > Thanks!
> > --aldrin
> >
> > On Fri, Sep 28, 2018 at 5:04 AM Patricia Quill <pquill01@yahoo.co.uk> wrote:
> >>
> >> Have deployed a ConsumeMQTT processor to a remote server using MiNiFi
> >> (Java agent) and configured it to subscribe to a mosquitto broker. Problem
> >> I am seeing is that it is causing the OS to run out of resources.
> >>
> >> 2018-09-27 09:06:37,060 WARN [Timer-Driven Process Thread-9]
> >> o.a.n.controller.tasks.ConnectableTask Administratively Yielding
> >> ConsumeMQTT[id=b3271a8d-cc15-3f10-0000-000000000000] due to uncaught
> >> Exception: java.lang.OutOfMemoryError: unable to create new native thread
> >> java.lang.OutOfMemoryError: unable to create new native thread
> >> at java.lang.Thread.start0(Native Method)
> >> at java.lang.Thread.start(Thread.java:717)
> >> at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
> >> at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1603)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> >> at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.start(ClientComms.java:675)
> >> at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:280)
> >> at org.eclipse.paho.client.mqttv3.internal.ConnectActionListener.connect(ConnectActionListener.java:185)
> >> at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:774)
> >> at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:333)
> >> at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.setAndConnectClient(AbstractMQTTProcessor.java:370)
> >> at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
> >> at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:256)
> >> at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
> >> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> >> at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> >> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >>
> >> I've tried it on a number of different OS types: Ubuntu 14.04, Ubuntu
> >> 16.04 and CentOS 7. I have tried tweaking the JVM properties but to no
> >> avail. Am using NiFi 1.7.0.
> >> The MiNiFi setup seems to be OK as I tried a test with the TailFile
> >> processor and had no issues.  The ConsumeMQTT processor works without any
> >> issues if it's deployed on the NiFi server.
> >>
> >> Has anyone else come across this or have any suggestions?
>
>
>
