nifi-users mailing list archives

From Mark Petronic <markpetro...@gmail.com>
Subject Re: MergeContent/SplitText - Performance against large CSV files
Date Mon, 23 Nov 2015 21:58:09 GMT
Mark Payne stated, "We should have a RouteCSV processor as well."

I am wondering now, as I move further along in my ETL design and
implementation: is anyone actually working on a RouteCSV processor? I want
to avoid duplicating work if so; otherwise, I need to implement such a
behavior myself. What I really need to do is route on a grouping key made up
of some combination of columns (or sub-parts of a column, like the yyyymmdd
part of a yyyymmddhhmmss timestamp in some column) together with a variable
that is NOT one of the CSV fields but was received as an attribute on the
flow file from an upstream processor. Clearly, being able to access the
fields by column index vs. a regex is cleaner, at least for the CSV file use
case.
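
A minimal Python sketch of that routing idea, not an actual NiFi processor: the function name `route_csv`, its parameters, and the sample data are all hypothetical. It builds the grouping key from flow file attribute values plus slices of columns addressed by index, with no regex involved.

```python
import csv
import io
from collections import defaultdict


def route_csv(text, key_columns, attribute_values=(), has_header=True):
    """Group CSV rows by attribute values plus slices of selected columns.

    key_columns holds (column_index, start, end) tuples; (0, 0, 8) keeps the
    yyyymmdd part of a yyyymmddhhmmss timestamp found in column 0.
    """
    groups = defaultdict(list)
    reader = csv.reader(io.StringIO(text), skipinitialspace=True)
    if has_header:
        next(reader, None)  # skip the header row
    for row in reader:
        if not row:
            continue  # ignore blank lines
        column_parts = tuple(row[i][start:end] for i, start, end in key_columns)
        groups[tuple(attribute_values) + column_parts].append(row)
    return dict(groups)


# Hypothetical sample: column 0 is a timestamp, device_id arrives as an attribute.
sample = "timestamp,value\n20151113120000,83\n20151113130500,84\n20151009080000,85\n"
groups = route_csv(sample, key_columns=[(0, 0, 8)], attribute_values=("1234",))
for key, rows in groups.items():
    print(key, len(rows))
```

Each key here is a tuple such as `("1234", "20151113")`, so rows from the same device and day land in the same group regardless of the time of day.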

So, just curious on plans for RouteCSV.

Thanks!

On Fri, Nov 13, 2015 at 8:29 PM, Mark Payne <markap14@hotmail.com> wrote:

> Mark,
>
> That's great! Unfortunately, though, no mind reading :) This is an
> extremely powerful pattern, though, that
> fits perfectly within NiFi's wheelhouse. In a more generic sense, what
> we're really doing can be thought of
> as registering a "streaming query." We can register a query for any text
> that matches a regular expression,
> or any text that begins with a certain string, etc. Then, data just
> streams through the Processor and goes
> wherever it is configured to go.
>
> Similarly, we can use EvaluateJsonPath in conjunction with
> RouteOnAttribute to do the same thing with JSON
> data. Or EvaluateXPath & RouteOnAttribute to create a streaming query
> against XML. I believe something is
> in the works for Avro, as well.
>
> This ability to take in a flood of data, configure a few "queries" and
> have that flood of data separated out into
> well-organized streams is probably my favorite aspect of NiFi. That being
> said, we are always learning and
> adapting and still reasonably new to the Open Source world. So feel free
> to critique and provide any feedback
> you have. Since this processor hasn't been officially released yet, this
> is the easiest time to effect large changes :)
>
> On Nov 13, 2015, at 6:46 PM, Mark Petronic <markpetronic@gmail.com> wrote:
>
> Got to say, Mark... Loving the RouteText processor!!! It definitely solved
> multiple tasks including, as a side effect, stripping the CSV header
> (because it does not match the regex) which I was doing before with a sed
> command. :) Thank you (and other contribs) for reading my mind and building
> this processor. Is that a feature of Nifi? Processors that show up before
> you ask for them? LOL. I was re-reading the developer guide and happened to
> see...
>
> "For example, imagine that we want to have a RouteCSV Processor such that
> it is configured with multiple Regular Expressions."
>
> LOL. Guess someone had the baby in mind for a while, too. :)
>
> On Fri, Nov 13, 2015 at 12:57 PM, Mark Petronic <markpetronic@gmail.com>
> wrote:
>
>> Excellent! I will build and play ASAP. :)
>>
>> On Fri, Nov 13, 2015 at 12:25 PM, Mark Payne <markap14@hotmail.com>
>> wrote:
>>
>>> Mark,
>>>
>>> Ok thanks for the more detailed explanation. I think that makes
>>> RouteText a much more appealing solution. It is available on master now.
>>>
>>> Thanks
>>> -Mark
>>>
>>> Sent from my iPhone
>>>
>>> On Nov 13, 2015, at 12:12 PM, Mark Petronic <markpetronic@gmail.com>
>>> wrote:
>>>
>>> Thank you, Mark, for the quick reply. My comments on your comments...
>>>
>>> "That's a great question! 200 million per day equates to about 2K - 3K
>>> per second."
>>>
>>> Unfortunately, the rate will be much more extreme. Those records all
>>> show up over the course of about 4 hours. So, every time a new zip file
>>> appears on my NFS share, I grab it and process it. About 160 files will
>>> appear over those 4 hours. For example, the average sized zip file would
>>> likely contain about 180,000 or 1,600,000 records to process in that one
>>> scheduled Nifi run. And, it is very likely that, for a given run, say
>>> scheduled to run every 30 minutes, there could be multiple files to
>>> process. I did not mention it before but there are really two types of very
>>> large CSV files I have to process here, one is 18M records and the other
>>> 200M records per day. So, traffic is very bursty.
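
The difference between the two rates can be worked out with a short Python calculation. This is only a sketch: it assumes the 200M records arrive evenly across the 4-hour window, which bursty traffic will not do exactly, and the helper name `records_per_second` is made up for illustration.

```python
RECORDS_PER_DAY = 200_000_000


def records_per_second(records, window_hours):
    """Average record rate if `records` arrive evenly over `window_hours`."""
    return records / (window_hours * 3600)


spread_over_day = records_per_second(RECORDS_PER_DAY, 24)
burst_window = records_per_second(RECORDS_PER_DAY, 4)

print(f"{spread_over_day:,.0f} records/s averaged over 24 hours")
print(f"{burst_window:,.0f} records/s if everything arrives within 4 hours")
```

Under that assumption the sustained rate during the window is six times the daily average, since the same volume is squeezed into one sixth of the time.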
>>>
>>> Does that change anything regarding the intended use case for the new
>>> RouteText processor?
>>>
>>> "We should have a RouteCSV processor as well."
>>>
>>> That would be very nice and definitely more performant without the need
>>> for regex matching. However, I definitely would benefit even from
>>> RouteText, in the interim. For my use case, the regex will be pretty simple
>>> as the timestamp is close to the front of the record, but I see where you
>>> are going on the potential complexity with groups and widely spread out
>>> fields of interest.
>>>
>>> Is RouteText available on any branch where I could build and play around
>>> with it before 0.4.0?
>>>
>>> Thanks,
>>> Mark
>>>
>>> On Fri, Nov 13, 2015 at 11:14 AM, Mark Payne <markap14@hotmail.com>
>>> wrote:
>>>
>>>> Mark,
>>>>
>>>> That's a great question! 200 million per day equates to about 2K - 3K
>>>> per second. So that is quite reasonable.
>>>> You are very correct, though, that splitting that CSV into tons of
>>>> one-line FlowFiles does indeed have a cost.
>>>> Specifically, the big cost is the Provenance data that is generated at
>>>> that rate. But again 2K - 3K per second
>>>> going through a handful of Processors is a very reasonable workload.
>>>>
>>>> I will caution you, though, that there is a ticket [1] where people
>>>> will sometimes run into Out Of Memory Errors
>>>> if they try to split a huge CSV into individual FlowFiles because it
>>>> holds all of those FlowFile objects (not the
>>>> data itself but the attributes) in memory until the session is
>>>> committed. The workaround for this (until that ticket
>>>> is completed) is to use a SplitText to split into 10,000 lines or so
>>>> per FlowFile and then another SplitText to
>>>> split each of those smaller ones into 1-line FlowFiles.
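
The two-stage split can be modeled in plain Python. This is a rough sketch of the idea only, not of NiFi's session handling: the function names are hypothetical, and a list of lines stands in for a FlowFile. The point is that the single-line pieces are produced one chunk at a time, so no step has to hold every one-line piece at once.

```python
from itertools import islice


def split_lines(lines, lines_per_chunk):
    """Yield successive lists of at most lines_per_chunk lines."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, lines_per_chunk))
        if not chunk:
            return
        yield chunk


def two_stage_split(lines, first_stage_size=10_000):
    """Split into large chunks first, then split each chunk into single lines."""
    for chunk in split_lines(lines, first_stage_size):  # first SplitText
        for single in split_lines(chunk, 1):            # second SplitText
            yield single[0]


# Small demonstration with 25 "lines" and a first-stage size of 10.
chunk_sizes = [len(chunk) for chunk in split_lines(range(25), 10)]
print(chunk_sizes)
```

With the 10,000-line setting suggested above, the second stage never works on more than 10,000 single-line pieces at a time.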
>>>>
>>>> Also of note, in 0.4.0, which is expected to be released in around a
>>>> week or so, there is a new RouteText
>>>> Processor. This, I think, will make your life far easier. Rather than
>>>> using SplitText, ExtractText, and MergeContent
>>>> in order to group the text, RouteText will allow you to supply a
>>>> Grouping Regex. So that regex can just pull out the
>>>> device id, year, month, and day, from each line and group together
>>>> lines of text that have the same values into
>>>> a single FlowFile. For instance, if your CSV looked like:
>>>>
>>>> # device_id, device_manufacturer, value, year, month, day, hour
>>>> 1234, Famous Manufacturer, 83, 2015, 11, 13, 12
>>>>
>>>> You could define a grouping regex as:
>>>> (\d+), .*?, .*?, (\d+), (\d+), (\d+), .*
>>>>
>>>> It looks complex but it's just breaking apart the CSV into individual
>>>> fields and grouping on device_id, year, month, day.
>>>> This will also create a RouteText.Group attribute with the value "1234,
>>>> 2015, 11, 13"
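
The grouping behavior described above can be tried out with Python's `re` module. This is a sketch under assumptions: it mimics the idea with a whole-line match and joins the captured groups with ", ", and it is not RouteText's actual implementation. The function name `group_lines` is hypothetical.

```python
import re
from collections import defaultdict

# The grouping regex from the example: capture device_id, year, month, day.
GROUPING_REGEX = re.compile(r"(\d+), .*?, .*?, (\d+), (\d+), (\d+), .*")


def group_lines(lines):
    """Group lines by the values captured by GROUPING_REGEX."""
    groups = defaultdict(list)
    for line in lines:
        match = GROUPING_REGEX.fullmatch(line)
        if match is None:
            continue  # non-matching lines, such as the header, are dropped here
        groups[", ".join(match.groups())].append(line)
    return dict(groups)


csv_text = """# device_id, device_manufacturer, value, year, month, day, hour
1234, Famous Manufacturer, 83, 2015, 11, 13, 12
1234, Famous Manufacturer, 85, 2015, 11, 13, 14
1234, Famous Manufacturer, 79, 2015, 10, 9, 8"""

grouped = group_lines(csv_text.splitlines())
for key, members in grouped.items():
    print(key, len(members))
```

The first two data lines share the key "1234, 2015, 11, 13", while the third gets "1234, 2015, 10, 9". The header line starts with "#", so it does not match and falls out of every group.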
>>>>
>>>> This processor provides two benefits: it combines all of the grouping
>>>> into a single Processor, and it cuts down on the
>>>> millions of FlowFiles that are generated and then merged back together.
>>>>
>>>> As I write this, though, I am realizing that the regex above is quite a
>>>> pain. We should have a RouteCSV processor as well.
>>>> Though it won't provide any features that RouteText can't provide, it
>>>> will make configuration far easier. I created a ticket
>>>> for this here [2]. I'm not sure that it will make it into the 0.4.0
>>>> release, though.
>>>>
>>>> I hope this helps!
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>> [1] https://issues.apache.org/jira/browse/NIFI-1008
>>>> [2] https://issues.apache.org/jira/browse/NIFI-1161
>>>>
>>>>
>>>> On Nov 13, 2015, at 10:44 AM, Mark Petronic <markpetronic@gmail.com>
>>>> wrote:
>>>>
>>>> I have a concept question. Say I have 10 GB of CSV files/day containing
>>>> records where 99% of them are from the last couple days but there are
>>>> stragglers that are late reported that can date back many months. Devices
>>>> that are powered off at times don't report but eventually do when powered
>>>> on and report old, captured data - store and forward kind of thing. I want
>>>> to capture all the data for historic reasons. There are about 200 million
>>>> records per day to process. I want to put this data in Hive tables that are
>>>> partitioned by year, month, and day and use ORC columnar storage. These
>>>> tables are external Hive tables and point to the directories where I want
>>>> to drop these files on HDFS, manually add new partitions, as needed, and
>>>> immediately be able to query using HQL.
>>>>
>>>> Nifi Concept:
>>>>
>>>> 0. Use GetFile to get a CSV file
>>>> 1. Use UpdateAttribute to parse the incoming CSV file name to obtain a
>>>> device_id and set that as an attribute on the flow file
>>>> 2. Use SplitText to split each row into a flow file.
>>>> 3. Use ExtractText to identify the field that is the record timestamp
>>>> and create a year, month, and day attribute on the flow file from that
>>>> timestamp.
>>>> 4. Use MergeContent with a grouping key made up of
>>>> (device_id,year,month,day)
>>>> 5. Convert each file to ORC (maybe Parquet). This stage will likely
>>>> require me building a custom processor because the conversion is not going
>>>> to be a simple A-to-B. I want to do some validation on fields, type
>>>> conversion, and other custom stuff against some source-of-truth schema
>>>> stored in a web service with REST API.
>>>> 6. Use PutHDFS to store these ORC files in directories like
>>>> .../device_id/year=2015/month=11/day=9 by using the attributes already
>>>> present from the upstream processors to build up the path, where device_id
>>>> is the Hive table name and the year, month, day are the partition key
>>>> name=value per Hive format. The file names will just be some unique ID,
>>>> they don't really matter
>>>> 7. Use ExecuteProcessStream to execute a Hive script that will "alter
>>>> table add partitions...." for any partitions that were newly created on
>>>> this schedule run
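
Steps 6 and 7 come down to building a partition directory and a matching Hive statement from the same four attributes. A small Python sketch, with assumptions stated: the base directory `/example/base` and the table name `device_1234` are placeholders, the helper names are hypothetical, and the statement uses Hive's `ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION` form.

```python
def partition_path(base_dir, device_id, year, month, day):
    """Directory for one Hive partition, in name=value layout."""
    return f"{base_dir}/{device_id}/year={year}/month={month}/day={day}"


def add_partition_statement(table, year, month, day, location):
    """HiveQL that registers the partition if it is not already known."""
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year={year}, month={month}, day={day}) "
        f"LOCATION '{location}'"
    )


# Placeholder values standing in for flow file attributes.
path = partition_path("/example/base", "device_1234", 2015, 11, 9)
statement = add_partition_statement("device_1234", 2015, 11, 9, path)
print(path)
print(statement)
```

Using `IF NOT EXISTS` makes the statement safe to run on every schedule, whether or not the partition was newly created in that run.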
>>>>
>>>> Is this insane or is it what Nifi was designed to do? I could
>>>> definitely see using a Spark job to do the group by
>>>> (device_id,year,month,day) stage. 200M flow files from the SplitText is the
>>>> one that has me wondering if I am nuts thinking of doing that? There must
>>>> be overhead on flow files, and breaking them down to one line each seems to
>>>> me like a worst-case scenario. But it all depends on the core design and whether
>>>> Nifi is optimized to handle such a use case.
>>>>
>>>> Thanks,
>>>> Mark
>>>>
>>>>
>>>>
>>>
>>
>
>
