nifi-dev mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: Reading all flowfiles queued for a processor (>20000 flowfiles)
Date Thu, 23 Aug 2018 18:31:21 GMT
Sam,

Yes, that's right. The session provides a transaction, so each "bin" has its own
session. This way, once a bin is ready to be combined, you can do so in a single
transaction/session, and then the session is complete. There's no need to try to
manage which bins contain which sessions or vice versa.
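To make the "one session per bin" idea concrete, here is a small, stdlib-only Java sketch. It does not use NiFi's API: `BinManagerSketch`, `Bin`, and the fixed `targetSize` are hypothetical stand-ins, where each bin owns its own pending items (playing the role of the per-bin ProcessSession). Once a bin is complete it is combined and dropped in one step, so there is no cross-bin bookkeeping to maintain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of "one session per bin": each Bin owns its pending items,
// standing in for the ProcessSession that owns a bin's FlowFiles in NiFi.
class BinManagerSketch {

    static final class Bin {
        private final List<String> items = new ArrayList<>();
        private final int targetSize;

        Bin(int targetSize) {
            this.targetSize = targetSize;
        }

        void add(String item) {
            items.add(item);
        }

        boolean isFull() {
            return items.size() >= targetSize;
        }

        // "Commit" the bin: combine everything it holds in a single step.
        String combine() {
            return String.join("+", items);
        }
    }

    private final Map<String, Bin> bins = new HashMap<>();
    private final int targetSize;

    BinManagerSketch(int targetSize) {
        this.targetSize = targetSize;
    }

    // Adds an item to the bin for its key; returns the combined result if that bin is now complete.
    Optional<String> offer(String key, String item) {
        Bin bin = bins.computeIfAbsent(key, k -> new Bin(targetSize));
        bin.add(item);
        if (bin.isFull()) {
            bins.remove(key); // the bin's "transaction" is finished; nothing left to track
            return Optional.of(bin.combine());
        }
        return Optional.empty();
    }

    int openBins() {
        return bins.size();
    }

    public static void main(String[] args) {
        BinManagerSketch mgr = new BinManagerSketch(2);
        System.out.println(mgr.offer("a", "a1")); // Optional.empty
        System.out.println(mgr.offer("b", "b1")); // Optional.empty
        System.out.println(mgr.offer("a", "a2")); // Optional[a1+a2]
        System.out.println(mgr.openBins());       // 1
    }
}
```

Because completing a bin removes it from the map, the only state that outlives a transaction is the set of still-open bins.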

Thanks
-Mark



> On Aug 23, 2018, at 4:31 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tldzpysami4whpl1@gmail.com> wrote:
> 
> Neat! So the BinFiles processor doesn't quite have the control over the
> binning/pairing that I wanted, but it got me on the right track. I think
> I've got a pretty lightweight custom processor that does what I'm looking
> for now.
> 
> I made a processor that extends AbstractSessionFactoryProcessor and holds an
> AtomicReference<ProcessSession>, initialized on its first onTrigger, plus a
> HashMap. Each onTrigger now creates a session as normal and migrates
> FlowFiles to the ProcessSession the processor holds onto. It then does the
> matching and migrates FlowFiles back to the 'current' ProcessSession, which
> combines them and transfers them on their merry way. There's now the
> potential to blow the heap, but I think I can put some controls in place to
> manage that.
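The matching step described above (hold unmatched items keyed by a correlation value until their partner shows up, with a limit so the held set cannot grow without bound) can be sketched independently of NiFi. This is a hypothetical, stdlib-only Java model: `PairingBuffer`, the string key, the `maxHeld` cap, and the oldest-first eviction policy are all assumptions for illustration. In a real processor the held values would be FlowFiles migrated into the long-lived session rather than plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical pairing buffer: unmatched items are held by key until their partner arrives.
// In a NiFi processor the held values would be FlowFiles parked in a long-lived session;
// plain strings are used here so the sketch runs without NiFi on the classpath.
class PairingBuffer<T> {
    // LinkedHashMap keeps insertion order, so the first entry is always the oldest unmatched item.
    private final LinkedHashMap<String, T> held = new LinkedHashMap<>();
    private final int maxHeld;

    PairingBuffer(int maxHeld) {
        this.maxHeld = maxHeld;
    }

    // If a partner with the same key is already held, removes it and returns both as a pair;
    // otherwise holds the new item and returns empty.
    Optional<List<T>> offer(String key, T item) {
        T partner = held.remove(key);
        if (partner != null) {
            return Optional.of(Arrays.asList(partner, item));
        }
        held.put(key, item);
        return Optional.empty();
    }

    // Removes the oldest unmatched items until at most maxHeld remain, keeping memory bounded.
    // The caller decides what happens to them (for example, routing them to an "unmatched" output).
    List<T> evictOverflow() {
        List<T> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, T>> it = held.entrySet().iterator();
        while (held.size() > maxHeld && it.hasNext()) {
            evicted.add(it.next().getValue());
            it.remove();
        }
        return evicted;
    }

    int heldCount() {
        return held.size();
    }

    public static void main(String[] args) {
        PairingBuffer<String> buf = new PairingBuffer<>(2);
        System.out.println(buf.offer("k1", "left-1"));  // Optional.empty
        System.out.println(buf.offer("k2", "left-2"));  // Optional.empty
        System.out.println(buf.offer("k3", "left-3"));  // Optional.empty
        System.out.println(buf.evictOverflow());        // [left-1]
        System.out.println(buf.offer("k2", "right-2")); // Optional[[left-2, right-2]]
        System.out.println(buf.heldCount());            // 1
    }
}
```

Evicting the oldest entry is only one possible control; another would be to stop pulling new input while the buffer is at its cap. Either way, the cap is what addresses the heap concern Mark raises about holding large numbers of FlowFiles and their attribute maps.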
> 
> Just a sanity check: is this a sane way to do it, moving things between the
> 'current' session and a 'held' session? It looks like that's pretty much how
> MergeContent works, by having a session for each bin. So instead of having N
> bins/sessions, I've just got one extra session that holds onto the FlowFiles
> I've keyed/seen?
> 
> Thanks Mark!
> 
> Cheers,
> Sam
> 
> On 2018/08/22 13:54:59, Mark Payne <m...@hotmail.com> wrote:
>> Hi Sam,
>>
>> There are a couple of ways to tackle this problem. My recommendation would be
>> to look at extending the BinFiles processor. This is an abstract class, which
>> MergeContent extends (and I think 1 or 2 other processors?). Its job is to bin
>> 'like' FlowFiles together, and it can take care of pulling data from queues
>> and efficiently binning the FlowFiles together. It is important, though, to
>> keep in mind that FlowFiles contain attribute maps, and those can quickly
>> exhaust your heap when you're trying to hold tens or hundreds of thousands of
>> FlowFiles in a single processor.
>>
>> Thanks
>> -Mark
>> 
>> 
>> On Aug 22, 2018, at 8:07 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tl...@gmail.com> wrote:
>>
>> Hi NiFi devs,
>>
>> My understanding is that when I create a custom processor and get FlowFiles
>> with session.get(CustomFlowFileFilter), then irrespective of how many times
>> the CustomFlowFileFilter returns FlowFileFilterResult.ACCEPT_AND_CONTINUE or
>> FlowFileFilterResult.REJECT_AND_CONTINUE, it will only ever loop through at
>> most 20000 FlowFiles, where 20000 is defined by the nifi.queue.swap.threshold
>> setting in nifi.properties.
>> (Disregarding that it's actually 19999, and that the setting is not respected
>> when running tests, which made this SUPER confusing to debug...)
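A simplified model of the behavior described above may help: the connection keeps only a bounded number of FlowFiles in memory as an "active" queue and swaps the rest out, and a filtered get only iterates over the active portion. The Java below is a stdlib-only toy; `SwappingQueueModel`, `activeLimit`, the `Predicate`-based filter, and the refill-after-poll rule are hypothetical simplifications and do not reproduce NiFi's actual queue implementation. It shows why a partner sitting in the swapped-out portion is not seen by the same scan that finds its other half.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Toy model: only the first `activeLimit` queued items are held in memory ("active");
// anything beyond that is "swapped out" and invisible to a filtered poll.
class SwappingQueueModel {
    private final Deque<String> active = new ArrayDeque<>();
    private final Deque<String> swapped = new ArrayDeque<>();
    private final int activeLimit;

    SwappingQueueModel(int activeLimit) {
        this.activeLimit = activeLimit;
    }

    void enqueue(String item) {
        if (active.size() < activeLimit) {
            active.addLast(item);
        } else {
            swapped.addLast(item);
        }
    }

    // Scans the active items only, removing and returning those the filter accepts.
    List<String> pollMatching(Predicate<String> filter) {
        List<String> accepted = new ArrayList<>();
        Iterator<String> it = active.iterator();
        while (it.hasNext()) {
            String item = it.next();
            if (filter.test(item)) {
                accepted.add(item);
                it.remove();
            }
        }
        // Refill the active portion from swapped items once space frees up.
        while (active.size() < activeLimit && !swapped.isEmpty()) {
            active.addLast(swapped.pollFirst());
        }
        return accepted;
    }

    public static void main(String[] args) {
        SwappingQueueModel queue = new SwappingQueueModel(3);
        for (String s : new String[] {"a-left", "b-left", "c-left", "a-right"}) {
            queue.enqueue(s);
        }
        // "a-right" is in the swapped portion, so the first scan only finds "a-left".
        System.out.println(queue.pollMatching(s -> s.startsWith("a-"))); // [a-left]
        // After the refill, the partner becomes visible, but only on a later scan.
        System.out.println(queue.pollMatching(s -> s.startsWith("a-"))); // [a-right]
    }
}
```

In this model, raising `activeLimit` only moves the point at which pairs get split across scans, which matches the observation that increasing the swap threshold does not make the problem go away.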
>> 
>> I've attached a screenshot of that happening (also at:
>> https://i.imgur.com/25QJxuj.png).
>>
>> My question is: is there a way to force a custom processor to read ALL queued
>> FlowFiles in all incoming connections?
>>
>> My particular use case is pairing FlowFiles. While there are probably other
>> ways to pair files using the Wait/Notify processors, I'm handling files at
>> high throughput, with possible delays between the two halves of a pair
>> arriving, so it's quite easy to hit the limit. I could also increase the swap
>> threshold setting, but I keep hitting the problem. I've also played with
>> custom prioritizers on connections in an attempt to maximise the chance of
>> pairs occurring, but because I need to move unmatched FlowFiles out and back
>> in, this essentially creates a busy loop. It seems like there should be a
>> better way.
>>
>> Any ideas?
>>
>> Ideally, a way to force a custom processor to read all queued FlowFiles
>> (swapping more than the threshold into memory during a single onTrigger call)
>> would be the easiest solution. Is there one?
>>
>> Cheers,
>> Sam