nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joe Witt <joe.w...@gmail.com>
Subject Re: Data Ingestion forLarge Source Files and Masking
Date Wed, 13 Jan 2016 05:38:45 GMT
Hello

So the performance went from what sounded pretty good to what sounds
pretty problematic.  The rate now sounds like it is around 5MB/s which
is indeed quite poor.  Building on what Bryan said there does appear
to be some good opportunities to improve the performance.  The link he
provided just expanded to cover the full range to look at is here [1].

Couple key points to note:
1) Use of a buffered line oriented reader than preserves the new lines
2) write to a buffered writer that accepts strings and understands
which charset you intend to write out
3) avoid strong concat with newline

Also keep in mind you how large any single line could be because if
they can be quite large you may need to consider the GC pressure that
can be caused.  But let's take a look at how things are after these
easier steps first.

[1] https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361

Thanks
Joe

On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros <hellojuan@gmail.com> wrote:
> Obaid,
>
> Since you mention that you will have dedicated ETL servers and assume they
> will also have a decent amount of ram on them, then I would not shy away
> from increasing your threads.
>
> Also in your staging directory if you do not need to keep originals, then
> might consider GetFile and on that one use one thread.
>
> Hi Joe,
>
> Yes, I took consideration of existinh RAID and HW settings. We have 10G NIC
> for all hadoop intra-connectivity and the server in question is an edge node
> of our hadoop cluster.
> In production scenario we will use dedicated ETL servers having high
> performance(>500MB/s) local disks.
>
> Sharing a good news, I have successfully mask & load to HDFS 110 GB data
> using below flow:
>
> ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) > FetchFile
> (1 thread) > maskColumn(4 threads) > PutHDFS (1 threads).
>
> * used 4 threads for masking and 1 for other because I found it is the
> slowest component.
>
> However, It seems to be too slow. It was processing 2GB files in  6 minutes.
> It may be because of my masking algorithm(although masking algorithm is
> pretty simple FPE with some simple twist).
> However I want to be sure that the way I have written custom processor is
> the most efficient way. Please below code chunk and let me know whether it
> is the fastest way to process flowfiles (csv source files) which needs
> modifications on specific columns:
>
> * parseLine method contains logic for masking.
>
>        flowfile = session.write(flowfile, new StreamCallback() {
>         @Override
>            public void process(InputStream in, OutputStream out) throws
> IOException {
>
>         BufferedReader reader = new BufferedReader(new
> InputStreamReader(in));
>         String line;
>         if(skipHeader == true && headerExists==true) { // to skip header, do
> an additional line fetch before going to next step
>         if(reader.ready())   reader.readLine();
>         } else if( skipHeader == false && headerExists == true) { // if
> header is not skipped then no need to mask, just pass through
>         if(reader.ready())   out.write((reader.readLine()+"\n").getBytes());
>         }
>
>         // decide about empty line earlier
>         while ((line = reader.readLine()) != null) {
>         if(line.trim().length() > 0 ) {
>         out.write( parseLine(line, seperator, quote, escape,
> maskColumns).getBytes() );
>         }
> };
> out.flush();
>            }
>        });
>
>
>
>
> Thanks in advance.
> -Obaid
>
>
> On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.witt@gmail.com> wrote:
>>
>> Obaid,
>>
>> Really happy you're seeing the performance you need.  That works out
>> to about 110MB/s on average over that period.  Any chance you have a
>> 1GB NIC?  If you really want to have fun with performance tuning you
>> can use things like iostat and other commands to observe disk,
>> network, cpu.  Something else to consider too is the potential
>> throughput gains of multiple RAID-1 containers rather than RAID-5
>> since NiFi can use both in parallel.  Depends on your goals/workload
>> so just an FYI.
>>
>> A good reference for how to build a processor which does altering of
>> the data (transformation) is here [1].  It is a good idea to do a
>> quick read through that document.  Also, one of the great things you
>> can do as well is look at existing processors.  Some good examples
>> relevant to transformation are [2], [3], and [4] which are quite
>> simple stream transform types. Or take a look at [5] which is a more
>> complicated example.  You might also be excited to know that there is
>> some really cool work done to bring various languages into NiFi which
>> looks on track to be available in the upcoming 0.5.0 release which is
>> NIFI-210 [6].  That will provide a really great option to quickly
>> build transforms using languages like Groovy, JRuby, Javascript,
>> Scala, Lua, Javascript, and Jython.
>>
>> [1]
>> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
>>
>> [2]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
>>
>> [3]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
>>
>> [4]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
>>
>> [5]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
>>
>> [6] https://issues.apache.org/jira/browse/NIFI-210
>>
>> Thanks
>> Joe
>>
>> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidcuet@gmail.com> wrote:
>> > Hi Joe,
>> >
>> > Just completed by test with 100GB data (on a local RAID 5 disk on a
>> > single
>> > server).
>> >
>> > I was able to load 100GB data within 15 minutes(awesome!!) using below
>> > flow.
>> > This throughput is enough to load 10TB data in a day with a single and
>> > simple machine.
>> > During the test, server disk I/O went up to 200MB/s.
>> >
>> >     ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile (4
>> > threads) > PutHDFS (4 threads)
>> >
>> > My Next action is to incorporate my java code for column masking with a
>> > custom processor.
>> > I am now exploring on that. However, if you have any good reference on
>> > custom processor(altering actual data) please let  me know.
>> >
>> > Thanks,
>> > Obaid
>> >
>> >
>> >
>> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidcuet@gmail.com>
>> > wrote:
>> >>
>> >> Hi Joe,
>> >>
>> >> Yes, symlink is another option I was thinking when I was trying to use
>> >> getfile.
>> >> Thanks for your insights, I will update you on this mail chain when my
>> >> entire workflow completes. So that thus could be an reference for other
>> >> :).
>> >>
>> >> -Obaid
>> >>
>> >> On Monday, January 4, 2016, Joe Witt <joe.witt@gmail.com> wrote:
>> >>>
>> >>> Obaid,
>> >>>
>> >>> You make a great point.
>> >>>
>> >>> I agree we will ultimately need to do more to make that very valid
>> >>> approach work easily.  The downside is that puts the onus on NiFi to
>> >>> keep track of a variety of potentially quite large state about the
>> >>> directory.  One way to avoid that expense is if NiFi can pull a copy
>> >>> of then delete the source file.  If you'd like to keep a copy around
I
>> >>> wonder if a good approach is to simply create a symlink to the
>> >>> original file you want NiFi to pull but have the symlink in the NiFi
>> >>> pickup directory.  NiFi is then free to read and delete which means
it
>> >>> simply pulls whatever shows up in that directory and doesn't have to
>> >>> keep state about filenames and checksums.
>> >>>
>> >>> I realize we still need to do what you're suggesting as well but
>> >>> thought I'd run this by you.
>> >>>
>> >>> Joe
>> >>>
>> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidcuet@gmail.com>
>> >>> wrote:
>> >>> > Hi Joe,
>> >>> >
>> >>> > Condider a scenerio, where we need to feed some older files and
we
>> >>> > are
>> >>> > using
>> >>> > "mv" to feed files to input directory( to reduce IO we may use
>> >>> > "mv").
>> >>> > If we
>> >>> > use "mv", last modified date will not changed. And this is very
>> >>> > common
>> >>> > on a
>> >>> > busy file collection system.
>> >>> >
>> >>> > However, I think I can still manage it by adding additional "touch"
>> >>> > before
>> >>> > moving fole in the target directory.
>> >>> >
>> >>> > So, my suggestion is to add file selection criteria as an
>> >>> > configurable
>> >>> > option in listfile process on workflow. Options could be last
>> >>> > modified
>> >>> > date(as current one) unique file names, checksum etc.
>> >>> >
>> >>> > Thanks again man.
>> >>> > -Obaid
>> >>> >
>> >>> >
>> >>> > On Monday, January 4, 2016, Joe Witt <joe.witt@gmail.com>
wrote:
>> >>> >>
>> >>> >> Hello Obaid,
>> >>> >>
>> >>> >> The default behavior of the ListFile processor is to keep track
of
>> >>> >> the
>> >>> >> last modified time of the files it lists.  When you changed
the
>> >>> >> name
>> >>> >> of the file that doesn't change the last modified time as tracked
>> >>> >> by
>> >>> >> the OS but when you altered content it does.  Simply 'touch'
on the
>> >>> >> file would do it too.
>> >>> >>
>> >>> >> I believe we could observe the last modified time of the directory
>> >>> >> in
>> >>> >> which the file lives to detect something like a rename.  However,
>> >>> >> we'd
>> >>> >> not know which file was renamed just that something was changed.
>> >>> >> So
>> >>> >> it require keeping some potentially problematic state to deconflict
>> >>> >> or
>> >>> >> requiring the user to have a duplicate detection process
>> >>> >> afterwards.
>> >>> >>
>> >>> >> So with that in mind is the current behavior sufficient for
your
>> >>> >> case?
>> >>> >>
>> >>> >> Thanks
>> >>> >> Joe
>> >>> >>
>> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidcuet@gmail.com>
>> >>> >> wrote:
>> >>> >> > Hi Joe,
>> >>> >> >
>> >>> >> > I am now exploring your solution.
>> >>> >> > Starting with below flow:
>> >>> >> >
>> >>> >> > ListFIle > FetchFile > CompressContent > PutFile.
>> >>> >> >
>> >>> >> > Seems all fine. Except some confusion with how ListFile
>> >>> >> > identifies
>> >>> >> > new
>> >>> >> > files.
>> >>> >> > In order to test, I renamed a already processed file and
put in
>> >>> >> > in
>> >>> >> > input
>> >>> >> > folder and found that the file is not processing.
>> >>> >> > Then I randomly changed the content of the file and it
was
>> >>> >> > immediately
>> >>> >> > processed.
>> >>> >> >
>> >>> >> > My question is what is the new file selection criteria
for
>> >>> >> > "ListFile" ?
>> >>> >> > Can
>> >>> >> > I change it only to file name ?
>> >>> >> >
>> >>> >> > Thanks in advance.
>> >>> >> >
>> >>> >> > -Obaid
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.witt@gmail.com>
>> >>> >> > wrote:
>> >>> >> >>
>> >>> >> >> Hello Obaid,
>> >>> >> >>
>> >>> >> >> At 6 TB/day and average size of 2-3GB per dataset
you're looking
>> >>> >> >> at
>> >>> >> >> a
>> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction
rate.  So
>> >>> >> >> well
>> >>> >> >> within a good range to work with on a single system.
>> >>> >> >>
>> >>> >> >> 'I's there any way to by pass writing flow files on
disk or
>> >>> >> >> directly
>> >>> >> >> pass those files to HDFS as it is ?"
>> >>> >> >>
>> >>> >> >>   There is no way to bypass NiFi taking a copy of
that data by
>> >>> >> >> design.
>> >>> >> >> NiFi is helping you formulate a graph of dataflow
requirements
>> >>> >> >> from
>> >>> >> >> a
>> >>> >> >> given source(s) through given processing steps and
ultimate
>> >>> >> >> driving
>> >>> >> >> data into given destination systems.  As a result
it takes on
>> >>> >> >> the
>> >>> >> >> challenge of handling transactionality of each interaction
and
>> >>> >> >> the
>> >>> >> >> buffering and backpressure to deal with the realities
of
>> >>> >> >> different
>> >>> >> >> production/consumption patterns.
>> >>> >> >>
>> >>> >> >> "If the files on the spool directory are compressed(zip/gzip),
>> >>> >> >> can
>> >>> >> >> we
>> >>> >> >> store files on HDFS as uncompressed ?"
>> >>> >> >>
>> >>> >> >>   Certainly.  Both of those formats (zip/gzip) are
supported in
>> >>> >> >> NiFi
>> >>> >> >> out of the box.  You simply run the data through the
proper
>> >>> >> >> process
>> >>> >> >> prior to the PutHDFS process to unpack (zip) or decompress
>> >>> >> >> (gzip)
>> >>> >> >> as
>> >>> >> >> needed.
>> >>> >> >>
>> >>> >> >> "2.a Can we use our existing java code for masking
? if yes then
>> >>> >> >> how ?
>> >>> >> >> 2.b For this Scenario we also want to bypass storing
flow files
>> >>> >> >> on
>> >>> >> >> disk. Can we do it on the fly, masking and storing
on HDFS ?
>> >>> >> >> 2.c If the source files are compressed (zip/gzip),
is there any
>> >>> >> >> issue
>> >>> >> >> for masking here ?"
>> >>> >> >>
>> >>> >> >>   You would build a custom NiFi processor that leverages
your
>> >>> >> >> existing
>> >>> >> >> code.  If your code is able to operate on an InputStream
and
>> >>> >> >> writes
>> >>> >> >> to
>> >>> >> >> an OutputStream then it is very likely you'll be able
to handle
>> >>> >> >> arbitrarily large objects with zero negative impact
to the JVM
>> >>> >> >> Heap
>> >>> >> >> as
>> >>> >> >> well.  This is thanks to the fact that the data is
present in
>> >>> >> >> NiFi's
>> >>> >> >> repository with copy-on-write/pass-by-reference semantics
and
>> >>> >> >> that
>> >>> >> >> the
>> >>> >> >> API is exposing those streams to your code in a transactional
>> >>> >> >> manner.
>> >>> >> >>
>> >>> >> >>   If you want the process of writing to HDFS to also
do
>> >>> >> >> decompression
>> >>> >> >> and masking in one pass you'll need to extend/alter
the PutHDFS
>> >>> >> >> process to do that.  It is probably best to implement
the flow
>> >>> >> >> using
>> >>> >> >> cohesive processors (grab files, decompress files,
mask files,
>> >>> >> >> write
>> >>> >> >> to hdfs).  Given how the repository construct in NiFi
works and
>> >>> >> >> given
>> >>> >> >> how caching in Linux works it is very possible you'll
be quite
>> >>> >> >> surprised by the throughput you'll see.  Even then
you can
>> >>> >> >> optimize
>> >>> >> >> once you're sure you need to.  The other thing to
keep in mind
>> >>> >> >> here
>> >>> >> >> is
>> >>> >> >> that often a flow that starts out as specific as this
turns into
>> >>> >> >> a
>> >>> >> >> great place to tap the stream of data to feed some
new system or
>> >>> >> >> new
>> >>> >> >> algorithm with a different format or protocol.  At
that moment
>> >>> >> >> the
>> >>> >> >> benefits become even more obvious.
>> >>> >> >>
>> >>> >> >> Regarding the Flume processes in NiFi and their memory
usage.
>> >>> >> >> NiFi
>> >>> >> >> offers a nice hosting mechanism for the Flume processes
and
>> >>> >> >> brings
>> >>> >> >> some of the benefits of NiFi's UI, provenance, repository
>> >>> >> >> concept.
>> >>> >> >> However, we're still largely limited to the design
assumptions
>> >>> >> >> one
>> >>> >> >> gets when building a Flume process and that can be
quite memory
>> >>> >> >> limiting.  We see what we have today as a great way
to help
>> >>> >> >> people
>> >>> >> >> transition their existing Flume flows into NiFi by
leveraging
>> >>> >> >> their
>> >>> >> >> existing code but would recommend working to phase
the use of
>> >>> >> >> those
>> >>> >> >> out in time so that you can take full benefit of what
NiFi
>> >>> >> >> brings
>> >>> >> >> over
>> >>> >> >> Flume.
>> >>> >> >>
>> >>> >> >> Thanks
>> >>> >> >> Joe
>> >>> >> >>
>> >>> >> >>
>> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim
>> >>> >> >> <obaidcuet@gmail.com>
>> >>> >> >> wrote:
>> >>> >> >> > Hi,
>> >>> >> >> >
>> >>> >> >> > I am new in Nifi and exploring it as open source
ETL tool.
>> >>> >> >> >
>> >>> >> >> > As per my understanding, flow files are stored
on local disk
>> >>> >> >> > and
>> >>> >> >> > it
>> >>> >> >> > contains
>> >>> >> >> > actual data.
>> >>> >> >> > If above is true, lets consider a below scenario:
>> >>> >> >> >
>> >>> >> >> > Scenario 1:
>> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day)
of files
>> >>> >> >> > coming
>> >>> >> >> > from
>> >>> >> >> > external sources
>> >>> >> >> > - I want to push those files to HDFS as it is
without any
>> >>> >> >> > changes
>> >>> >> >> >
>> >>> >> >> > Scenario 2:
>> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day)
of files
>> >>> >> >> > coming
>> >>> >> >> > from
>> >>> >> >> > external sources
>> >>> >> >> > - I want to mask some of the sensitive columns
>> >>> >> >> > - Then send one copy to HDFS and another copy
to Kafka
>> >>> >> >> >
>> >>> >> >> > Question for Scenario 1:
>> >>> >> >> > 1.a In that case those 5-6TB data will be again
written on
>> >>> >> >> > local
>> >>> >> >> > disk
>> >>> >> >> > as
>> >>> >> >> > flow files and will cause double I/O. Which eventually
may
>> >>> >> >> > cause
>> >>> >> >> > slower
>> >>> >> >> > performance due to I/O bottleneck.
>> >>> >> >> > Is there any way to by pass writing flow files
on disk or
>> >>> >> >> > directly
>> >>> >> >> > pass
>> >>> >> >> > those files to HDFS as it is ?
>> >>> >> >> > 1.b If the files on the spool directory are
>> >>> >> >> > compressed(zip/gzip),
>> >>> >> >> > can
>> >>> >> >> > we
>> >>> >> >> > store files on HDFS as uncompressed ?
>> >>> >> >> >
>> >>> >> >> > Question for Scenario 2:
>> >>> >> >> > 2.a Can we use our existing java code for masking
? if yes
>> >>> >> >> > then
>> >>> >> >> > how ?
>> >>> >> >> > 2.b For this Scenario we also want to bypass
storing flow
>> >>> >> >> > files
>> >>> >> >> > on
>> >>> >> >> > disk.
>> >>> >> >> > Can
>> >>> >> >> > we do it on the fly, masking and storing on HDFS
?
>> >>> >> >> > 2.c If the source files are compressed (zip/gzip),
is there
>> >>> >> >> > any
>> >>> >> >> > issue
>> >>> >> >> > for
>> >>> >> >> > masking here ?
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> > In fact, I tried above using flume+flume interceptors.
>> >>> >> >> > Everything
>> >>> >> >> > working
>> >>> >> >> > fine with smaller files. But when source files
greater that
>> >>> >> >> > 50MB
>> >>> >> >> > flume
>> >>> >> >> > chocks :(.
>> >>> >> >> > So, I am exploring options in NiFi. Hope I will
get some
>> >>> >> >> > guideline
>> >>> >> >> > from
>> >>> >> >> > you
>> >>> >> >> > guys.
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> > Thanks in advance.
>> >>> >> >> > -Obaid
>> >>> >> >
>> >>> >> >
>> >
>> >
>
>

Mime
View raw message