nifi-dev mailing list archives

From "Kessler, Jon" <>
Subject Re: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution
Date Fri, 01 Nov 2019 15:40:07 GMT
I finally got around to creating this ticket:
May I please have contributor status so that I can assign it to myself and submit a PR?
From: Kessler, Jon <>
Sent: Thursday, October 17, 2019 11:54 AM
To: <>
Subject: EXT: Re: Re: [Discuss] Data prioritization - A proposed solution

Joe, hopefully I addressed all of your questions:

Your interpretation of the scheduling aspect is correct. These queues will pretend to be empty
a certain % of the time if higher priority data recently moved elsewhere. That % is configurable
on a per rule basis which allows the operator to determine how much to stagger the data associated
with each rule. That % is also how the rules are ranked in terms of order of priority. The
higher the %, the more often a rule will make use of its threads so the higher its priority
is considered to be.
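
To make the staggering behaviour concrete, here is a minimal sketch in Python. It is not the prototype's code: the class name StaggeredQueue, its parameters, and the injected clock are all hypothetical, and the 5 second window is taken from the original proposal quoted further down.

```python
import random
import time
from collections import deque


class StaggeredQueue:
    """Hypothetical queue that sometimes reports itself empty (illustration only)."""

    def __init__(self, rule_percent, window_seconds=5.0, rng=None, clock=time.monotonic):
        self.rule_percent = rule_percent      # chance (0-100) of yielding while staggered
        self.window_seconds = window_seconds  # what counts as "recently"
        self.rng = rng or random.Random()
        self.clock = clock                    # injectable so the behaviour can be tested
        self.items = deque()

    def poll(self, last_higher_priority_move):
        """Return the next item, or None if empty or pretending to be empty.

        last_higher_priority_move is the timestamp of the most recent movement of
        higher priority data elsewhere in the system, or None if there was none.
        """
        if not self.items:
            return None
        recently = (
            last_higher_priority_move is not None
            and self.clock() - last_higher_priority_move <= self.window_seconds
        )
        # While staggered, yield only rule_percent % of the time.
        if recently and self.rng.random() * 100 >= self.rule_percent:
            return None
        return self.items.popleft()
```

A rule configured at 100% is never held back, while a rule at 0% yields nothing for as long as higher priority data keeps moving elsewhere.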

Administration: You are correct that the ruleset is provided at the flow controller level
but will be leveraged by all connections regardless of whether or not they use the BucketPrioritizer
(more details on this below). This overall solution only works if all FlowFileQueues are of
this new implementation which is why I tied it to [...].

The sorting function here takes place on insertion into any connection on which a BucketPrioritizer
is set. Once a FlowFile has been sorted into a bucket we maintain that state so that each
time it moves into a new connection we already know in which bucket it should be placed without
needing to have a BucketPrioritizer set on that connection. Each bucket in a connection is
just a FIFO queue so no additional sorting is done. You should only have to configure connections
to use the BucketPrioritizer at points in your flow where you believe you'll have enough information
to accurately determine priority but not beyond that point unless you want to re-evaluate
downstream for some reason. There is administration involved in setting these BucketPrioritizers
on some connections but it should be minimal per flow (sometimes as few as one).
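
The insertion path described above could be sketched roughly as follows. This is an illustration under assumptions, not the prototype: BucketQueue, DEFAULT_BUCKET, and the dict-based FlowFile are hypothetical, Python predicates stand in for expression language rules, and sending unmatched FlowFiles to a default bucket is my assumption.

```python
from collections import deque

DEFAULT_BUCKET = "default"  # assumption: where FlowFiles matching no rule land


class BucketQueue:
    """Hypothetical connection queue holding one FIFO bucket per rule."""

    def __init__(self, rules, has_bucket_prioritizer=False):
        # rules: list of (name, predicate) pairs, highest priority first.
        self.rules = rules
        self.has_bucket_prioritizer = has_bucket_prioritizer
        self.buckets = {name: deque() for name, _ in rules}
        self.buckets[DEFAULT_BUCKET] = deque()
        self.unevaluated = deque()  # FlowFiles that never met a BucketPrioritizer

    def offer(self, flowfile):
        if self.has_bucket_prioritizer:
            # Evaluate (or re-evaluate) only where a BucketPrioritizer is set.
            flowfile["bucket"] = next(
                (name for name, matches in self.rules if matches(flowfile)),
                DEFAULT_BUCKET,
            )
        bucket = flowfile.get("bucket")
        if bucket is None:
            self.unevaluated.append(flowfile)
        else:
            # The remembered bucket is reused; a plain FIFO append, no sorting.
            self.buckets[bucket].append(flowfile)
```

Only the queue built with has_bucket_prioritizer=True pays the cost of evaluating rules. Every queue after it just reads the bucket name already stored on the FlowFile.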

Some additional information: When you delete a rule, each FlowFile that was already associated
with that rule will be re-evaluated against the ruleset the next time it moves, i.e. when it
enters the next connection, regardless of whether or not a BucketPrioritizer is set on that
connection. Also, FlowFiles that have yet to be evaluated (have yet to encounter a BucketPrioritizer)
will not be staggered. This was a design decision that if we don't know what a priority is
for a given FlowFile we should get it to that point in the flow as soon as possible. This
decision was a result of empirical evidence that when we did stagger unevaluated data an incoming
flow of high priority data slowed its own upstream processing down once it was identified
and processed as high priority.
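
The re-evaluation after a rule is deleted could look something like the sketch below. The function resolve_bucket, the "bucket" key on a dict-based FlowFile, and the default bucket are all hypothetical names chosen for illustration, and Python predicates stand in for expression language rules.

```python
DEFAULT_BUCKET = "default"  # assumption: bucket for FlowFiles matching no rule


def resolve_bucket(flowfile, rules, has_bucket_prioritizer=False):
    """Return the bucket name for a FlowFile entering a connection.

    rules is a list of (name, predicate) pairs, highest priority first.
    Returns None for a FlowFile that has never been evaluated.
    """
    active = {name for name, _ in rules} | {DEFAULT_BUCKET}
    remembered = flowfile.get("bucket")
    rule_deleted = remembered is not None and remembered not in active
    # Re-evaluate where a BucketPrioritizer is set, or anywhere if the
    # remembered rule no longer exists.
    if has_bucket_prioritizer or rule_deleted:
        flowfile["bucket"] = next(
            (name for name, matches in rules if matches(flowfile)),
            DEFAULT_BUCKET,
        )
    return flowfile.get("bucket")
```

A FlowFile whose remembered rule still exists keeps its bucket on connections without a BucketPrioritizer, and a FlowFile that was never evaluated stays unevaluated there.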

Multi-tenancy: Agreed that a global priority list could be too restrictive for multi-tenancy
and should be addressed.

Per swapping, this is an area where I admittedly need to put more thought into my implementation
because there is plenty of room for improvement. Right now I'm just swapping files to disk
in order of least to greatest priority but they are all stored together. Therefore they're
read back into memory in order of least to greatest priority. More work should be done here.
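
A small sketch of why the current swapping order is a problem. The helpers swap_out and swap_in are hypothetical, the swap file is modelled as a plain list, and a larger number is assumed to mean higher priority.

```python
def swap_out(in_memory, priority_of, count):
    """Move `count` FlowFiles into one swap list, lowest priority first."""
    ordered = sorted(in_memory, key=priority_of)  # least to greatest priority
    swapped = ordered[:count]
    swapped_ids = {id(ff) for ff in swapped}
    remaining = [ff for ff in in_memory if id(ff) not in swapped_ids]
    return swapped, remaining


def swap_in(swapped, count):
    """Read `count` FlowFiles back from the front of the swap list."""
    return swapped[:count], swapped[count:]
```

Because everything is stored together in least-to-greatest order, swap_in hands back the lowest priority FlowFiles first, even when higher priority ones are waiting in the same swap list.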

  - Jon
From: Joe Witt <>
Sent: Thursday, October 17, 2019 8:12:52 AM
Subject: EXT: Re: [Discuss] Data prioritization - A proposed solution


Probably some details I don't quite understand yet so responses here are to
get there...

The concept for scheduling is interesting.  Does this basically work around
the fact that we have an unfair scheduler so this has queue implementations
which pretend data is not available when it knows that there is higher
priority data available elsewhere thus returning more threads to the pool
faster to increase the likelihood that queues with higher priority data
will get served more often?

The notion of prioritization implies there is a sorting function happening
somewhere.  NiFi now does sorting on insertion to every queue.  At what
points are you suggesting sorting can be done/reduced to?

Administration: The existing model does require each prioritizer to be set
for each queue.  Yours does as well - to opt into this you'd have to select
the BucketPrioritizer right?  It seems like you're saying the priority
ruleset would be provided at the flow controller level and be enforced by
all connections which leverage this prioritizer.  For large multi-tenant
nifi flows having a global ruleset might be too limiting but maybe we just
don't worry about that yet.

How does this idea work with the fact that queues as they reach a given
threshold have their data swapped out to disk and as data gets worked off
the flowfiles get swapped back into memory?


On Thu, Oct 17, 2019 at 7:28 AM Kessler, Jon <> wrote:

> I want to start a discussion about a new prioritization mechanism that
> addresses some of the issues that I believe exist in the current solution.
> These issues are:
>  - Scheduling: No consideration is given to data priority when determining
> which component is given the next available thread with which to work
>  - Constant sorting: Because all flowfiles in a given connection share the
> same PriorityQueue they must be sorted every time they move. While this
> sort is efficient it can add up as queues grow deep.
>  - Administration: There is a costly human element to managing the value
> used as a priority ranking as priorities change. You must also ensure every
> connection in the appropriate flow has the proper prioritizer assigned to
> it to make use of the property.
> We have developed a prototype of a new FlowFileQueue implementation that
> addresses these issues. Use of this implementation is controlled via
> [...] so you can opt in or out system-wide without doing a lot of
> configuration. Its design goals are:
>   - Instead of using the value of a FlowFile attribute as a ranking,
> maintain a set of expression language rules to define your priorities. The
> highest ranked rule that a given FlowFile satisfies will be that FlowFile's
> priority
>   - Because we have a finite set of priority rules we can utilize a bucket
> sort in our connections. One bucket per priority rule. The bucket/rule with
> which a FlowFile is associated will be maintained so that as it moves
> through the system we do not have to re-evaluate that FlowFile against our
> ruleset unless we have reason to do so.
>   - Control where in your flow FlowFiles are evaluated against the ruleset
> with a new Prioritizer implementation: BucketPrioritizer.
>   - When this queue implementation is polled it will be able to check
> state to see if any data of a higher priority than what it currently
> contains recently (within 5s) moved elsewhere in the system. If higher
> priority data has recently moved elsewhere, the connection will only
> provide a FlowFile X% of the time where X is defined along with the rule.
> This allows higher priority data to have more frequent access to threads
> without thread-starving lower priority data.
>   - Rules will be managed via a menu option for the flow and changes to
> them take effect instantly. This allows you to change your priorities
> without stopping/editing/restarting various components on the graph.
> I intend to contribute this solution but first want to solicit input and
> opinions.
>   - Jon Kessler
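
As a rough illustration of the rule model in the proposal above, the sketch below ranks rules by their configured percentage and picks the highest ranked rule a FlowFile satisfies. The Rule dataclass, rank, and priority_rule are hypothetical names, FlowFiles are modelled as dicts of attributes, and Python predicates stand in for expression language rules.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Rule:
    name: str
    percent: float                   # how often the rule may use its threads while staggered
    matches: Callable[[dict], bool]  # stand-in for an expression language rule


def rank(rules):
    """Order rules from highest to lowest priority (higher percent first)."""
    return sorted(rules, key=lambda rule: rule.percent, reverse=True)


def priority_rule(flowfile, rules) -> Optional[Rule]:
    """Return the highest ranked rule the FlowFile satisfies, or None."""
    for rule in rank(rules):
        if rule.matches(flowfile):
            return rule
    return None
```

Since the ruleset is finite, the result maps directly onto one bucket per rule, which is what makes a bucket sort possible.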
