nifi-dev mailing list archives

From Matthew Burgess <mattyb...@gmail.com>
Subject Re: End of stream?
Date Fri, 06 Nov 2015 16:49:44 GMT
No, that makes sense, thanks much!

So for my case, I'm thinking I'd want another attribute from GetFile called
"lastInStream" or something? It would be set once processing of the current
directory is complete (for the time being), and reset each time the
onTrigger is called.  At that point it's really more of a "lastInBatch", so
maybe instead I could use the batch size somehow as a hint to the
ReservoirSampling processor that the current reservoir is ready to send
along?  The use case is a kind of burst processing (or per-batch filtering),
in which FlowFiles arrive in "groups" and I sample from each incoming group
with equal probability to produce a smaller output group.
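
To make the per-batch idea concrete, here is a minimal sketch in plain Java (no NiFi classes) of a reservoir that samples with equal probability and releases its contents when an item is flagged as the last in its batch. The class name `BatchReservoir`, the `offer` method, and the `lastInBatch` flag are all hypothetical; in a real processor the flag would presumably be read from a FlowFile attribute set upstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical per-batch reservoir sampler (classic "Algorithm R"). */
class BatchReservoir<T> {
    private final int capacity;
    private final Random random;
    private final List<T> reservoir = new ArrayList<>();
    private long seen = 0; // items offered in the current batch

    BatchReservoir(int capacity, Random random) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.random = random;
    }

    /**
     * Offers one item. Returns the sampled group when lastInBatch is true,
     * otherwise an empty list. The reservoir is reset after each release.
     */
    List<T> offer(T item, boolean lastInBatch) {
        seen++;
        if (reservoir.size() < capacity) {
            // The first `capacity` items always enter the reservoir.
            reservoir.add(item);
        } else {
            // Pick a slot uniformly in [0, seen); the new item replaces an
            // existing one only if the slot falls inside the reservoir.
            long slot = (long) (random.nextDouble() * seen);
            if (slot < capacity) {
                reservoir.set((int) slot, item);
            }
        }
        if (!lastInBatch) {
            return Collections.emptyList();
        }
        List<T> sample = new ArrayList<>(reservoir);
        reservoir.clear();
        seen = 0;
        return sample;
    }
}
```

A batch smaller than the capacity is passed through whole, since every item fits in the reservoir before the release is triggered.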


From:  Joe Witt <joe.witt@gmail.com>
Reply-To:  <dev@nifi.apache.org>
Date:  Friday, November 6, 2015 at 11:38 AM
To:  <dev@nifi.apache.org>
Subject:  Re: End of stream?

Matt,

For processors in the middle of the flow the null check is important
for race conditions where it is told it can run but by the time it
does there are no flowfiles left.  The framework though in general
will avoid this because it is checking if there is work to do.  So, in
short you can't use that mechanism to know there are no items left to
process.

The only way to know that a given flowfile was the last in a bunch
would be for that fact to be an attribute on a given flow file.

There is really no concept of an end of stream so to speak from a
processor perspective.  Processors are either running or not running.
You can, as I mentioned before though, use attributes of flowfiles to
annotate their relative position in a stream.
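
As a rough illustration of annotating position with attributes, the sketch below tags each item of a group with its index, the group size, and a last-item flag. It is plain Java with attribute maps standing in for flowfiles; the `BatchTagger` class and the attribute names `batch.index`, `batch.count`, and `batch.last` are hypothetical, not existing NiFi attributes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that annotates items with their position in a group. */
class BatchTagger {
    // Attribute names are assumptions made for this example only.
    static final String INDEX = "batch.index";
    static final String COUNT = "batch.count";
    static final String LAST = "batch.last";

    /** Builds one attribute map per file name, in the order given. */
    static List<Map<String, String>> tag(List<String> names) {
        List<Map<String, String>> tagged = new ArrayList<>();
        int count = names.size();
        for (int i = 0; i < count; i++) {
            Map<String, String> attrs = new LinkedHashMap<>();
            attrs.put("filename", names.get(i));
            attrs.put(INDEX, String.valueOf(i));
            attrs.put(COUNT, String.valueOf(count));
            attrs.put(LAST, String.valueOf(i == count - 1));
            tagged.add(attrs);
        }
        return tagged;
    }

    /** A downstream consumer checks the flag; a missing attribute reads as false. */
    static boolean isLast(Map<String, String> attrs) {
        return Boolean.parseBoolean(attrs.get(LAST));
    }
}
```

Carrying the count alongside the flag lets a consumer wait until it has seen every item of the group, which matters if items can arrive out of order.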

Does that help explain it at all or did I make it more confusing?

Thanks
Joe

On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess <mattyb149@gmail.com>
wrote:
>  Does NiFi have the concept of an "end of stream" or is it designed to pretty
>  much always be running? For example if I use a GetFile processor pointing at
>  a single directory (with remove files = true), once all the files have been
>  processed, can downstream processors know that?
> 
>  I'm working on a ReservoirSampling processor, and I have it successfully
>  building the reservoir from all incoming FlowFiles. However it never gets to
>  the logic that sends the sampled FlowFiles to the downstream processor (just
>  a PutFile at this point). I have the logic in a block like:
> 
>  FlowFile flowFile = session.get();
>  if (flowFile == null) {
>    // send reservoir
>  } else {
>    // build reservoir
>  }
> 
>  But the if-clause never gets entered.  Is there a different approach and/or
>  am I misunderstanding how the data flow works?
> 
>  Thanks in advance,
>  Matt
> 
> 



