nifi-users mailing list archives

From Jason Iannone <bread...@gmail.com>
Subject Re: MergeContent resulting in corrupted JSON
Date Thu, 11 Jun 2020 14:43:20 GMT
I confirmed what you mentioned as well.

I also looked over many custom processor examples and am looking for
clarification on a few things that I didn't see explicitly called out in
the Developer's Guide.

   - Are there guidelines on when one should modify the original FlowFile
   vs. when you should clone it vs. when you should create a net-new one?
   - Should heavier lifting, such as decryption or formatting, be done in
   a callback?


Thanks,
Jason

On Wed, Jun 10, 2020 at 4:32 PM Mark Payne <markap14@hotmail.com> wrote:

> I don’t think flushing should matter if you’re writing directly to the
> provided OutputStream. If you wrap it in a BufferedOutputStream or
> something like that, then of course you’ll want to flush that. Assuming
> that you are extending AbstractProcessor, it will call session.commit() for
> you automatically when onTrigger() returns.
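Mark's point about wrappers can be seen without NiFi at all. This is a standalone sketch, not NiFi code: a ByteArrayOutputStream stands in (as an assumption) for the OutputStream that a ProcessSession.write callback receives, and the names FlushDemo and bytesVisibleDownstream are hypothetical. Small writes through a BufferedOutputStream stay in its internal buffer until flush() or close() is called.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FlushDemo {
    /**
     * Writes payload through a BufferedOutputStream wrapper and reports how many bytes
     * actually reached the underlying stream, with or without an explicit flush().
     */
    static int bytesVisibleDownstream(byte[] payload, boolean flush) {
        // Stands in for the OutputStream a write callback receives (assumption for illustration).
        ByteArrayOutputStream underlying = new ByteArrayOutputStream();
        // Wrapper with an internal buffer; small writes are held there.
        OutputStream buffered = new BufferedOutputStream(underlying);
        try {
            buffered.write(payload);
            if (flush) {
                buffered.flush(); // pushes the buffered bytes into the underlying stream
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The wrapper is intentionally left unclosed to mimic code that forgets to flush.
        return underlying.size();
    }

    public static void main(String[] args) {
        byte[] record = "{\"name\": \"1\"}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println("without flush: " + bytesVisibleDownstream(record, false)); // 0
        System.out.println("with flush:    " + bytesVisibleDownstream(record, true));  // 14
    }
}
```

In a processor, the same rule would apply inside the write callback: flush (or close) any wrapper you create before the callback returns, or the tail of the content may never reach the repository.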
>
> I did just notice that you said you’re merging 1,000+ FlowFiles. That
> would make it kind of difficult to follow the provenance. I would recommend,
> for debugging purposes at least, that you try making small batches, maybe
> 25 FlowFiles or something like that. That would make it a lot easier to find the
> culprit.
>
> On Jun 10, 2020, at 4:28 PM, Jason Iannone <breadfan@gmail.com> wrote:
>
> Excellent advice, thank you! When writing via
> ProcessSession.write(FlowFile, OutputStream), is it advisable to flush and/or
> call session.commit()? I noticed we aren't doing either, but we are invoking
> session.transfer().
>
> Thanks,
> Jason
>
>
> On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <markap14@hotmail.com> wrote:
>
>> Jason,
>>
>> Control characters should not cause any problem with MergeContent.
>> MergeContent just copies bytes from one stream to another. It’s also worth
>> noting that attributes don’t really come into play here. MergeContent is
>> combining the FlowFile content, so even if it has some weird attributes,
>> those won’t cause a problem in the output content. NiFi stores attributes
>> as a mapping of String to String key/value pairs (i.e., Map<String,
>> String>). So the processor is assuming that if you want to convert a
>> message header to an attribute, that header must be a string.
>>
>> Content in the repository is stored using “slabs” or “blocks.” One
>> processor at a time has the opportunity to write to a file in the content
>> repository. When the processor finishes writing and transfers the FlowFile
>> to the next processor, NiFi keeps track of which file its content was
>> written to, the byte offset where its content starts, and the length of the
>> content. The next time that a processor needs to write to the content of a
>> FlowFile, it may end up appending to that same file on disk, but the
>> FlowFile that the content corresponds to will keep track of the byte offset
>> into the file where its content begins and how many bytes in that file
>> belong to that FlowFile.
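The offset/length bookkeeping described above can be modeled in a few lines. This is a toy, in-memory sketch assuming a single shared slab; SlabSketch, Claim, append, and read are hypothetical names and are not NiFi's content repository API. Several FlowFiles append to one "file", and each keeps only the start offset and length of its own bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class SlabSketch {
    /** Hypothetical claim: which slice of the shared slab belongs to one FlowFile. */
    static final class Claim {
        final int offset;
        final int length;

        Claim(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // One shared file in the content repository, modeled here as an in-memory buffer.
    private final ByteArrayOutputStream slab = new ByteArrayOutputStream();

    /** Appends content to the end of the slab and records where it starts and how long it is. */
    Claim append(byte[] content) {
        int offset = slab.size();
        slab.writeBytes(content); // Java 11+
        return new Claim(offset, content.length);
    }

    /** Reads back only the bytes that belong to the given claim. */
    byte[] read(Claim claim) {
        return Arrays.copyOfRange(slab.toByteArray(), claim.offset, claim.offset + claim.length);
    }

    public static void main(String[] args) {
        SlabSketch repo = new SlabSketch();
        Claim one = repo.append("{\"name\": \"1\"}\n".getBytes(StandardCharsets.UTF_8));
        Claim two = repo.append("{\"name\": \"2\"}\n".getBytes(StandardCharsets.UTF_8));
        System.out.println("second claim starts at byte " + two.offset); // 14
        System.out.print(new String(repo.read(one), StandardCharsets.UTF_8));
    }
}
```

In this model, a wrong offset or length on a claim would produce exactly the kind of fragment seen in the thread (the tail of one record, or the head of the next), which is why checking each input FlowFile's content matters.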
>>
>> My recommendation to track this down would be to find a FlowFile that is
>> corrupt, and then use the data provenance feature [1] to view its lineage.
>> Look at the FlowFiles that were joined together by MergeContent and see if
>> any of those is corrupt.
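Once you have the content of each parent FlowFile and of the merged child (for example, saved from the provenance lineage view), a byte-level comparison can point at the culprit. This sketch assumes the contents are already available as byte arrays and uses no NiFi API; MergeCheck and firstMismatch are hypothetical names. With Binary Concatenation and no header, footer, or demarcator, the child should equal the parents concatenated in order.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

final class MergeCheck {
    /**
     * Returns the first byte offset at which merged differs from the plain concatenation
     * of the inputs (no header, footer, or demarcator), or -1 if they match exactly.
     */
    static int firstMismatch(List<byte[]> inputs, byte[] merged) {
        int pos = 0;
        for (byte[] input : inputs) {
            for (byte b : input) {
                if (pos >= merged.length || merged[pos] != b) {
                    return pos; // merged is shorter, or a byte differs here
                }
                pos++;
            }
        }
        return pos == merged.length ? -1 : pos; // extra trailing bytes also count as a mismatch
    }

    public static void main(String[] args) {
        List<byte[]> parents = List.of(
                "{\"name\": \"1\"}\n".getBytes(StandardCharsets.UTF_8),
                "{\"name\": \"2\"}\n".getBytes(StandardCharsets.UTF_8));
        byte[] child = "{\"name\": \"1\"}\nme\": \"2\"}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println("first mismatch at byte " + firstMismatch(parents, child)); // 14
    }
}
```

If the parents are already corrupt, they will themselves fail JSON parsing; if they are valid but the child differs, the mismatch offset identifies which parent's bytes went wrong.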
>>
>> Thanks
>> -Mark
>>
>> [1]
>> http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance
>>
>> On Jun 10, 2020, at 2:07 PM, Jason Iannone <breadfan@gmail.com> wrote:
>>
>> Hey Mark,
>>
>> I was thinking this over more, and despite no complaints from Jackson's
>> ObjectMapper, is it possible that hidden and/or control characters are
>> present in the JSON values, which would then cause MergeContent to behave
>> this way? I looked over the code and nothing jumped out, but there is
>> something we had to do because of how the publisher sets Kafka header
>> attributes. Some attributes are raw bytes rather than strings converted to bytes,
>> and ConsumeKafka seems to assume that these can always be converted to a
>> String. We had to change the encoding to ISO8859 because we ran into
>> issues with the bytes getting corrupted.
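To see why the encoding choice matters for header bytes that are not text, here is a small standalone check (not ConsumeKafka's code; HeaderEncoding and roundTrips are hypothetical names). It assumes "ISO8859" means ISO-8859-1 (Latin-1), which maps every byte value 0-255 to exactly one character, so decoding and re-encoding is lossless. UTF-8 decoding replaces malformed byte sequences with U+FFFD, so the original bytes cannot be recovered.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class HeaderEncoding {
    /** Decodes raw header bytes to a String with the given charset, then checks whether encoding restores them. */
    static boolean roundTrips(byte[] raw, Charset charset) {
        String asAttribute = new String(raw, charset);             // what a String-valued attribute would hold
        return Arrays.equals(raw, asAttribute.getBytes(charset));  // can the original bytes be recovered?
    }

    public static void main(String[] args) {
        byte[] raw = {(byte) 0xA1, 0x0F, (byte) 0xFF, 0x00}; // not valid UTF-8
        System.out.println("UTF-8:      " + roundTrips(raw, StandardCharsets.UTF_8));      // false
        System.out.println("ISO-8859-1: " + roundTrips(raw, StandardCharsets.ISO_8859_1)); // true
    }
}
```

This only concerns attributes; as noted in the reply, attribute values do not affect the bytes MergeContent writes to the merged content.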
>>
>> I'm also trying to better understand how the content is being stored in
>> the content repository, and whether something is going wrong when writing
>> it out.
>>
>> Thanks,
>> Jason
>>
>> On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <markap14@hotmail.com> wrote:
>>
>>> Hey Jason,
>>>
>>> Thanks for reaching out. That is definitely odd and not something that
>>> I’ve seen or heard about before.
>>>
>>> Are you certain that the data is not being corrupted upstream of the
>>> processor? I ask because the code for the processor that handles writing
>>> out the content is pretty straightforward and hasn’t been modified in over
>>> 3 years, so I would expect to see it happen often if it were a bug in the
>>> MergeContent processor itself. Any chance that you can create a flow
>>> template/sample data that recreates the issue? Anything particularly unique
>>> about your flow?
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>> > On Jun 9, 2020, at 6:47 PM, Jason Iannone <breadfan@gmail.com> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent.
>>> The processor is being fed many FlowFiles, each containing an individual JSON record.
>>> The records have various field types, including a hex-encoded byte[]. We are
>>> not trying to merge the JSON records themselves, but rather to consolidate many
>>> FlowFiles into fewer FlowFiles.
>>> >
>>> > What we're seeing is that a random FlowFile is split, causing the merged
>>> file to be invalid JSON. When running multiple bins, we saw the FlowFile
>>> split across bins.
>>> >
>>> > Example
>>> > FlowFile 1: {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
>>> > FlowFile 2: {"name": "2", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>>> > FlowFile 3: {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>>> >
>>> > Merged Result:
>>> > {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
>>> > xbytes": "A10F15D14B11", "timestamp": "123456790" }
>>> > {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>>> > {"name": "3", "h
>>> >
>>> > MergeContent Configuration:
>>> > Concurrent Tasks: 4
>>> > Merge Strategy: Bin-Packing Algorithm
>>> > Merge Format: Binary Concatenation
>>> > Attribute Strategy: Keep Only Common Attributes
>>> > Minimum Number of Entries: 1000
>>> > Maximum Number of Entries: 20000
>>> > Minimum Group Size: 10 KB
>>> > Maximum Number of Bins: 5
>>> > Header, Footer, and Demarcator are not set.
>>> >
>>> > We then backed off the above settings, reducing the min and max entries and setting
>>> bins and threads to 1, and we still see the same issue.
>>> >
>>> > Any insights?
>>> >
>>> > Thanks,
>>> > Jason
>>>
>>>
>>
>
