nifi-users mailing list archives

From Parul Agrawal <parulagrawa...@gmail.com>
Subject Re: Need help in nifi- flume processor
Date Tue, 13 Oct 2015 12:36:48 GMT
Hi,

I tried with the above JSON element, but I am getting the error below:

2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
o.a.n.p.standard.ConvertJSONToSQL
ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
due to org.apache.nifi.processor.exception.ProcessException: IOException
thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
73)): expected a valid value (number, String, array, object, 'true',
'false' or 'null')

Also, I have a huge JSON object attached (new.json). Can you guide me on
how to use the ConvertJSONToSQL processor?
Should I use another processor before ConvertJSONToSQL so that new.json
can be converted into a flat document of key/value pairs, or an array of
flat documents?
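For reference, the kind of flattening ConvertJSONToSQL needs could be done in a preprocessing step (for example via an ExecuteScript processor or an external script). This is only a rough sketch; the nested input shape below is hypothetical and is not the actual contents of new.json:

```python
import json

def flatten(obj, prefix=""):
    """Recursively flatten a nested JSON object into a single
    level of key/value pairs, joining nested keys with dots."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

# Hypothetical nested document standing in for a record from new.json
nested = {"packet": {"proto": "ip", "geninfo": {"num": 1, "len": 55}}}
print(json.dumps(flatten(nested)))
```

Each dotted key would then need a matching column name in the target table (or Unmatched Field Behavior set to ignore the extras).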

Any help/guidance would be really useful.

Thanks and Regards,
Parul

On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bbende@gmail.com> wrote:

> I think ConvertJSONToSQL expects a flat document of key/value pairs, or an
> array of flat documents. So I think your JSON would be:
>
> [
>     {"firstname":"John", "lastname":"Doe"},
>     {"firstname":"Anna", "lastname":"Smith"}
> ]
>
> The table name will come from the Table Name property.
>
> Let us know if this doesn't work.
>
> -Bryan
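Conceptually, the mapping ConvertJSONToSQL performs on such a flat array can be pictured with the simplified sketch below. This is not NiFi's actual implementation (which also consults the target table's column metadata), and the helper name `to_insert` is made up for illustration:

```python
import json

def to_insert(table, doc):
    """Build a parameterized INSERT from one flat JSON document,
    mapping each key to a column of the same name."""
    cols = list(doc)
    placeholders = ", ".join(["?"] * len(cols))
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})"
    return sql, [doc[c] for c in cols]

docs = json.loads('[{"firstname":"John", "lastname":"Doe"},'
                  ' {"firstname":"Anna", "lastname":"Smith"}]')
for doc in docs:
    print(to_insert("details", doc))
```

In NiFi itself the generated SQL and parameter values travel onward as a flowfile (typically to a PutSQL processor), so nothing needs to be written by hand.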
>
>
> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <parulagrawal14@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thank you very much for all the support.
>> I was able to convert the XML format to JSON using a custom Flume source.
>>
>> Now I need the ConvertJSONToSQL processor to insert the data into SQL.
>> I am trying to get hands-on with this processor and will update you.
>> Meanwhile, if you could share an example of using this processor with
>> sample JSON data, that would be great.
>>
>> ===============
>>
>> 1) I tried using the ConvertJSONToSQL processor with the sample JSON
>> file below:
>>
>> "details":[
>>     {"firstname":"John", "lastname":"Doe"},
>>     {"firstname":"Anna", "lastname":"Smith"}
>> ]
>>
>> 2) I created the table *details* in PostgreSQL:
>> * select * from details ;*
>> * firstname | lastname*
>> *-----------+----------*
>> *(0 rows)*
>>
>> 3) The ConvertJSONToSQL processor properties are as follows:
>> *Property*                        *Value*
>> JDBC Connection Pool              DBCPConnectionPool
>> Statement Type                    INSERT
>> Table Name                        details
>> Catalog Name                      No value set
>> Translate Field Names             false
>> Unmatched Field Behavior          Ignore Unmatched Fields
>> Update Keys                       No value set
>>
>> But I am getting the error below from the ConvertJSONToSQL processor:
>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>> o.a.n.p.standard.ConvertJSONToSQL
>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>> SQL INSERT statement due to
>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>> the JSON map to the columns defined by the details table; routing to
>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>> fields in the JSON map to the columns defined by the details table
>>
>> Thanks and Regards,
>> Parul
>>
>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <joey42@gmail.com>
>> wrote:
>>
>>> I've done something like this by wrapping the command in a shell script:
>>>
>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>
>>> My use case was slightly different, but I'm pretty sure you can adapt
>>> the same idea.
>>>
>>> -Joey
>>>
>>> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawal14@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I actually need to get the data from a pipe, so the actual command I
>>> need is: mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet
>>> Is it possible to use ExecuteProcess for multiple commands?
>>>
>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawal14@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I added a custom Flume source, and when the source sends data to
>>>> the Flume sink, the error below is thrown at the sink.
>>>>
>>>>  Administratively Yielded for 1 sec due to processing failure
>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>> is OPEN - you must either commit or rollback first
>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>> java.lang.IllegalStateException: close() called when transaction is
>>>> OPEN - you must either commit or rollback first
>>>>         at
>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>> ~[guava-r05.jar:na]
>>>>         at
>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>         at
>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>         at
>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>> ~[na:na]
>>>>         at
>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>> ~[na:na]
>>>>         at
>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> [na:1.7.0_85]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>> section=7], offset=180436,
>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>> section=7], offset=180436,
>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>> this session (StandardProcessSession[id=218318])
>>>>
>>>> Any idea what could be wrong here?
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbende@gmail.com> wrote:
>>>>
>>>>> Hi Parul,
>>>>>
>>>>> I think it would be good to keep the conversation going on the users
>>>>> list, since there are more people who can offer help there, and it
>>>>> also helps everyone learn new solutions.
>>>>>
>>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>>> which could execute "tshark -i eth0 -T pdml".
>>>>>
>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>> place where you need a custom processor. For simple cases you can
>>>>> use an EvaluateXPath processor to extract values from the XML, and
>>>>> then a ReplaceText processor to build a new JSON document from
>>>>> those extracted values.
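Outside NiFi, the transformation that the extract-values-then-build-JSON approach performs can be sketched in Python. The PDML fragment and field names below are made-up stand-ins for illustration, not actual tshark output:

```python
import json
import xml.etree.ElementTree as ET

# A tiny PDML-like fragment; real tshark PDML output is much richer.
xml_doc = """<packet>
  <proto name="ip">
    <field name="ip.src" show="10.0.0.1"/>
    <field name="ip.dst" show="10.0.0.2"/>
  </proto>
</packet>"""

root = ET.fromstring(xml_doc)
# Pull each field's name/show attributes into a flat key/value map,
# the shape ConvertJSONToSQL can work with.
flat = {f.get("name"): f.get("show") for f in root.iter("field")}
print(json.dumps(flat))
```

An EvaluateXPath expression per field plays the role of the attribute lookups here, and ReplaceText assembles the equivalent of the final `json.dumps` string.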
>>>>>
>>>>> -Bryan
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A little more to add: I need to keep reading the flowfile until the
>>>>>> END_TAG is received, i.e. we may need to concatenate the flowfile
>>>>>> data until the END_TAG, and then convert it to JSON and call the
>>>>>> PutFile processor.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you very much again for the guidance provided.
>>>>>>> Basically I would need a processor which would convert an XML
>>>>>>> file to JSON.
>>>>>>>
>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>
>>>>>>> Here the Flume source keeps sending data to the Flume sink. This
>>>>>>> flowfile would be in PDML format.
>>>>>>>
>>>>>>> Now I need a processor which would do the following
>>>>>>>
>>>>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>>>>> and the END TAG (</packet>).
>>>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>>>> 3) Place the JSON file in a local directory using the PutFile
>>>>>>> processor.
>>>>>>>
>>>>>>> I am not sure if I was able to explain the processor requirement
>>>>>>> clearly. It would be really great if you could help me with this.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joey42@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> > investing in converting your custom Flume components to NiFi
>>>>>>>> > processors. We can help you get started if you need any
>>>>>>>> > guidance going that route.
>>>>>>>>
>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>> It's really useful if you have a complex Flume flow and want to
>>>>>>>> migrate only parts of it over to NiFi at a time. I would port any
>>>>>>>> custom sources and sinks to NiFi once you know it will meet your
>>>>>>>> needs well. NiFi has a lot of documentation on writing
>>>>>>>> processors, and the concepts map pretty well if you're already
>>>>>>>> familiar with Flume's execution model.
>>>>>>>>
>>>>>>>> -Joey
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbende@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Hi Parul,
>>>>>>>> >
>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>> > but due to the way the Flume processors load the classes for
>>>>>>>> > the sources and sinks, the jar you deploy to the lib directory
>>>>>>>> > also needs to include the other dependencies your source/sink
>>>>>>>> > needs (or they each need to individually be placed in lib/
>>>>>>>> > directly).
>>>>>>>> >
>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>> >
>>>>>>>> > It will contain the custom source and the following
>>>>>>>> > dependencies all in one jar:
>>>>>>>> >
>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>> >
>>>>>>>> > I copied that to the NiFi lib directory, restarted, and created
>>>>>>>> > an ExecuteFlumeSource processor with the following config:
>>>>>>>> >
>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>> > Agent Name = a1
>>>>>>>> > Source Name = r1
>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>> >
>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>> >
>>>>>>>> > Keep in mind that this could become risky, because any classes
>>>>>>>> > found in the lib directory would be accessible to all NARs in
>>>>>>>> > NiFi and would be found before classes within a NAR, because
>>>>>>>> > the parent is checked first during class loading. This example
>>>>>>>> > isn't too risky because we are only bringing in Flume jars and
>>>>>>>> > one Guava jar, but if, for example, another NAR uses a
>>>>>>>> > different version of Guava, this is going to cause a problem.
>>>>>>>> >
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> > investing in converting your custom Flume components to NiFi
>>>>>>>> > processors. We can help you get started if you need any
>>>>>>>> > guidance going that route.
>>>>>>>> >
>>>>>>>> > -Bryan
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Hello Bryan,
>>>>>>>> >>
>>>>>>>> >> Thank you very much for your response.
>>>>>>>> >>
>>>>>>>> >> Is it possible to have a customized Flume source and sink in
>>>>>>>> >> NiFi? I have my own customized source and sink. I followed the
>>>>>>>> >> steps below to add my customized source, but it did not work.
>>>>>>>> >>
>>>>>>>> >> 1) Created a Maven project and added the customized source
>>>>>>>> >> (flume.jar was created after this step).
>>>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>>>> >> 3) Added a Flume source processor with the configuration below:
>>>>>>>> >>
>>>>>>>> >> Property           Value
>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>> >> Agent Name      a1
>>>>>>>> >> Source Name         k1.
>>>>>>>> >>
>>>>>>>> >> But I am getting the error below in the Flume source processor:
>>>>>>>> >> "Failed to run validation due to
>>>>>>>> >> java.lang.NoClassDefFoundError: /org/apache/flume/PollableSource."
>>>>>>>> >>
>>>>>>>> >> Can you please help me in this regard? Is there any step or
>>>>>>>> >> configuration I missed?
>>>>>>>> >>
>>>>>>>> >> Thanks and Regards,
>>>>>>>> >> Parul
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbende@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hello,
>>>>>>>> >>>
>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>> >>> sinks within NiFi. They don't communicate with an external
>>>>>>>> >>> Flume process.
>>>>>>>> >>>
>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>> >>> configured to run the netcat source, connected to an
>>>>>>>> >>> ExecuteFlumeSink configured with the logger.
>>>>>>>> >>>
>>>>>>>> >>> -Bryan
>>>>>>>> >>>
>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal
<
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Hi,
>>>>>>>> >>>>
>>>>>>>> >>>> I was trying to run the NiFi Flume processor with the
>>>>>>>> >>>> details below, but could not bring it up.
>>>>>>>> >>>>
>>>>>>>> >>>> I already started Flume with the sample configuration file:
>>>>>>>> >>>> =============================================
>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>> >>>>
>>>>>>>> >>>> # Name the components on this agent
>>>>>>>> >>>> a1.sources = r1
>>>>>>>> >>>> a1.sinks = k1
>>>>>>>> >>>> a1.channels = c1
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe/configure the source
>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe the sink
>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>> >>>>
>>>>>>>> >>>> # Use a channel which buffers events in
memory
>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>> >>>>
>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>> >>>> =============================================
>>>>>>>> >>>>
>>>>>>>> >>>> Command used to start Flume:
>>>>>>>> >>>> $ bin/flume-ng agent --conf conf --conf-file example.conf
>>>>>>>> >>>> --name a1 -Dflume.root.logger=INFO,console
>>>>>>>> >>>>
>>>>>>>> >>>> In the NiFi UI, the following ExecuteFlumeSink
>>>>>>>> >>>> configuration was done:
>>>>>>>> >>>> Property           Value
>>>>>>>> >>>> Sink Type         logger
>>>>>>>> >>>> Agent Name      a1
>>>>>>>> >>>> Sink Name         k1.
>>>>>>>> >>>>
>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>> >>>> OK
>>>>>>>> >>>>
>>>>>>>> >>>> But I could not get any data in the NiFi Flume processor.
>>>>>>>> >>>> I request your help with this. Do I need to change Flume's
>>>>>>>> >>>> example.conf file so that the NiFi Flume sink gets the data?
>>>>>>>> >>>>
>>>>>>>> >>>> Thanks and Regards,
>>>>>>>> >>>> Parul
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> --
>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
