nifi-users mailing list archives

From Andy LoPresto <alopre...@apache.org>
Subject Re: stop processing related flowfiles
Date Wed, 28 Nov 2018 19:58:33 GMT
Could you have logic in your custom processor which sets a flag per topic when a message from
that topic fails? Using something like DistributedMapCacheServer [1] would let you store a
marker when topic X has a failing message; a gate processor placed before the custom processor
could then block messages from topic X from proceeding. 

You could do it by splitting out the flowfiles into a connection per topic using RouteOnAttribute,
but that doesn’t scale nicely to 200 topics. You might be able to do it with a simple LookupService,
but I would probably just write an ExecuteScript processor that has this logic:

def flowfileTopic = flowfile.getAttribute('topic')
def topicIsBlocked = getCacheServer().getKey(flowfileTopic)
if (topicIsBlocked) {
    session.penalize(flowfile)
    routeToBlockedRelationship(flowfile)
} else {
    routeToSuccessRelationship(flowfile)
}
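To make the gate idea above concrete, here is a minimal standalone sketch in Java, with a ConcurrentHashMap standing in for the DistributedMapCacheServer. The names (TopicGate, block, clear, isBlocked) are illustrative, not NiFi API; the real script would talk to a DistributedMapCacheClientService instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-topic gate. A shared map stands in for the
// DistributedMapCacheServer; all names here are hypothetical.
public class TopicGate {
    private final Map<String, Boolean> blocked = new ConcurrentHashMap<>();

    // Called when a message from a topic fails in the custom processor.
    public void block(String topic) { blocked.put(topic, Boolean.TRUE); }

    // Called by the operator once the underlying issue is resolved.
    public void clear(String topic) { blocked.remove(topic); }

    // The check the ExecuteScript body performs per flowfile:
    // blocked -> penalize and route to the "blocked" relationship,
    // otherwise route to "success".
    public boolean isBlocked(String topic) { return blocked.containsKey(topic); }

    public static void main(String[] args) {
        TopicGate gate = new TopicGate();
        gate.block("topicX");                       // a topicX message failed
        System.out.println(gate.isBlocked("topicX")); // route topicX to blocked
        System.out.println(gate.isBlocked("topicY")); // topicY keeps flowing
        gate.clear("topicX");                       // issue resolved
        System.out.println(gate.isBlocked("topicX")); // topicX flows again
    }
}
```

The one-key-per-blocked-topic layout means clearing a single cache key resumes exactly one topic, which matches the recovery step described below.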

From ConsumeKafka, go to an EnforceOrder processor. From the EO, go to the ExecuteScript processor.
From ES, success goes to CustomProcessor, and failure goes *back to EnforceOrder*. That way,
“new” topic X messages will come in, go through the EO (stay in order), ES (fail), and
be queued behind the “old” ordered topic X messages in the queue to EO. When you resolve
the issue, you clear the key in the cache, and the processing of topic X begins again. 
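The routing described above, sketched as a flow (the processor names are from this message; the relationship names on the ExecuteScript processor are assumptions):

```
ConsumeKafka ──> EnforceOrder ──> ExecuteScript ──success──> CustomProcessor
                      ^                 │
                      └────blocked──────┘
        (blocked flowfiles loop back and re-queue, still in order,
         behind the other waiting topic X messages)
```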

You could combine this with a Wait/Notify pair if you wanted to automate that part. Also look
at the penalty duration setting in the processor [2], which allows you to delay re-processing
if you penalize the flowfile. 

[1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-distributed-cache-services-nar/1.8.0/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html
[2] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#penalization-vs-yielding

Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Nov 28, 2018, at 11:19 AM, James Srinivasan <james.srinivasan@gmail.com> wrote:
> 
> Hopefully you already know this:
> 
> "Kafka only provides a total order over records within a partition, not between different
> partitions in a topic. Per-partition ordering combined with the ability to partition data
> by key is sufficient for most applications. However, if you require a total order over records
> this can be achieved with a topic that has only one partition, though this will mean only
> one consumer process per consumer group."
> 
> (From https://kafka.apache.org/documentation/)
> 
> On Wed, 28 Nov 2018, 13:55 Boris Tyukin <boris@boristyukin.com> wrote:
> Hi guys,
> 
> I am trying to come up with a good design for the following challenge:
> 
> 1. ConsumeKafka processor consumes messages from 200 topics.
> 
> 2. The next processor is a custom Groovy processor that does some data transformation
> and also puts the transformed data into the target system. It is crucial to process
> messages in topics in order. 
> 
> 3. This is the tricky part - if any error is raised during step 2, I want to stop processing
> any new flowfiles for that topic while continuing to process messages for other topics.
> 
> In other words, I want to enforce order, process only one message per topic at a
> time, and stop processing messages for a given topic if the currently processed message
> fails. It is kind of like a FIFO queue that stops pushing items out of the queue if the
> current item errors out.
> 
> Is it possible to do it? I am open to use an external queue or cache like Redis. 
> 
> I've played a bit with EnforceOrder and Notify/Wait processors but still cannot wrap
> my head around it.
> 
> Appreciate your help,
> Boris

