nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
Subject Re: Reading all flowfiles queued for a processor (>20000 flowfiles)
Date Thu, 23 Aug 2018 08:31:14 GMT
Neat! So the BinFiles processor doesn't quite have the control over the
binning/pairing that I wanted, but it got me on the right track. I think
I've got a pretty lightweight custom processor that does what I'm looking
for now.

Made a processor which extends AbstractSessionFactoryProcessor that has an
AtomicReference<ProcessSession> that gets initialized on its first
onTrigger, and a HashMap. Now, each OnTrigger creates a session as normal,
and migrates flowfiles to the ProcessSession the processor holds onto. It
then does the matching, migrates flowfiles back to the 'current'
ProcessSession, which then combines them, and transfers them on their merry
way. There's the potential for blowing the heap now, but I think I can put
some controls in place to manage that.

Just sanity checking - is this a sane way to do this - moving things
between the 'current session', and a 'held session'? It looks like that's
pretty much how the MergeContent works; by having a session for each Bin.
So instead of having N bins/sessions, I've just got +1 Session that holds
onto the FlowFiles that I've keyed/seen?

Thanks Mark!


On 2018/08/22 13:54:59, Mark Payne <> wrote:
> Hi Sam,>
> There are a couple of ways to tackle this problem. My recommendation
would be to look at extending the BinFiles processor.>
> This is an abstract class, which MergeContent extends (and I think 1 or 2
other processors?). Its job is to bin 'like flowfiles' together,>
> and it can take care of pulling data from queues and efficiently binning
the FlowFiles together. It is important, though, to keep in mind>
> that FlowFiles contain attribute maps, and those can quickly exhaust your
heap when you're trying to hold 10's or 100's of thousands>
> of FlowFiles in a single processor.>
> Thanks>
> -Mark>
> On Aug 22, 2018, at 8:07 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <>> wrote:>
> Hi NiFi devs,>
> My understanding is that when I create a custom processor, and get
FlowFiles with session.get(CustomFlowFileFilter), irrespective of how many
times the CustomFlowFileFilter returns
FlowFileFilterResult.ACCEPT_AND_CONTINUE or
FlowFileFilterResult.REJECT_AND_CONTINUE, it will only ever loop through at
most 20000 flowfiles, where 20000 is defined by the
nifi.queue.swap.threshold setting in>
> (Disregarding that it's actually 19999, and that setting is not respected
when running tests, which made this SUPER confusing to debug...)>
> Attached a screenshot of that happening (also at:>
> My question is, Is there a way to force a custom processor to be able to
read ALL queued flowfiles in all incoming connections?>
> My particular use case is pairing flowfiles, and while there probably are
other ways to pair files using Wait/Notify processors, I'm handling files
in large throughput, with possible delays between the pairs arriving, and
it's quite easy to hit the limit. I could also increase the swap threshold
setting, but I keep hitting the problem. I've also played with custom
prioritizers on connections in an attempt to maximise the chance of having
pairs occur, but because I need to move unmatched flowfiles out, and back
in, is essentially creating a busy loop. Seems like there should be a
better way.>
> Any ideas?>
> Ideally, a way to force a custom processor to be able to read all queued
flowfiles (swapping more than the threshold into memory, during a single
OnTrigger call) would be the easiest solution. Is there one?>
> Cheers,>
> Sam>

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message