nifi-users mailing list archives

From Neil Derraugh <neil.derra...@intellifylearning.com>
Subject Re: JSON array chunking
Date Fri, 01 Sep 2017 17:24:31 GMT
Hi Bryan,

Thanks for the tip; it was very helpful and got me to a finished flow.  I
wanted essentially your flow, but with schema inference and no schema
registry.  The flow takes an arbitrary JSON array (no pre-defined schema
beyond the top level being an array) and chunks it out.  I've attached the
final result for anyone who's curious.
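
In case the attachment doesn't survive the archive, the shape of the flow
is roughly the following.  This is a sketch from memory, so treat the
attached template as authoritative; in particular, the step that reduces
the inferred array schema down to its "items" record is my summary, not
the template's exact wording:

InferAvroSchema
    Schema Output Destination: flowfile-attribute
    Input Content Type: json
    Avro Record Name: root
      |
      v
SplitRecord
    Record Reader: JsonTreeReader
    Record Writer: JsonRecordSetWriter
    Records Per Split: 1

with both the JsonTreeReader and the JsonRecordSetWriter using Schema
Access Strategy "Use 'Schema Text' Property" and a Schema Text derived
from ${inferred.avro.schema} (the record inside the array's "items").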

Neil

On Thu, Aug 31, 2017 at 9:28 AM, Bryan Bende <bbende@gmail.com> wrote:

> Neil,
>
> I'm a little confused as to what format your initial data is in... You
> showed an example payload as JSON, but then mentioned using an
> AvroReader, so it wasn't clear to me if your starting point is JSON or
> Avro.
>
> Assuming it is JSON, I put together a template that shows how to split
> your sample data:
>
> https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7
>
> I used the second schema you have (the one where the top-level element
> is a record) and then SplitRecord with a JsonTreeReader and
> JsonRecordSetWriter.
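>
> If anyone can't load the template, the relevant settings are roughly the
> following (a sketch from memory; double-check against the gist):
>
> SplitRecord
>   Record Reader: JsonTreeReader
>   Record Writer: JsonRecordSetWriter
>   Records Per Split: 1
>
> JsonTreeReader / JsonRecordSetWriter
>   Schema Access Strategy: Use 'Schema Text' Property
>   Schema Text: your second schema (the top-level record, not the array)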
>
> The sample data I fed in was your example data, and it produced two
> flow files coming out of SplitRecord, one for each element of the
> array.
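>
> For reference, with the writer's default settings each split is written
> as a one-element JSON array, so the first flow file should look roughly
> like this:
>
> [ {
>   "id" : "56740f4b-48de-0502-afdc-59a463b3f6dc",
>   "account_id" : "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
>   "contact_id" : "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
>   "date_modified" : 1503959931000,
>   "deleted" : 0
> } ]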
>
> Let us know if this is not what you are trying to do.
>
> Thanks,
>
> Bryan
>
>
> On Wed, Aug 30, 2017 at 8:31 PM, Neil Derraugh
> <neil.derraugh@intellifylearning.com> wrote:
> > I should have mentioned I tried starting with a JsonPathReader before the
> > AvroReader.  I had a property I was calling "root" with a value of $.  I
> > can post details about that too if it would be helpful.
> >
> > On Wed, Aug 30, 2017 at 8:08 PM, Neil Derraugh
> > <neil.derraugh@intellifylearning.com> wrote:
> >>
> >> I have arbitrary JSON arrays that I want to split into chunks.  I've been
> >> (unsuccessfully) trying to figure this out with InferAvroSchema ->
> >> SplitRecord(AvroReader, JsonRecordSetWriter).
> >>
> >> Here's an example payload:
> >> [{
> >>     "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
> >>     "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
> >>     "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
> >>     "date_modified": 1503959931000,
> >>     "deleted": 0
> >>   },
> >>   {
> >>     "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
> >>     "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
> >>     "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
> >>     "date_modified": 1503959873000,
> >>     "deleted": 0
> >>   }]
> >>
> >> Here's the schema that gets inferred (InferAvroSchema's Avro Record Name
> >> is set to "root"):
> >> {
> >>   "type": "array",
> >>   "items": {
> >>     "type": "record",
> >>     "name": "root",
> >>     "fields": [
> >>       {
> >>         "name": "id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
> >>       },
> >>       {
> >>         "name": "account_id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
> >>       },
> >>       {
> >>         "name": "contact_id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
> >>       },
> >>       {
> >>         "name": "date_modified",
> >>         "type": "long",
> >>         "doc": "Type inferred from '1503959931000'"
> >>       },
> >>       {
> >>         "name": "deleted",
> >>         "type": "int",
> >>         "doc": "Type inferred from '0'"
> >>       }
> >>     ]
> >>   }
> >> }
> >>
> >> When I use ${inferred.avro.schema} for both the AvroReader and the
> >> JsonRecordSetWriter I get:
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create
> >> Record Writer for
> >> StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,
> >> claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325,
> >> container=default, section=325], offset=0, length=86462199],
> >> offset=0,name=accounts-contacts.json.avro,size=86462199];
> >> routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}.
> >>
> >> The stack trace:
> >> 2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9]
> >> o.a.nifi.processors.standard.SplitRecord
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create
> >> Record Writer for
> >> StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,
> >> claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074997-336,
> >> container=default, section=336], offset=1013917, length=454],
> >> offset=0,name=626851422080935,size=454]; routing to failure:
> >> org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record: [same array schema as above]
> >> org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record: [same array schema as above]
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >> at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >> at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >> at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
> >> at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:138)
> >> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record: [same array schema as above]
> >> at org.apache.avro.Schema.getFields(Schema.java:220)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >> ... 19 common frames omitted
> >>
> >> Which looks like it's coming from the Writer, and is maybe about the root
> >> element being an array as opposed to a record.  So I hardcoded the schema
> >> in the JsonRecordSetWriter to be just the record, like this:
> >> {
> >>   "type": "record",
> >>   "name": "root",
> >>   "fields": [
> >>     {
> >>       "name": "id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
> >>     },
> >>     {
> >>       "name": "account_id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
> >>     },
> >>     {
> >>       "name": "contact_id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
> >>     },
> >>     {
> >>       "name": "date_modified",
> >>       "type": "long",
> >>       "doc": "Type inferred from '1503959931000'"
> >>     },
> >>     {
> >>       "name": "deleted",
> >>       "type": "int",
> >>       "doc": "Type inferred from '0'"
> >>     }
> >>   ]
> >> }
> >> Which gave me:
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split
> >> StandardFlowFileRecord[uuid=e2098e1b-f2f8-4ca8-926f-6d0e3643ce45,
> >> claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325,
> >> container=default, section=325], offset=0, length=86462199],
> >> offset=0,name=accounts-contacts.json.avro,size=86462199]:
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse
> >> incoming data.
> >>
> >> The stack trace:
> >> 2017-08-30 19:31:21,690 ERROR [Timer-Driven Process Thread-3]
> >> o.a.nifi.processors.standard.SplitRecord
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split
> >> StandardFlowFileRecord[uuid=41f02871-e5e8-496e-a671-bc3200c4bf8e,
> >> claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074874-335,
> >> container=default, section=335], offset=1028220, length=454],
> >> offset=0,name=626191417134736,size=454]:
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse
> >> incoming data
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse
> >> incoming data
> >> at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:187)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
> >> at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:149)
> >> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record: [same array schema as above]
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >> at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >> at org.apache.nifi.avro.AvroReader.createRecordReader(AvroReader.java:92)
> >> at sun.reflect.GeneratedMethodAccessor1467.invoke(Unknown Source)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >> at com.sun.proxy.$Proxy158.createRecordReader(Unknown Source)
> >> at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:152)
> >> ... 15 common frames omitted
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record: [same array schema as above]
> >> at org.apache.avro.Schema.getFields(Schema.java:220)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >> ... 23 common frames omitted
> >>
> >> Can somebody point me to what I'm doing wrong?  Or suggest an
> >> alternative approach to chunking arbitrary JSON arrays?
> >>
> >> Thanks,
> >> Neil
> >
> >
>
