nifi-users mailing list archives

From Bryan Bende <>
Subject Re: MergeContent/SplitText - Performance against large CSV files
Date Tue, 24 Nov 2015 13:27:37 GMT

As far as I know, no one is working the RouteCSV processor yet. There is a
JIRA for it though:


On Mon, Nov 23, 2015 at 4:58 PM, Mark Petronic <> wrote:

> Mark Payne stated, "We should have a RouteCSV processor as well."
> I am wondering now as I move further along in my ETL design and
> implementation, is anyone actually working on a RouteCSV processor? I want
> to avoid duplicating work if so, otherwise, I need to implement such a
> behavior. What I really need to do is route on a grouping key made up of
> some combination of column headers (or sub parts of a column header, like
> the yyyymmdd part of some yyyymmddhhmmss timestamp in some column) with a
> variable that is NOT part of the header fields that was received as an
> attribute on the flow file from an upstream processor. Clearly, being able
> to access the fields by column index vs. a regex is cleaner - at least for
> the CSV file use case.
> So, just curious on plans for RouteCSV.
> Thanks!
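
To make the requested behavior concrete, here is a minimal Python sketch of grouping CSV rows by column index, by the yyyymmdd prefix of a timestamp column, and by a value that comes from outside the file (standing in for a flow file attribute). It is not NiFi code and not a proposed RouteCSV design; the function name, parameters, sample data, and the "siteA" value are all hypothetical.

```python
import csv
import io
from collections import defaultdict


def group_rows(csv_text, key_columns, timestamp_column, source_attr):
    """Group CSV rows by chosen columns, the yyyymmdd prefix of a
    timestamp column, and a value supplied from outside the file."""
    groups = defaultdict(list)
    reader = csv.reader(io.StringIO(csv_text), skipinitialspace=True)
    next(reader)  # skip the header row (assumes the file has one)
    for row in reader:
        if not row:
            continue  # ignore blank lines
        day = row[timestamp_column][:8]  # yyyymmdd part of yyyymmddhhmmss
        key = tuple(row[i] for i in key_columns) + (day, source_attr)
        groups[key].append(row)
    return dict(groups)


# Hypothetical sample data; "siteA" stands in for an upstream attribute.
sample = (
    "device_id,value,timestamp\n"
    "1234,83,20151113120000\n"
    "1234,90,20151113130500\n"
    "1234,77,20151001080000\n"
)
groups = group_rows(sample, key_columns=[0], timestamp_column=2, source_attr="siteA")
for key, rows in sorted(groups.items()):
    print(key, len(rows))
```

Addressing fields by index keeps the key definition short even when the fields of interest are far apart in the record, which is the case where a single regex gets unwieldy.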
> On Fri, Nov 13, 2015 at 8:29 PM, Mark Payne <> wrote:
>> Mark,
>> That's great! Unfortunately, though, no mind reading :) This is an
>> extremely powerful pattern, though, that
>> fits perfectly within NiFi's wheelhouse. In a more generic sense, what
>> we're really doing can be thought of
>> as registering a "streaming query." We can register a query for any text
>> that matches a regular expression,
>> or any text that begins with a certain string, etc. Then, data just
>> streams through the Processor and goes
>> wherever it is configured to go.
>> Similarly, we can use EvaluateJsonPath in conjunction with
>> RouteOnAttribute to do the same thing with JSON
>> data. Or EvaluateXPath & RouteOnAttribute to create a streaming query
>> against XML. I believe something is
>> in the works for Avro, as well.
>> This ability to take in a flood of data, configure a few "queries" and
>> have that flood of data separated out into
>> well-organized streams is probably my favorite aspect of NiFi. That being
>> said, we are always learning and
>> adapting and still reasonably new to the Open Source world. So feel free
>> to critique and provide any feedback
>> you have. Since this processor hasn't been officially released yet, this
>> is the easiest time to affect large changes :)
>> On Nov 13, 2015, at 6:46 PM, Mark Petronic <>
>> wrote:
>> Got to say, Mark... Loving the RouteText processor!!! It definitely
>> solved multiple tasks including, as a side effect, stripping the CSV header
>> (because it does not match the regex) which I was doing before with a sed
>> command. :) Thank you (and other contribs) for reading my mind and building
>> this processor. Is that a feature of Nifi? Processors that show up before
>> you ask for them? LOL. I was re-reading the developer guide and happened to
>> see...
>> "For example, imagine that we want to have a RouteCSV Processor such that
>> it is configured with multiple Regular Expressions."
>> LOL. Guess someone had the baby in mind for awhile, too. :)
>> On Fri, Nov 13, 2015 at 12:57 PM, Mark Petronic <>
>> wrote:
>>> Excellent! I will build and play ASAP. :)
>>> On Fri, Nov 13, 2015 at 12:25 PM, Mark Payne <>
>>> wrote:
>>>> Mark,
>>>> Ok thanks for the more detailed explanation. I think that makes
>>>> RouteText a much more appealing solution. It is available on master now.
>>>> Thanks
>>>> -Mark
>>>> Sent from my iPhone
>>>> On Nov 13, 2015, at 12:12 PM, Mark Petronic <>
>>>> wrote:
>>>> Thank you, Mark, for the quick reply. My comments on your comments...
>>>> "That's a great question! 200 million per day equates to about 2K - 3K
>>>> per second."
>>>> Unfortunately, the rate will be much more extreme. Those records all
>>>> show up over the course of about 4 hours. So, every time a new zip file
>>>> appears on my NFS share, I grab it and process it. About 160 files will
>>>> appear over those 4 hours. For example, the average sized zip file would
>>>> likely contain about 180,000 or 1,600,000 records to process in that one
>>>> scheduled Nifi run. And, it is very likely that, for a given run, say
>>>> scheduled to run every 30 minutes, there could be multiple files to
>>>> process. I did not mention it before but there are really two types of very
>>>> large CSV files I have to process here, one is 18M records and the other
>>>> 200M records per day. So, traffic is very bursty.
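
The difference between the daily average and the burst rate can be checked with a few lines of arithmetic. This sketch assumes, purely for illustration, that the 200M records arrive evenly over the 4-hour window; the real arrival pattern is burstier than that.

```python
SECONDS_PER_DAY = 24 * 60 * 60
BURST_WINDOW_SECONDS = 4 * 60 * 60  # the ~4 hour arrival window from the message


def records_per_second(records, seconds):
    """Average rate if the records were spread evenly over the period."""
    return records / seconds


daily_records = 200_000_000
average_rate = records_per_second(daily_records, SECONDS_PER_DAY)
burst_rate = records_per_second(daily_records, BURST_WINDOW_SECONDS)

print(f"spread over 24 hours: {average_rate:,.0f} records/s")
print(f"spread over 4 hours:  {burst_rate:,.0f} records/s")
```

Compressing the same volume into 4 hours multiplies the average rate by six, since 24 / 4 = 6.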
>>>> Does that change anything regarding the intended use case for the new
>>>> RouteText?
>>>> "We should have a RouteCSV processor as well."
>>>> That would be very nice and definitely more performant without the need
>>>> for regex matching. However, I definitely would benefit even from
>>>> RouteText, in the interim. For my use case, the regex will be pretty simple
>>>> as the timestamp is close to the front of the record, but I see where you
>>>> are going on the potential complexity with groups and widely spread out
>>>> fields of interest.
>>>> Is RouteText available on any branch where I could build and play
>>>> around with it before 0.4.0?
>>>> Thanks,
>>>> Mark
>>>> On Fri, Nov 13, 2015 at 11:14 AM, Mark Payne <>
>>>> wrote:
>>>>> Mark,
>>>>> That's a great question! 200 million per day equates to about 2K - 3K
>>>>> per second. So that is quite reasonable.
>>>>> You are very correct, though, that splitting that CSV into tons of
>>>>> one-line FlowFiles does indeed have a cost.
>>>>> Specifically, the big cost is the Provenance data that is generated at
>>>>> that rate. But again 2K - 3K per second
>>>>> going through a handful of Processors is a very reasonable workload.
>>>>> I will caution you, though, that there is a ticket [1] where people
>>>>> will sometimes run into Out Of Memory Errors
>>>>> if they try to split a huge CSV into individual FlowFiles because it
>>>>> holds all of those FlowFile objects (not the
>>>>> data itself but the attributes) in memory until the session is
>>>>> committed. The workaround for this (until that ticket
>>>>> is completed) is to use a SplitText to split into 10,000 lines or so
>>>>> per FlowFile and then another SplitText to
>>>>> split each of those smaller ones into 1-line FlowFiles.
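
The shape of the two-stage workaround can be sketched in plain Python. This only illustrates the chunking idea (never materializing more than one 10,000-line batch at a time); it does not model NiFi sessions or FlowFile objects, and the function names are hypothetical.

```python
from itertools import islice


def split_lines(lines, lines_per_chunk):
    """Yield successive lists of at most lines_per_chunk lines."""
    it = iter(lines)
    while True:
        chunk = list(islice(it, lines_per_chunk))
        if not chunk:
            return
        yield chunk


def two_stage_split(lines, first_stage=10_000):
    """First split into coarse chunks, then split each chunk into single
    lines, so only one coarse chunk is held in memory at any time."""
    for chunk in split_lines(lines, first_stage):
        for single in split_lines(chunk, 1):
            yield single[0]


lines = [f"row {i}" for i in range(25_000)]
chunk_sizes = [len(c) for c in split_lines(lines, 10_000)]
singles = list(two_stage_split(lines))
print(chunk_sizes, len(singles))
```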
>>>>> Also of note, in 0.4.0, which is expected to be released in around a
>>>>> week or so, there is a new RouteText
>>>>> Processor. This, I think, will make your life far easier. Rather than
>>>>> using SplitText, ExtractText, and MergeContent
>>>>> in order to group the text, RouteText will allow you to supply a
>>>>> Grouping Regex. So that regex can just pull out the
>>>>> device id, year, month, and day, from each line and group together
>>>>> lines of text that have the same values into
>>>>> a single FlowFile. For instance, if your CSV looked like:
>>>>> # device_id, device_manufacturer, value, year, month, day, hour
>>>>> 1234, Famous Manufacturer, 83, 2015, 11, 13, 12
>>>>> You could define a grouping regex as:
>>>>> (\d+), .*?, .*?, (\d+), (\d+), (\d+), .*
>>>>> It looks complex but it's just breaking apart the CSV into individual
>>>>> fields and grouping on device_id, year, month, day.
>>>>> This will also create a RouteText.Group attribute with the value
>>>>> "1234, 2015, 11, 13"
>>>>> This processor provides two benefits: it combines all of the grouping
>>>>> into a single Processor, and it cuts down on the
>>>>> millions of FlowFiles that are generated and then merged back together.
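
The effect of the grouping regex above can be tried outside NiFi with a short Python sketch. It applies the same pattern to whole lines and collects lines that share the same captured values. Joining the captured values with ", " simply mirrors the attribute value shown in the message; how RouteText builds that value internally is not something this sketch claims to reproduce, and the extra sample rows are made up.

```python
import re
from collections import defaultdict

# The grouping regex from the message: device_id, year, month, day are captured.
GROUPING_REGEX = re.compile(r"(\d+), .*?, .*?, (\d+), (\d+), (\d+), .*")


def group_lines(lines):
    """Return (groups, unmatched): lines keyed by their captured values,
    plus the lines the regex does not match at all."""
    groups = defaultdict(list)
    unmatched = []
    for line in lines:
        match = GROUPING_REGEX.fullmatch(line)
        if match is None:
            unmatched.append(line)
        else:
            groups[", ".join(match.groups())].append(line)
    return dict(groups), unmatched


lines = [
    "# device_id, device_manufacturer, value, year, month, day, hour",
    "1234, Famous Manufacturer, 83, 2015, 11, 13, 12",
    "1234, Famous Manufacturer, 90, 2015, 11, 13, 13",
    "5678, Other Maker, 12, 2015, 11, 12, 09",
]
groups, unmatched = group_lines(lines)
for key, members in groups.items():
    print(key, len(members))
print("unmatched:", unmatched)
```

The header line falls into the unmatched list because it does not start with digits, which is the same side effect that makes a separate header-stripping step unnecessary.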
>>>>> As I write this, though, I am realizing that the regex above is quite
>>>>> a pain. We should have a RouteCSV processor as well.
>>>>> Though it won't provide any features that RouteText can't provide, it
>>>>> will make configuration far easier. I created a ticket
>>>>> for this here [2]. I'm not sure that it will make it into the 0.4.0
>>>>> release, though.
>>>>> I hope this helps!
>>>>> Thanks
>>>>> -Mark
>>>>> [1]
>>>>> [2]
>>>>> On Nov 13, 2015, at 10:44 AM, Mark Petronic <>
>>>>> wrote:
>>>>> I have a concept question. Say I have 10 GB of CSV files/day
>>>>> containing records where 99% of them are from the last couple days but
>>>>> there are stragglers that are late reported that can date back many months.
>>>>> Devices that are powered off at times don't report but eventually do
>>>>> power on and report old, captured data - store and forward kind of thing.
>>>>> I want to capture all the data for historic reasons. There are about 200
>>>>> million records per day to process. I want to put this data in Hive tables
>>>>> that are partitioned by year, month, and day and use ORC columnar storage.
>>>>> These tables are external Hive tables and point to the directories where I
>>>>> want to drop these files on HDFS, manually add new partitions, as needed,
>>>>> and immediately be able to query using HQL.
>>>>> Nifi Concept:
>>>>> 0. Use GetText to get a CSV file
>>>>> 1. Use UpdateAttribute to parse the incoming CSV file name to obtain
>>>>> device_id and set that as an attribute on the flow file
>>>>> 2. Use SplitText to split each row into a flow file.
>>>>> 3. Use ExtractText to identify the field that is the record timestamp
>>>>> and create a year, month, and day attribute on the flow file from that
>>>>> timestamp.
>>>>> 4. Use MergeContent with a grouping key made up of
>>>>> (device_id,year,month,day)
>>>>> 5. Convert each file to ORC (maybe Parquet). This stage will likely
>>>>> require me building a custom processor because the conversion is not going
>>>>> to be a simple A-to-B. I want to do some validation on fields, type
>>>>> conversion, and other custom stuff against some source-of-truth schema
>>>>> stored in a web service with REST API.
>>>>> 6. Use PutHDFS to store these ORC files in directories like
>>>>> .../device_id/year=2015/month=11/day=9 by using the attributes already
>>>>> present from the upstream processors to build up the path, where device_id
>>>>> is the Hive table name and the year, month, day are the partition key
>>>>> name=value per Hive format. The file names will just be some unique ID,
>>>>> they don't really matter
>>>>> 7. Use ExecuteProcessStream to execute a Hive script that will "alter
>>>>> table add partitions...." for any partitions that were newly created in
>>>>> this schedule run
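
Steps 6 and 7 above come down to deriving a partition directory and a matching Hive statement from the same attributes. The following Python sketch shows one way that derivation could look. It assumes a yyyymmddhhmmss record timestamp; the "/data" base directory and the "device_1234" table name are hypothetical placeholders, and the statement is only built as a string here, not executed.

```python
from datetime import datetime


def partition_values(timestamp):
    """Parse a yyyymmddhhmmss timestamp into (year, month, day)."""
    ts = datetime.strptime(timestamp, "%Y%m%d%H%M%S")
    return ts.year, ts.month, ts.day


def partition_path(base_dir, table, timestamp):
    """Build a Hive-style partition directory for one record timestamp."""
    year, month, day = partition_values(timestamp)
    return f"{base_dir}/{table}/year={year}/month={month}/day={day}"


def add_partition_statement(table, timestamp, location):
    """Build the ALTER TABLE statement that registers the partition."""
    year, month, day = partition_values(timestamp)
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year={year}, month={month}, day={day}) "
        f"LOCATION '{location}'"
    )


# Hypothetical values for illustration only.
path = partition_path("/data", "device_1234", "20151109083000")
statement = add_partition_statement("device_1234", "20151109083000", path)
print(path)
print(statement)
```

Using IF NOT EXISTS makes the statement safe to repeat when late-arriving records land in a partition that was already registered on an earlier run.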
>>>>> Is this insane or is it what Nifi was designed to do? I could
>>>>> definitely see using a Spark job to do the group by
>>>>> (device_id,year,month,day) stage. 200M flow files from the SplitText is the
>>>>> one that has me wondering if I am nuts thinking of doing that? There will
>>>>> be overhead on flow files and reducing them to one line each seems to me
>>>>> like a worst case scenario. But it all depends on the core design and whether
>>>>> Nifi is optimized to handle such a use case.
>>>>> Thanks,
>>>>> Mark
