nifi-users mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: MergeContent/SplitText - Performance against large CSV files
Date Fri, 13 Nov 2015 16:14:48 GMT
Mark,

That's a great question! 200 million per day equates to about 2K - 3K per second. So that
is quite reasonable. You are very correct, though, that splitting that CSV into tons of
one-line FlowFiles does indeed have a cost. Specifically, the big cost is the Provenance
data that is generated at that rate. But again 2K - 3K per second going through a handful
of Processors is a very reasonable workload.
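
For reference, here is the arithmetic behind that estimate as a quick Python sketch. It
assumes the records arrive evenly over a full 24-hour day, which real traffic will not do,
so treat the result as an average rather than a peak:

```python
# Back-of-the-envelope average rate for 200 million records per day.
# Assumption: records are spread evenly across the whole day.
RECORDS_PER_DAY = 200_000_000
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds

average_rate = RECORDS_PER_DAY / SECONDS_PER_DAY
print(f"average: {average_rate:,.0f} records/second")  # average: 2,315 records/second
```

If the files land in bursts, the rate while a file is being processed will be higher than
this average.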

I will caution you, though, that there is a ticket [1] about people sometimes running into
Out Of Memory errors when they try to split a huge CSV into individual FlowFiles, because
NiFi holds all of those FlowFile objects (not the data itself, but the attributes) in memory
until the session is committed. The workaround for this (until that ticket is completed) is
to use one SplitText to split into 10,000 lines or so per FlowFile and then another SplitText
to split each of those smaller ones into 1-line FlowFiles.
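
To illustrate the idea behind the two-stage split, here is a small Python sketch. It is only
an analogy, not NiFi code: split_lines and two_stage_split are hypothetical names, and plain
Python lists stand in for FlowFiles. The point is that each step produces a much smaller
batch of outputs at a time than a single one-line split of the whole file would.

```python
from itertools import islice


def split_lines(lines, lines_per_chunk):
    """Yield successive lists of at most lines_per_chunk lines."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, lines_per_chunk))
        if not chunk:
            return
        yield chunk


def two_stage_split(lines, first_stage_size=10_000):
    """Stage 1: coarse chunks. Stage 2: one line per output."""
    for chunk in split_lines(lines, first_stage_size):
        # Only the lines of the current chunk are materialized here.
        for single in split_lines(chunk, 1):
            yield single[0]


sample = [f"row {i}" for i in range(25_000)]
chunk_sizes = [len(chunk) for chunk in split_lines(sample, 10_000)]
print(chunk_sizes)  # [10000, 10000, 5000]
```

In the flow itself, the first SplitText plays the role of the outer loop and the second
SplitText plays the role of the inner one.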

Also of note, in 0.4.0, which is expected to be released in around a week or so, there is a
new RouteText Processor. This, I think, will make your life far easier. Rather than using
SplitText, ExtractText, and MergeContent in order to group the text, RouteText will allow you
to supply a Grouping Regex. That regex can just pull out the device id, year, month, and day
from each line and group together lines of text that have the same values into a single
FlowFile. For instance, if your CSV looked like:

# device_id, device_manufacturer, value, year, month, day, hour
1234, Famous Manufacturer, 83, 2015, 11, 13, 12

You could define a grouping regex as:
(\d+), .*?, .*?, (\d+), (\d+), (\d+), .*

It looks complex, but it's just breaking apart the CSV into individual fields and grouping on
device_id, year, month, and day. This will also create a RouteText.Group attribute with the
value "1234, 2015, 11, 13".
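
If you want to sanity-check the regex outside of NiFi, a rough Python sketch like the one
below shows the grouping it produces. This is not how RouteText is implemented: group_lines
is a hypothetical helper, the extra sample rows are made up for illustration, and joining the
captured values with ", " is just my way of mirroring the attribute value shown above. Lines
that do not match (such as the header) are simply skipped here, which says nothing about how
RouteText itself treats them.

```python
import re
from collections import defaultdict

# The grouping regex from the example above; each capture group is part of the key.
GROUPING_REGEX = re.compile(r"(\d+), .*?, .*?, (\d+), (\d+), (\d+), .*")


def group_lines(lines):
    """Group lines by (device_id, year, month, day) as captured by the regex."""
    groups = defaultdict(list)
    for line in lines:
        match = GROUPING_REGEX.fullmatch(line)
        if match is None:
            continue  # header or malformed line; this sketch skips it
        key = ", ".join(match.groups())
        groups[key].append(line)
    return dict(groups)


sample = [
    "# device_id, device_manufacturer, value, year, month, day, hour",
    "1234, Famous Manufacturer, 83, 2015, 11, 13, 12",
    "1234, Famous Manufacturer, 79, 2015, 11, 13, 13",
    "5678, Other Manufacturer, 40, 2015, 11, 12, 7",
]

for key, grouped in group_lines(sample).items():
    print(key, "->", len(grouped), "line(s)")
```

With this sample, the two lines for device 1234 on 2015-11-13 end up under the key
"1234, 2015, 11, 13", and the 5678 line gets its own group.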

This processor provides two benefits: it combines all of the grouping into a single
Processor, and it cuts down on the millions of FlowFiles that are generated and then merged
back together.

As I write this, though, I am realizing that the regex above is quite a pain. We should have
a RouteCSV processor as well. Though it won't provide any features that RouteText can't
provide, it will make configuration far easier. I created a ticket for this here [2]. I'm not
sure that it will make it into the 0.4.0 release, though.

I hope this helps!

Thanks
-Mark

[1] https://issues.apache.org/jira/browse/NIFI-1008
[2] https://issues.apache.org/jira/browse/NIFI-1161


> On Nov 13, 2015, at 10:44 AM, Mark Petronic <markpetronic@gmail.com> wrote:
> 
> I have a concept question. Say I have 10 GB of CSV files/day containing records where
99% of them are from the last couple days but there are stragglers that are late reported
that can date back many months. Devices that are powered off at times don't report but eventually
do when powered on and report old, captured data - store and forward kind of thing. I want
to capture all the data for historic reasons. There are about 200 million records per day
to process. I want to put this data in Hive tables that are partitioned by year, month, and
day and use ORC columnar storage. These tables are external Hive tables and point to the directories
where I want to drop these files on HDFS, manually add new partitions, as needed, and immediately
be able to query using HQL.
> 
> Nifi Concept: 
> 
> 0. Use GetText to get a CSV file
> 1. Use UpdateAttribute to parse the incoming CSV file name to obtain a device_id and
set that as an attribute on the flow file
> 2. Use SplitText to split each row into a flow file. 
> 3. Use ExtractText to identify the field that is the record timestamp and create a year,
month, and day attribute on the flow file from that timestamp. 
> 4. Use MergeContent with a grouping key made up of (device_id,year,month,day)
> 5. Convert each file to ORC (maybe Parquet). This stage will likely require me building
a custom processor because the conversion is not going to be a simple A-to-B. I want to do
some validation on fields, type conversion, and other custom stuff against some source-of-truth
schema stored in a web service with REST API.
> 6. Use PutHDFS to store these ORC files in directories like .../device_id/year=2015/month=11/day=9
by using the attributes already present from the upstream processors to build up the path,
where device_id is the Hive table name and the year, month, day are the partition key name=value
per Hive format. The file names will just be some unique ID, they don't really matter
> 7. Use ExecuteProcessStream to execute a Hive script that will "alter table add partitions...."
for any partitions that were newly created on this schedule run
> 
> Is this insane or is it what Nifi was designed to do? I could definitely see using a
Spark job to do the group by (device_id,year,month,day) stage. 200M flow files from the SplitText
is the one that has me wondering if I am nuts thinking of doing that? There must be overhead
on flow files and deprecating them to one line each seems to me as a worst case scenario.
But it all depends on the core design and whether Nifi is optimized to handle such a use case.

> 
> Thanks,
> Mark

