nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Koji Kawamura <ijokaruma...@gmail.com>
Subject Re: Wait/Notify inconsistent behavior
Date Wed, 09 Jan 2019 01:09:33 GMT
Hi Louis,

Glad to hear that works.

There are two approaches I can think of, to accumulate and combine
multiple service results.
The goal is creating a single FlowFile containing all service call
results, right?

1. Use DistributedCache(DC) as a shared heap space
- Each part of flow calls external services and store its result into DC
- Use cache keys in something like "OriginalRequestID_serviceType"
format. For example, "OriginalIncomingFlowFileUUID_serviceA"
- Use notify as well to let Wait know progress. Use serviceType as counter name
- After Wait finishes, use LookupAttribute or LookupRecord to enrich
the original FlowFile by fetching the accumulated results from DC
using DistributedMapCacheLookupService

2. Use MergeContent processor
- Route the original FlowFile into multiple sub-flows, meaning croning
the same FlowFile
- At each sub flow, call external service, update result FlowFile's
attribute so that MergeContent can wait and merge them:
-- fragment.identifier: some common id
-- fragment.count: the total number of sub-flows
-- fragment.index: to control merge order
- Route each resulting FlowFile into the same MergeContent processor,
use 'Defragment' Merge Strategy to merge FlowFiles based on
'fragment.*' attributes
-- Use Header, Footer and Demarcator intelligently to combine
contents, e.g. to form a valid JSON... etc

I'm not sure how do you want to loop, but here is another example flow
doing a traditional 'for' loop in NiFi.
https://gist.github.com/ijokarumawak/01c4fd2d9291d3e74ec424a581659ca8

Hope these can be helpful.

Thanks,
Koji

On Wed, Jan 9, 2019 at 1:04 AM Luis Carmona <lcarmona@openpartner.cl> wrote:
>
>
> Hi Koji,
>
> tryed with some manual tests and seems to be working now, doesn't have the problema I
had before. Today I will try it with a massvice flow and that will be the final check.
>
> Thanks for your help. Do you know where I can get a sample about processing in a loop,
I mean, send things to a server, wait the answer on that, send to a second server accumulating
the answers of both, and all this in a finite loop determined by the answers, Gathering all
the answers in one final Json.
>
>
> Thank you very much.
>
> LC
>
>
>
> ----- Mensaje original -----
> De: "Koji Kawamura" <ijokarumawak@gmail.com>
> Para: "users" <users@nifi.apache.org>
> Enviados: Lunes, 7 de Enero 2019 22:22:12
> Asunto: Re: Wait/Notify inconsistent behavior
>
> Hi Luis,
>
> Look forward to know how it goes with a consolidated single Notify instance.
> Another benefit of doing so is by increasing 'Signal Buffer Count',
> the number of updates against the Distributed Cache Service can be
> decreased in your case, because multiple FlowFiles share the same
> signal id. Notify processor can merge counter deltas locally then
> update the cache entry only once.
>
> Thanks,
> Koji
>
> On Tue, Jan 8, 2019 at 3:50 AM Luis Carmona <lcarmona@openpartner.cl> wrote:
> >
> > Hi Koji,
> >
> > thanks for taking the time to answer my question
> >
> > In the Wait processor:
> > - Signal CounterName is empty (default).
> > - Target Signal Count is set to 2
> >
> > About the Notify processor, I used two of them thinking about that previously I
set differently ${filename} in the precedings UpdateAttribute(s).
> >
> > I attached the image of both processors, and the template as well.
> >
> >
> > The whole point of my layout is to send things to a server, wait the answer on that,
send to a second server accumulating the answers of both, and all this in a finite loop determined
by the answers, Gathering all the answers in one final Json.
> >
> > By now Im stuck in the wait/notify issue, thanks for the sample I'll look into it.
Then I will see how to get the loop.
> >
> > Thanks a lot,
> >
> > Regards,
> >
> > LC
> >
> >
> > ----- Mensaje original -----
> > De: "Koji Kawamura" <ijokarumawak@gmail.com>
> > Para: "users" <users@nifi.apache.org>
> > Enviados: Domingo, 6 de Enero 2019 23:42:56
> > Asunto: Re: Wait/Notify inconsistent behavior
> >
> > The reason tu put two Notify processors is that I'm using different
> >
> > Hi Luis,
> >
> > Just a quick question, how are the "Signal Counter Name" and "Target
> > Signal Count" properties for the Wait processor configured?
> > If you'd like to wait the two sub-flow branches to complete, then:
> > "Signal Counter Name" should be blank, meaning check total count for
> > all counter names
> > "Target Signal Count" should be 2.
> >
> > If those are configured like that, then would you be able to share
> > your flow as a template for further investigation?
> >
> > One more thing, although Notify processor cares about atomicity, due
> > to the underlying distributed cache mechanism, concurrent writes to
> > the same cache identifier can overwrite existing signal, meaning one
> > of the two notifications can be lost.
> > To avoid this, using the same Notify instance at 3a and 3b in your
> > flow is highly recommended.
> > Here is an example flow to do that:
> > https://gist.github.com/ijokarumawak/6da4bd914236e941071cad103e1186dd
> >
> > Thanks,
> > Koji
> >
> > On Sat, Jan 5, 2019 at 11:28 AM Joe Witt <joe.witt@gmail.com> wrote:
> > >
> > > thanks for letting us know.  the lists can be a bit awkward from a user experience
pov.  no worries
> > >
> > > On Fri, Jan 4, 2019, 9:26 PM Luis Carmona <lcarmona@openpartner.cl wrote:
> > >>
> > >> I'm sorry,
> > >>
> > >> got messages from nifi-users saying "UNCHECKED", and reading about understood
the message did not get trough.
> > >>
> > >> Thanks for letting me know.
> > >>
> > >> LC
> > >>
> > >> ________________________________
> > >> De: "Joe Witt" <joe.witt@gmail.com>
> > >> Para: "users" <users@nifi.apache.org>
> > >> Enviados: Viernes, 4 de Enero 2019 23:23:02
> > >> Asunto: Re: Wait/Notify inconsistent behavior
> > >>
> > >> Please avoid sending more copies of the question.  Hopefully someone familiar
with the processors in question will be available in time.
> > >>
> > >>
> > >> Thanks
> > >>
> > >>
> > >> On Fri, Jan 4, 2019 at 9:14 PM Luis Carmona <lcarmona@openpartner.cl>
wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> Im having a strange behavior with Wait / Notify mechanism. Attached
is
> > >>> the image of the flow.
> > >>> Basically I'm trying to insert in Elastic search two record,
> > >>> simultaneously, and if both went ok, then insert a record on a bpm
service.
> > >>>
> > >>> For that (in the image):
> > >>>
> > >>> - Step 1: Set the attribute fragment.identifier to 1
> > >>> - Step 2: Send the flow to Wait state, and,
> > >>>           for 2a I set the attribute filename to 'caso' (without the
> > >>> quotes) just before the POST to ElasticSearch
> > >>>           for 2b I set the attribute filename to 'doc'  (without the
> > >>> quotes) just before the other POST to ElasticSearch
> > >>> - Step 3: On 3a, once the insert is finished, I'm expecting the notify
> > >>> sends the signal to the wait state, and same thing for 3b.
> > >>> - Step 4: HERE is my problem:
> > >>>           Sometimes the WAIT has count.caso = 1, and count.doc=1 so
> > >>> everything goes well on the RouteAttribute after step 5.
> > >>>           Some other, it just receives one of the Nitifications, either
> > >>> the 'doc' one, or the 'caso' one, so as the other doesn't come,
> > >>>           the firts to arrive gets Queued. I 've checked and the two
> > >>> Elastic insertions work almost inmediatly, so thatÅ› not the problem.
> > >>> - Setp 5: Is the expected path unless there was a real fail, but it
is
> > >>> not what is happening.
> > >>>
> > >>> Please any help, or tip would be very preciated.
> > >>>
> > >>> Regards,
> > >>>
> > >>> LC
> > >>>
> > >>

Mime
View raw message