nifi-dev mailing list archives

From Joe Witt <joe.w...@gmail.com>
Subject Re: End of stream?
Date Sat, 07 Nov 2015 02:54:04 GMT
Also meant to reply back on this earlier...

It would be a reasonable JIRA to add logic to GetFile that sets an
attribute signaling that a given flow file was sourced from the 'last
file left' in a given directory or source.  However, it is somewhat
odd: when is something considered the last?  Also of note here is that
data could be prioritized post GetFile, and then you'd really not know
if you're dealing with the last one or the first one or anything in
between.  We'd really need GetFile to put a timestamp and sequence id
on each flow file, or something.  Hmmm.
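
To make the timestamp-plus-sequence-id idea concrete, here is a rough
sketch in plain Java, with no NiFi classes involved.  Everything in it
is an assumption for illustration: the ListingTagger class and the
"listing.*" attribute names are hypothetical and are not something
GetFile provides.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, NOT part of GetFile: tags the files found in one directory listing. */
final class ListingTagger {
    // Hypothetical attribute names, chosen only for this sketch.
    static final String SEQ = "listing.sequence.id";
    static final String TS = "listing.timestamp";
    static final String LAST = "listing.last";

    private ListingTagger() {
    }

    /** Returns one attribute map per file name, in listing order. */
    static List<Map<String, String>> tag(List<String> fileNames, long listingTimeMillis) {
        List<Map<String, String>> tagged = new ArrayList<>();
        for (int i = 0; i < fileNames.size(); i++) {
            Map<String, String> attrs = new LinkedHashMap<>();
            attrs.put("filename", fileNames.get(i));
            // Position within this listing, so order survives later re-prioritization.
            attrs.put(SEQ, Integer.toString(i));
            // Same timestamp for every file in the listing, so a group can be identified.
            attrs.put(TS, Long.toString(listingTimeMillis));
            // "Last" only means last in this listing, not last forever.
            attrs.put(LAST, Boolean.toString(i == fileNames.size() - 1));
            tagged.add(attrs);
        }
        return tagged;
    }
}
```

With the sequence id and listing timestamp carried on each item, a
downstream step could still work out relative position even if the
items arrive out of order.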

Given what you're trying to do, could this logic of forming sample
groups around some time interval instead simply be part of that processor?
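
As a rough illustration of that idea, the sketch below keeps a
reservoir (classic "Algorithm R" reservoir sampling) and hands the
sample back once no new item has arrived for a configurable idle
period.  It is plain Java and not NiFi API: the WindowedReservoir
class, its method names, and the idle-timeout rule are all assumptions
made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Sketch only: reservoir sampling (Algorithm R) with an idle-timeout flush. */
final class WindowedReservoir<T> {
    private final int capacity;
    private final long idleMillis;
    private final Random random;
    private final List<T> reservoir = new ArrayList<>();
    private int seen = 0;               // items offered since the last flush
    private long lastArrivalMillis = 0; // time of the most recent offer

    WindowedReservoir(int capacity, long idleMillis, Random random) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.idleMillis = idleMillis;
        this.random = random;
    }

    /** Offers one item; every item of the current group is kept with equal probability. */
    void offer(T item, long nowMillis) {
        seen++;
        lastArrivalMillis = nowMillis;
        if (reservoir.size() < capacity) {
            reservoir.add(item); // still filling the reservoir
        } else {
            int j = random.nextInt(seen); // uniform in [0, seen)
            if (j < capacity) {
                reservoir.set(j, item); // replace a slot with probability capacity/seen
            }
        }
    }

    /** Returns the sample and resets if the group has gone idle; otherwise returns an empty list. */
    List<T> pollIfIdle(long nowMillis) {
        if (seen == 0 || nowMillis - lastArrivalMillis < idleMillis) {
            return Collections.emptyList();
        }
        List<T> out = new ArrayList<>(reservoir);
        reservoir.clear();
        seen = 0;
        return out;
    }
}
```

In a real processor, pollIfIdle would have to be called even when no
input is queued.  As far as I know that is what the @TriggerWhenEmpty
annotation is for, so treat that wiring as an assumption to verify
rather than a tested design.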

Thanks
Joe

On Fri, Nov 6, 2015 at 9:48 PM, Michael Moser <moser.mw@gmail.com> wrote:
> Matt,
>
> There is the MonitorActivity processor, which "Monitors the flow for
> activity and sends out an indicator when the flow has not had any data for
> some specified amount of time and again when the flow's activity is
> restored".  You could look at how MonitorActivity is coded to get ideas for
> how your ReservoirSampling processor can do what you need.
>
> -- Mike
>
>
> On Fri, Nov 6, 2015 at 11:49 AM, Matthew Burgess <mattyb149@gmail.com>
> wrote:
>
>> No that makes sense, thanks much!
>>
>> So for my case, I'm thinking I'd want another attribute from GetFile called
>> "lastInStream" or something? It would be set once processing of the current
>> directory is complete (for the time being), and reset each time the
>> onTrigger is called.  At that point it's really more of a "lastInBatch", so
>> maybe instead I could use the batch size somehow as a hint to the
>> ReservoirSampling processor that the current reservoir is ready to send
>> along?  The use case is a kind of burst processing (or per-batch filtering),
>> where FlowFiles are available in "groups", where I could sample from the
>> incoming group with equal probability to give a smaller output group.
>>
>>
>> From:  Joe Witt <joe.witt@gmail.com>
>> Reply-To:  <dev@nifi.apache.org>
>> Date:  Friday, November 6, 2015 at 11:38 AM
>> To:  <dev@nifi.apache.org>
>> Subject:  Re: End of stream?
>>
>> Matt,
>>
>> For processors in the middle of the flow the null check is important
>> for race conditions where it is told it can run but by the time it
>> does there are no flowfiles left.  The framework though in general
>> will avoid this because it is checking if there is work to do.  So, in
>> short you can't use that mechanism to know there are no items left to
>> process.
>>
>> The only way to know that a given flowfile was the last in a bunch
>> would be for that fact to be an attribute on a given flow file.
>>
>> There is really no concept of an end of stream so to speak from a
>> processor perspective.  Processors are either running or not running.
>> You can, as I mentioned before though, use attributes of flowfiles to
>> annotate their relative position in a stream.
>>
>> Does that help explain it at all or did I make it more confusing?
>>
>> Thanks
>> Joe
>>
>> On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess <mattyb149@gmail.com>
>> wrote:
>> >  Does NiFi have the concept of an "end of stream" or is it designed to pretty
>> >  much always be running? For example if I use a GetFile processor pointing at
>> >  a single directory (with remove files = true), once all the files have been
>> >  processed, can downstream processors know that?
>> >
>> >  I'm working on a ReservoirSampling processor, and I have it successfully
>> >  building the reservoir from all incoming FlowFiles. However it never gets to
>> >  the logic that sends the sampled FlowFiles to the downstream processor (just
>> >  a PutFile at this point). I have the logic in a block like:
>> >
>> >  FlowFile flowFile = session.get();
>> >  if (flowFile == null) {
>> >    // send reservoir
>> >  } else {
>> >    // build reservoir
>> >  }
>> >
>> >  But the if-clause never gets entered.  Is there a different approach and/or
>> >  am I misunderstanding how the data flow works?
>> >
>> >  Thanks in advance,
>> >  Matt
>> >
>> >
>>
>>
>>
>>
