nifi-users mailing list archives

From Yuri Nikonovich <>
Subject Re: How to configure site-to-site communication between nodes in one cluster.
Date Thu, 02 Jun 2016 12:38:16 GMT
Our company has several clients, and for each client, a few times a day, we run
the following ETL process:
1) fetch 100,000 to 1,000,000 records (depending on the client) from the
database with one SQL query (the query is provided to me; it can be very
complex and I can't split it)
2) validate each record in the dataset against my domain model
3) enrich it
4) save the dataset to the Cassandra database.

I want to do this as fast as possible, so I've decided to split each
dataset into chunks of 1,000 records each and parallelize the validation/enrichment.
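The chunking step itself (which SplitAvro performs inside NiFi) can be sketched in plain Python; the function name and default chunk size here are illustrative, not NiFi API:

```python
from itertools import islice

def chunks(records, size=1000):
    """Yield successive lists of at most `size` records from any iterable."""
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk
```

For a 2,500-record dataset this yields chunks of 1,000, 1,000, and 500 records.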

And one more question: how do I make a processor wait until all chunks from a
particular dataset are validated? I need this because I want to
implement a validation threshold. For example, if fewer than 10 percent of
the records in the dataset are invalid, then the dataset is considered
valid and may be saved to the database; if more, I should consider
it invalid and throw it away. I want to run this processor in parallel too;
to me it looks similar to the reduce phase in MapReduce terminology.
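The threshold decision, once per-chunk counts have been gathered, is a simple aggregation. A minimal sketch (the function name and the tuple shape are assumptions for illustration, not anything NiFi provides):

```python
def dataset_is_valid(chunk_results, threshold=0.10):
    """chunk_results: (valid_count, invalid_count) pairs, one per chunk.
    The dataset passes if the overall invalid fraction stays below the threshold."""
    valid = sum(v for v, _ in chunk_results)
    invalid = sum(i for _, i in chunk_results)
    total = valid + invalid
    return total > 0 and invalid / total < threshold
```

In flow terms, this reduce-like step is often approximated by correlating the chunks back together (e.g. on a dataset-ID attribute) before routing, but the exact wiring depends on the flow.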

2016-06-02 15:00 GMT+03:00 Bryan Bende <>:

> It really comes down to what works best for your use case...
> NiFi is not made to compete with distributed computation frameworks like
> Spark, MapReduce, etc.; its job is to bring data to them. So if you need to
> run a computation across 100s-1000s of nodes, then you would do that in
> Hadoop. NiFi clusters are usually around 10 nodes or less.
> For ETL, NiFi can tackle some use cases, but again, there are situations
> where something like Sqoop is going to be a better choice because it's
> specifically engineered for massive extraction from a database.
> All that being said, it sounds like you haven't had a problem getting the
> data out of the database with NiFi. Is the transform part of your flow
> taking longer than you expected? Can you share more about what you are
> doing to each record, and how many records?
> -Bryan
> On Thu, Jun 2, 2016 at 5:19 AM, Yuri Nikonovich <>
> wrote:
>> Hi
>> Thank you, Bryan.
>> I've built my pipeline as you've described, with an RPG to process the split
>> parts. The thing that concerns me is the approach to clustering, with each
>> node running the complete flow separately from the other nodes. This approach makes
>> me think that NiFi isn't suited for heavy ETL processes running within its
>> processors. Maybe it is better to use the NiFi flow as an orchestration tool
>> and do the heavy work (like validation or transformation) with other tools
>> (Hadoop, for example). For example: fetch data from DB -> SplitAvro
>> -> send it to a validation/transformation Hadoop job -> get the results back to
>> NiFi -> do other things. What do you think of this approach?
>> 2016-06-01 21:24 GMT+03:00 Bryan Bende <>:
>>> NiFi is definitely suitable for processing large files, but NiFi's
>>> clustering model works a little differently than some of the distributed
>>> processing frameworks people are used to.
>>> In a NiFi cluster, each node runs the same flow/graph, and it is the
>>> data that needs to be partitioned across the nodes. How to partition the
>>> data really depends on the use-case (that is what the article I linked to
>>> was all about).
>>> In your scenario there are a couple of ways to achieve parallelism...
>>> Process everything on the node that the HTTP requests come in on, and
>>> increase the Concurrent Tasks (# of threads) for the processors after
>>> SplitAvro so that multiple chunks can be transformed and sent to Cassandra
>>> in parallel.
>>> I am assuming the HTTP requests are infrequent and are acting as a
>>> trigger for the process, but if they are frequent you could put a load
>>> balancer in front of NiFi to distribute those requests across the nodes.
>>> The other option is to use the RPG redistribution technique to
>>> redistribute the chunks across the cluster; you can still adjust the Concurrent
>>> Tasks on the processors to have each node doing more in parallel.
>>> You would put SplitAvro -> RPG that points to itself; then somewhere
>>> else on the flow there is an Input Port -> follow-on processors, and the RPG
>>> connects to that Input Port.
>>> The receive-HTTP-request processor would be set to run on Primary Node only.
>>> It will come down to which is faster... processing the chunks locally on
>>> one node with multiple threads, or transferring the chunks across the
>>> network and processing them on multiple nodes with multiple threads.
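The first option (raising Concurrent Tasks so one node works several chunks at once) behaves like a thread pool over the chunks. A rough sketch of that idea, with illustrative names:

```python
from concurrent.futures import ThreadPoolExecutor

def process_all(chunks, process_chunk, workers=4):
    # Each worker thread handles one chunk at a time, analogous to a
    # processor's Concurrent Tasks setting; results come back in chunk order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_chunk, chunks))
```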
>>> On Wed, Jun 1, 2016 at 12:37 PM, Yuri Nikonovich <>
>>> wrote:
>>>> Hello, Bryan
>>>> Thanks for the answer.
>>>> You've understood me correctly. What I'm trying to achieve is to put
>>>> some validation on the dataset. So I fetch all the data with one query from
>>>> the DB (I can't change this behavior), then I use the SplitAvro processor to split
>>>> it into chunks of, say, 1,000 records each. After that I want to treat each chunk
>>>> independently: transform each record in a chunk according to my domain
>>>> model, validate it, and save it. This transform-load work I want to distribute
>>>> across the cluster.
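The per-chunk transform/validate step described above can be sketched as a small loop; `validate` and `enrich` here stand in for the domain-specific logic, which the thread does not spell out:

```python
def process_chunk(chunk, validate, enrich):
    """Validate each record and enrich the valid ones.
    Returns the enriched records plus the invalid count (useful for a
    dataset-level validation threshold)."""
    enriched, invalid = [], 0
    for record in chunk:
        if validate(record):
            enriched.append(enrich(record))
        else:
            invalid += 1
    return enriched, invalid
```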
>>>> While reading about NiFi I haven't found any information about flows
>>>> like mine, which worries me a little. Maybe I'm trying to do something
>>>> that NiFi is not suited for.
>>>> Is NiFi a suitable tool for processing large files, or should I do the
>>>> actual processing work outside the NiFi flow?
>>>> 2016-06-01 17:28 GMT+03:00 Bryan Bende <>:
>>>>> Hello,
>>>>> This post [1] has a description of how to redistribute data within
>>>>> the same cluster. You are correct that it involves an RPG pointing back to
>>>>> the same cluster.
>>>>> One thing to keep in mind is that typically we do this with a List +
>>>>> Fetch pattern, where the List operation produces lightweight results
>>>>> (the list of filenames to fetch) and then redistributes those results, so
>>>>> the fetching happens in parallel.
>>>>> In your case, if I understand it correctly, you will have already
>>>>> fetched the data on the first node, and then have to transfer the actual
>>>>> data to the cluster nodes, which could have some overhead.
>>>>> It might require a custom processor to do this, but you might want to
>>>>> consider somehow determining what needs to be fetched after receiving the
>>>>> HTTP request, and redistributing that so each node can then fetch from the
>>>>> DB in parallel.
>>>>> Let me know if this doesn't make sense.
>>>>> -Bryan
>>>>> [1]
>>>>> On Wed, Jun 1, 2016 at 6:06 AM, Yuri Nikonovich <>
>>>>> wrote:
>>>>>> Hi
>>>>>> I have the following flow:
>>>>>> Receive HTTP request -> fetch data from DB -> split it into chunks of
>>>>>> fixed size -> process each chunk and save it to Cassandra.
>>>>>> I built the flow and it works perfectly on a non-clustered setup.
>>>>>> When I configured a clustered setup,
>>>>>> I found out that all the heavy work is done on only one node. So if the
>>>>>> flow has started on node1, it will run to the end on node1. What I want to
>>>>>> achieve is to spread the data chunks fetched from the DB across the cluster
>>>>>> in order to process them in parallel, but it looks like NiFi doesn't send
>>>>>> flow files between nodes in a cluster.
>>>>>> As far as I understand, in order to make a node send data to another
>>>>>> node, I should create a remote process group and send data to this RPG. All
>>>>>> the examples I could find on the Internet describe RPGs for cluster-to-cluster
>>>>>> communication or remote node-to-cluster communication. So for my case, I
>>>>>> assume, I have to create an RPG pointing to the same cluster. Could you
>>>>>> provide me a guide on how to do this?
>>>>>> --
>>>>>> Regards,
>>>>>> Nikanovich Yury
>>>> --
>>>> Regards,
>>>> Yuri Nikonovich
>> Regards,
>> Nikanovich Yury

Nikanovich Yury
