nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matt Burgess <>
Subject Re: End of stream?
Date Sat, 07 Nov 2015 03:01:13 GMT
Sounds very promising, thank you!! I'll share what I find out :)

Are there other group-related use cases? Maybe some non-incremental statistical measures?


Sent from my iPhone

> On Nov 6, 2015, at 9:48 PM, Michael Moser <> wrote:
> Matt,
> There is the MonitorActivity processor, which "Monitors the flow for
> activity and sends out an indicator when the flow has not had any data for
> some specified amount of time and again when the flow's activity is
> restored".  You could look at how MonitorActivity is coded to get ideas for
> how your ReservoirSampling processor can do what you need.
> -- Mike
> On Fri, Nov 6, 2015 at 11:49 AM, Matthew Burgess <>
> wrote:
>> No that makes sense, thanks much!
>> So for my case, I'm thinking I'd want another attribute from GetFile called
>> "lastInStream" or something? It would be set once processing of the current
>> directory is complete (for the time being), and reset each time the
>> onTrigger is called.  At that point it's really more of a "lastInBatch", so
>> maybe instead I could use the batch size somehow as a hint to the
>> ReservoirSampling processor that the current reservoir is ready to send
>> along?  The use case is a kind of burst processing (or per-batch
>> filtering),
>> where FlowFiles are available in "groups", where I could sample from the
>> incoming group with equal probability to give a smaller output group.
>> From:  Joe Witt <>
>> Reply-To:  <>
>> Date:  Friday, November 6, 2015 at 11:38 AM
>> To:  <>
>> Subject:  Re: End of stream?
>> Matt,
>> For processors in the middle of the flow the null check is important
>> for race conditions where it is told it can run but by the time it
>> does there are no flowfiles left.  The framework though in general
>> will avoid this because it is checking if there is work to do.  So, in
>> short you can't use that mechanism to know there are no items left to
>> process.
>> The only way to know that a given flowfile was the last in a bunch
>> would be for that fact to be an attribute on a given flow file.
>> There is really no concept of an end of stream so to speak from a
>> processor perspective.  Processors are either running on not running.
>> You can, as i mentioned before though, use attributes of flowfiles to
>> annotate their relative position in a stream.
>> Does that help explain it at all or did I make it more confusing?
>> Thanks
>> Joe
>> On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess <>
>> wrote:
>>> Does NiFi have the concept of an "end of stream" or is it designed to
>> pretty
>>> much always be running? For example if I use a GetFile processor
>> pointing at
>>> a single directory (with remove files = true), once all the files have
>> been
>>> processed, can downstream processors know that?
>>> I'm working on a ReservoirSampling processor, and I have it successfully
>>> building the reservoir from all incoming FlowFiles. However it never
>> gets to
>>> the logic that sends the sampled FlowFiles to the downstream processor
>> (just
>>> a PutFile at this point). I have the logic in a block like:
>>> FlowFile flowFile = session.get();
>>> if(flowFile == null) {
>>>   // send reservoir
>>> }
>>> else {
>>>  // build reservoir
>>> }
>>> But the if-clause never gets entered.  Is there a different approach
>> and/or
>>> am I misunderstanding how the data flow works?
>>> Thanks in advance,
>>> Matt

View raw message