nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mark Payne <>
Subject Re: MergeContent resulting in corrupted JSON
Date Wed, 10 Jun 2020 20:32:28 GMT
I don’t think flushing should matter, if you’re writing directly to the provided OutputStream.
If you wrap it in a BufferedOutputStream or something like that, then of course you’ll want
to flush that. Assuming that you are extending AbstractProcessor, it will call session.commit()
for you automatically when onTrigger() returns.

I did just notice that you said you’re merging 1,000+ FlowFiles. That would make it kind
of difficult to follow the provenance. Would recommend for debugging purposes, at least, that
you try making small batches, maybe 25 FlowFiles or something like that. Would make it a lot
easier to find the culprit

On Jun 10, 2020, at 4:28 PM, Jason Iannone <<>>

Excellent advice, thank you! When writing via ProcessSession.write(FlowFile, OutputStream)
is it advised to flush and/or session.commit()? I noticed we aren't doing either, but we are
invoking session.transfer.


On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <<>>

Control characters should not cause any problem with MergeContent. MergeContent just copies
bytes from one stream to another. It’s also worth noting that attributes don’t really
come into play here. MergeContent is combining the FlowFile content, so even if it has some
weird attributes, those won’t cause a problem in the output content. NiFi stores attributes
as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the
processor is assuming that if you want to convert a message header to an attribute, that header
must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at a
time has the opportunity to write to a file in the content repository. When the processor
finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which
file its content was written to, the byte offset where its content starts, and the length
of the content. The next time that a processor needs to write to the content of a FlowFile,
it may end up appending to that same file on disk, but the FlowFile that the content corresponds
to will keep track of the byte offset into the file where its content begins and how many
bytes in that file belong to that FlowFile.

My recommendation to track this down would be to find a FlowFile that is corrupt, and then
use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined
together by MergeContent and see if any of those is corrupt.



On Jun 10, 2020, at 2:07 PM, Jason Iannone <<>>

Hey Mark,

I was thinking over this more and despite no complaints from Jackson Objectmapper is it possible
that hidden and/or control characters are present in the JSON values which would then cause
MergeContent to behave this way? I looked over the code and nothing jumped out, but there
is something we had to do because of how the publisher is setting kafka header attributes.
Some attributes are bytes and not strings converted to bytes, and ConsumeKafka seems to assume
that these can always be converted to a String. We had to change the encoding to be ISO8859
due to running into issues with the bytes getting corrupted.

I'm also trying to better understand how the content is being stored in the content repository,
and whether something is going wrong when writing it out.


On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <<>>
Hey Jason,

Thanks for reaching out. That is definitely odd and not something that I’ve seen or heard
about before.

Are you certain that the data is not being corrupted upstream of the processor? I ask because
the code for the processor that handles writing out the content is pretty straight forward
and hasn’t been modified in over 3 years, so I would expect to see it happen often if it
were a bug in the MergeContent processor itself. Any chance that you can create a flow template/sample
data that recreates the issue? Anything particularly unique about your flow?


> On Jun 9, 2020, at 6:47 PM, Jason Iannone <<>>
> Hi all,
> Within Nifi 1.10.0 we're seeing unexpected behavior with mergecontent. The processor
is being fed in many flowfiles with individual JSON records. The records have various field
types including a hex-encoded byte[]. We are not trying to merge JSON records themselves but
rather consolidate many flowfiles into fewer flowfiles.
> What we're seeing is that a random flowfile is split causing the merge file to be invalid
JSON. When running multiple bins we saw the flowfile split across bins.
> Example
> Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
> Flowfile 2:  {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> Flowfile 3:  {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> Merged Result:
> {"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" }
> xbytes": A10F15D14B11", timestamp: "123456790" }
> {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> {"name": "3", "h
> Mergecontent Configuration:
> Concurrent Tasks: 4
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Attribute Strategy: Keep Only Common Attributes
> Min. number of entries 1000
> Max number of entries: 20000
> Minimum group size: 10 KB
> Maximum number of bins: 5
> Header, Footer, and Demaractor are not set.
> We then backed off the below to reduce min and max entries, bin to 1, and thread to 1
and still see the same issue.
> Any insights?
> Thanks,
> Jason

View raw message