nifi-dev mailing list archives

From Salvatore Papa <salvatore.p...@gmail.com>
Subject Re: Coding a Processor that writes to multiple output flowfiles at once
Date Mon, 23 Nov 2015 21:12:42 GMT
Perfect, thanks Mark.

On Tue, Nov 24, 2015 at 2:55 AM, Mark Payne <markap14@hotmail.com> wrote:

> Salvatore,
>
> The caveat about the append method is that if you append to an incoming
> FlowFile, it has
> to copy the contents of the incoming FlowFile before it can append to it.
> However, if you append
> to a new FlowFile (or a FlowFile that you've already written to in the
> same session), then the append
> is extremely efficient and does not need to copy anything. It actually
> holds open an OutputStream under
> the hood so that you can keep writing to the same OutputStream.
>
> In general, I would advise going the route that you specified here,
> though, where in one processor
> you are splitting the data out like you need and in a second processor you
> do the conversion from
> one format to another. These are very independent concepts, and so
> breaking them into separate
> processors increases the cohesion and lets you easily reuse the Processors
> if you need to do something
> slightly different later.
>
> Thanks
> -mark
>
>
> > On Nov 23, 2015, at 10:49 AM, Salvatore Papa <salvatore.papa@gmail.com> wrote:
> >
> > Hi Bryan, Mark,
> >
> > I knew I was forgetting something! So I had actually noticed the
> > session.append, but I think there are two caveats which make it not what
> > I want:
> >
> > 1) Does session.append truly 'append'? Or will it re-write/copy the
> > content that's already in the new flowfile? (I think I saw that as a note
> > somewhere - please correct me if I'm wrong. But if I'm right, performance
> > wise, that's actually worse than reading/writing the input file O(N)
> > times; that'd make it O(N^2).)
> > 2) The output actually isn't raw/text - I have other writers using that
> > stream. For example, reading a text CSV, and writing each column to
> > Avro. Something that may not be 'appendable', hence requiring the output
> > flowfile OutputStreams to stay open for the entire duration of the
> > processing.
> >
> > Thanks for the suggestion though - I hadn't seen the RouteText processor
> > before. It's actually very close to what I'm looking for! I'll play
> > around with it and see if it suits. At the very least, splitting via text
> > first (skipping the requirement for the nested writer), and then writing
> > each of those out to (e.g.) Avro in a second processor, may be the best bet.
> >
> > Thanks Bryan and Mark!
> >
> > On Tue, Nov 24, 2015 at 1:18 AM, Mark Payne <markap14@hotmail.com> wrote:
> >
> >> Hey Salvatore,
> >>
> >> I think the key piece that you are missing is the
> >> ProcessSession.append() method. You can use this to efficiently append
> >> to FlowFile A, then to FlowFile B, then to FlowFile A, then to
> >> FlowFile C, or what-have-you. A good example that comes to mind is the
> >> RouteText Processor. This is available on the 'master' branch.
> >>
> >> So the overall logic would look something like:
> >>
> >> final List<FlowFile> flowFiles = new ArrayList<>();
> >> for (int i = 0; i < numColumns; i++) {
> >>     // one child FlowFile per column, created from the original
> >>     flowFiles.add(session.create(originalFlowFile));
> >> }
> >>
> >> // read the input once, line by line
> >> session.read(originalFlowFile, new InputStreamCallback() {
> >>     @Override
> >>     public void process(final InputStream rawIn) throws IOException {
> >>         try (final BufferedReader in = new BufferedReader(
> >>                 new InputStreamReader(rawIn, StandardCharsets.UTF_8))) {
> >>             String line;
> >>             while ((line = in.readLine()) != null) {
> >>                 final String[] columns = line.split(",");
> >>                 for (int i = 0; i < columns.length && i < flowFiles.size(); i++) {
> >>                     // copy into a final local so the callback can capture it;
> >>                     // a newline keeps successive values apart
> >>                     final byte[] value = (columns[i] + "\n").getBytes(StandardCharsets.UTF_8);
> >>                     // append returns the updated FlowFile; keep the latest reference
> >>                     flowFiles.set(i, session.append(flowFiles.get(i), new OutputStreamCallback() {
> >>                         @Override
> >>                         public void process(final OutputStream out) throws IOException {
> >>                             out.write(value);
> >>                         }
> >>                     }));
> >>                 }
> >>             }
> >>         }
> >>     }
> >> });
> >>
> >> But as mentioned, the RouteText processor is a great full processor to
> >> use as an example.
> >>
> >> Let us know if you run into any more problems!
> >>
> >> Thanks
> >> -Mark
> >>
> >>
> >>
> >>> On Nov 23, 2015, at 2:40 AM, Salvatore Papa <salvatore.papa@gmail.com> wrote:
> >>>
> >>> Heya NiFi devs,
> >>>
> >>> I'm having a bit of trouble trying to wrap my head around a valid way
> >>> of tackling this problem with the available Processor templates. I'd
> >>> like to split an input flowfile into N different flowfiles, each going
> >>> to one of N relationships.
> >>>
> >>> A simplistic way of viewing it would be: A very large CSV file, with N
> >>> columns, and I want to split each column into its own flowfile, and
> >>> send each of these flowfiles to its own relationship (or with an
> >>> attribute saying which column it belongs to).
> >>>
> >>> Basic premise is for an example with two columns, and only two lines:
> >>> * Read a line, write first column value to flowfile A, write second
> >>>   column value to flowfile B
> >>> * Read next line, appending first column value to flowfile A, appending
> >>>   second column value to flowfile B
> >>> Followed by one of:
> >>> * Send flowfile A to relationship A, and send flowfile B to
> >>>   relationship B
> >>> or
> >>> * Set attribute "A" to flowfile A, attribute "B" to flowfile B, then
> >>>   send both A and B to a 'success' relationship.
> >>>
> >>> Unfortunately, I can't seem to find a way to write to multiple
> >>> flowfiles at once, or at least, write to an outputstream for one
> >>> flowfile, then write to another outputstream for another flowfile, then
> >>> continue writing to the first flowfile.
> >>>
> >>> If they weren't such large files, I'd be okay with reading the input
> >>> file N times, pulling out a different part each time, but I'd like to
> >>> read each line (and, by extension, the file) only once.
> >>>
> >>> I've written AbstractProcessors before for simple One-to-One
> >>> transformations, and even Merge processors which are an extension of
> >>> AbstractSessionFactoryProcessor to do Many-to-One, and even Split
> >>> AbstractProcessors for One-to-Many in serial (splitting at different
> >>> places, even clone(flowfile, start, size)). But I can't work out a way
> >>> to do this One-to-Many in parallel.
> >>>
> >>> Any ideas? Am I missing something useful? Do I just have to do it by
> >>> reading it multiple times? Just a really simple proof of concept
> >>> explaining the design would be enough to get me started.
> >>>
> >>> Kind regards,
> >>> Salvatore
> >>
> >>
>
>
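For anyone reading this thread later: the core of the single-pass idea discussed above can be tried outside NiFi. The following is only a minimal sketch using plain java.io streams in place of FlowFiles. The class name ColumnSplitSketch and the method splitColumns are made up for illustration and are not part of NiFi, and the sketch assumes simple comma-separated text with no quoted or escaped commas.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of NiFi: splits CSV columns in one pass.
final class ColumnSplitSketch {

    // Reads the input once and appends field i of every line to outputs.get(i).
    // Assumption: plain comma-separated text, no quoted or escaped commas.
    static void splitColumns(InputStream rawIn, List<? extends OutputStream> outputs)
            throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(rawIn, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            String[] columns = line.split(",", -1); // -1 keeps trailing empty fields
            int n = Math.min(columns.length, outputs.size());
            for (int i = 0; i < n; i++) {
                // One value per line in each column's output.
                outputs.get(i).write((columns[i] + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream csv = new ByteArrayInputStream(
                "a1,b1\na2,b2\n".getBytes(StandardCharsets.UTF_8));
        List<ByteArrayOutputStream> outs = new ArrayList<>();
        outs.add(new ByteArrayOutputStream()); // stands in for flowfile A
        outs.add(new ByteArrayOutputStream()); // stands in for flowfile B
        splitColumns(csv, outs);
        System.out.print(outs.get(0).toString("UTF-8"));
        System.out.print(outs.get(1).toString("UTF-8"));
    }
}
```

In a real processor the write to each output would presumably become a session.append call on the matching FlowFile, as in Mark's snippet; the loop structure stays the same.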

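The O(N) versus O(N^2) concern raised in the thread can also be put into numbers. This is a back-of-the-envelope model, not a measurement of NiFi: it only counts bytes written under the hypothetical worst case where every append first re-copies the existing content, compared with the case Mark describes, where the stream stays open and only new bytes are written. The class and method names are invented for the example.

```java
// Hypothetical arithmetic model; it does not call or measure NiFi.
final class AppendCostSketch {

    // Bytes written if every append re-copies the existing content first.
    static long bytesWithRecopy(int appends, int chunkSize) {
        long total = 0;
        long existing = 0;
        for (int i = 0; i < appends; i++) {
            total += existing + chunkSize; // copy old content, then write the new chunk
            existing += chunkSize;
        }
        return total;
    }

    // Bytes written if the stream stays open and only new bytes are written.
    static long bytesWithOpenStream(int appends, int chunkSize) {
        return (long) appends * chunkSize;
    }

    public static void main(String[] args) {
        System.out.println(bytesWithRecopy(4, 10));     // 10 + 20 + 30 + 40
        System.out.println(bytesWithOpenStream(4, 10)); // 4 * 10
    }
}
```

With 4 appends of 10 bytes each, the re-copy model writes 100 bytes and the open-stream model writes 40. The gap grows quadratically with the number of appends, which is why the no-copy behaviour for new FlowFiles matters for large inputs.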