nifi-dev mailing list archives

From József Mészáros <joe.mesza...@impresstv.com>
Subject Re: custom processor - parse flowFile to many kafka messages
Date Mon, 14 Sep 2015 10:20:49 GMT
Tim,

I needed a very similar workflow: I had a bunch of CSV files containing
web tracking events, and I wanted to convert every line to JSON and then
push each line to Kafka as a separate message. The solution was:

GetFile --> ConvertCSVToAvro --> ConvertAvroToJson --> PutKafka

It does not split your huge file(s) into one flow file per line; instead
it converts the content to Apache Avro format (
https://avro.apache.org/docs/current/).

I had tab-separated files, which were not supported by the original
ConvertCSVToAvro implementation, so I created a tiny patch:

   - JIRA issue: https://issues.apache.org/jira/browse/NIFI-944
   - Github PR: https://github.com/apache/nifi/pull/87 (waiting for merge)

The ConvertAvroToJson processor wrote the Avro records as an array of JSON
objects on a single line, which was not appropriate for my scenario, so I
added a new boolean property that determines how Avro records are written:
either as a sequence of single objects (false), with every object on a
new line, or as an array of objects (true). The details of this
modification:

   - JIRA issue: https://issues.apache.org/jira/browse/NIFI-945
   - Github PR: https://github.com/apache/nifi/pull/88 (waiting for merge)
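
To illustrate the two output shapes, here is a minimal sketch in plain Java.
It is not the code from the PR: the class name, the method name and the
wrapInArray flag are made up for illustration, and the input is assumed to be
a list of records that are already serialized as JSON objects.

```java
import java.util.List;

// Illustration only: hypothetical names, not the ConvertAvroToJson code.
final class JsonContainerSketch {

    /**
     * Joins already-serialized JSON objects either into a single JSON array
     * (wrapInArray = true) or into one object per line (wrapInArray = false).
     */
    static String render(List<String> jsonObjects, boolean wrapInArray) {
        if (wrapInArray) {
            return "[" + String.join(",", jsonObjects) + "]";
        }
        return String.join("\n", jsonObjects);
    }

    public static void main(String[] args) {
        List<String> records =
                List.of("{\"event\":\"click\"}", "{\"event\":\"view\"}");
        System.out.println(render(records, true));   // one array on one line
        System.out.println(render(records, false));  // one object per line
    }
}
```

With one object per line, each line can be treated downstream as its own
message, which is what I needed for Kafka.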


Besides the Avro-based solution, I created a direct csv2json converter using
the Jackson CSV extension (https://github.com/FasterXML/jackson-dataformat-csv).
It converts CSV files directly to JSON and does not use Avro as an
intermediate format. This custom processor is not published yet, but if you
think it would be helpful, I can create a JIRA issue and a GitHub PR.
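
To give a rough idea of the direct conversion, here is a sketch that uses
only the Java standard library. It is a stand-in, not the unpublished
processor, and it does not use Jackson CSV. The class and method names are
hypothetical. It assumes that the first line is a header, that every value is
emitted as a JSON string, and that fields are not quoted and never contain
the delimiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustration only: hypothetical names, standard library instead of Jackson CSV.
final class DelimitedToJsonSketch {

    /** Wraps a value in double quotes and escapes what JSON requires. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"') {
                sb.append("\\\"");
            } else if (c == '\\') {
                sb.append("\\\\");
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Turns a header line plus data lines into one JSON object per data line. */
    static List<String> convert(List<String> lines, char delimiter) {
        List<String> out = new ArrayList<>();
        if (lines.isEmpty()) {
            return out;
        }
        String sep = Pattern.quote(String.valueOf(delimiter));
        String[] header = lines.get(0).split(sep, -1);
        for (String line : lines.subList(1, lines.size())) {
            String[] cells = line.split(sep, -1);
            StringBuilder json = new StringBuilder("{");
            for (int i = 0; i < header.length; i++) {
                if (i > 0) {
                    json.append(',');
                }
                // Missing trailing cells become empty strings.
                String value = i < cells.length ? cells[i] : "";
                json.append(quote(header[i])).append(':').append(quote(value));
            }
            out.add(json.append('}').toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("user\tpage", "42\t/home", "43\t/cart");
        convert(lines, '\t').forEach(System.out::println);
    }
}
```

A real CSV library also handles quoted fields, embedded delimiters and
typed values, which is why I used the Jackson extension rather than string
splitting.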

I hope it helps you.

Joe


On Mon, Sep 14, 2015 at 5:23 AM, timF <tim@fogarty.org> wrote:

> Thanks for all the feedback.  Looking at the source code for SplitText, I
> see that it parses the input FlowFile, storing the created output FlowFiles
> in a list, and then at the end sends the list all at once with a single
> call to session.transfer().  This could be a problem when there are
> millions of records in the input file.
>
> Is there a technical reason why SplitText creates all the output flow files
> before sending them out?  If I were to write my own split process, or a
> combination of GetFile and SplitText where I read the input file line by
> line, can I create an output flow file, send it out, then create the next
> one, send it out, etc?
>
> Does the next processor in the flow get the flow file as soon as it is sent
> with session.transfer?
>
>
>
> --
> View this message in context:
> http://apache-nifi-developer-list.39713.n7.nabble.com/custom-processor-parse-flowFile-to-many-kafka-messages-tp2782p2803.html
> Sent from the Apache NiFi Developer List mailing list archive at
> Nabble.com.
>
