nifi-dev mailing list archives

From TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tldzpysami4wh...@gmail.com>
Subject Reading all flowfiles queued for a processor (>20000 flowfiles)
Date Wed, 22 Aug 2018 12:07:06 GMT
Hi NiFi devs,

My understanding is that when I create a custom processor, and get
FlowFiles with session.get(CustomFlowFileFilter), irrespective of how many
times the CustomFlowFileFilter returns
FlowFileFilterResult.ACCEPT_AND_CONTINUE or
FlowFileFilterResult.REJECT_AND_CONTINUE, it will only ever loop through at
most 20000 flowfiles, where 20000 is defined by the
nifi.queue.swap.threshold setting in nifi.properties.
(Disregarding that it's actually 19999, and that setting is not respected
when running tests, which made this SUPER confusing to debug...)
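
To make the behaviour described above concrete, here is a toy model in
plain Java. It is not NiFi code and does not claim to mirror NiFi's
internals: the class SwapLimitedQueue, its methods, and the use of plain
strings as flowfile IDs are all made up for illustration. The only thing it
assumes is what is stated above, namely that the filter is offered no more
entries than the swap threshold allows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Toy model only (hypothetical names, NOT NiFi code): a queue whose filter
// is never shown more than `swapThreshold` entries per poll.
final class SwapLimitedQueue {
    private final List<String> queued = new ArrayList<>();
    private final int swapThreshold; // stands in for nifi.queue.swap.threshold

    SwapLimitedQueue(int swapThreshold) {
        this.swapThreshold = swapThreshold;
    }

    void enqueue(String flowFileId) {
        queued.add(flowFileId);
    }

    // Offers at most swapThreshold entries (oldest first) to the filter,
    // removes the accepted ones from the queue, and returns them.
    List<String> poll(Predicate<String> filter) {
        List<String> accepted = new ArrayList<>();
        Iterator<String> it = queued.iterator();
        for (int seen = 0; seen < swapThreshold && it.hasNext(); seen++) {
            String id = it.next();
            if (filter.test(id)) {
                accepted.add(id);
                it.remove();
            }
        }
        return accepted;
    }
}
```

With a threshold of 3 and four queued entries, a filter looking for the
fourth entry comes back empty even though that entry is in the queue, which
is the situation I keep running into at 20000.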

Attached a screenshot of that happening (also at:
https://i.imgur.com/25QJxuj.png)

My question is: is there a way to force a custom processor to read ALL
queued flowfiles across all of its incoming connections?

My particular use case is pairing flowfiles. There are probably other ways
to pair files using the Wait/Notify processors, but I'm handling files at
high throughput, with possible delays between the two halves of a pair
arriving, so it's quite easy to hit the limit. I could also increase the
swap threshold setting, but I keep hitting the problem. I've also played
with custom prioritizers on connections in an attempt to maximise the
chance of pairs occurring, but because I need to move unmatched flowfiles
out and back in, this essentially creates a busy loop. It seems like there
should be a better way.
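
To be concrete about what "pairing" means here, this is a minimal sketch of
the matching logic in plain Java. The names PairBuffer, offer and unmatched
are hypothetical, and the sketch assumes each flowfile carries some shared
key that identifies its partner. It deliberately ignores how a NiFi session
would hold on to the waiting flowfiles, which is the hard part.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of pairing by key; not tied to the NiFi API.
final class PairBuffer {
    // Pair key -> ID of the flowfile still waiting for its partner
    private final Map<String, String> waiting = new HashMap<>();

    // If a partner with the same key is already waiting, removes and returns
    // it; otherwise remembers this flowfile and returns an empty Optional.
    Optional<String> offer(String pairKey, String flowFileId) {
        String partner = waiting.remove(pairKey);
        if (partner != null) {
            return Optional.of(partner);
        }
        waiting.put(pairKey, flowFileId);
        return Optional.empty();
    }

    // Number of flowfiles still waiting for a partner
    int unmatched() {
        return waiting.size();
    }
}
```

The first arrival for a key waits; the second arrival gets the first one
back as its partner. The problem described above is that the second arrival
may sit beyond the threshold and never be offered to the filter at all.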

Any ideas?

Ideally, a way to force a custom processor to read all queued flowfiles
(swapping more than the threshold into memory during a single onTrigger
call) would be the easiest solution. Is there one?

Cheers,
Sam
