nifi-users mailing list archives

From Scott Wagner <swag...@beenverified.com>
Subject Re: Replace flowfile contents from InvokeScriptedProcessor?
Date Mon, 08 May 2017 15:09:09 GMT
James,

     Please find below a script that takes all of the attributes from a
FlowFile, iterates over the content of the input FlowFile line by line,
calls Python's str.format() on each line with those attributes, and
replaces the content with the output. The use case: I can create a
FlowFile whose content contains references to attribute names, and this
processor replaces those references with the attribute values using
Python's native dictionary-based formatting:

import sys
import traceback
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import FileUtil

# ExecuteScript (Jython) body: NiFi binds `session` and `REL_SUCCESS`.

class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        try:
            attrs = self.flowFile.getAttributes()
            # Wrap the Java InputStream as a Python file object.
            pf = FileUtil.wrap(inputStream)
            output = []
            for line in pf.readlines():
                # Replace {attribute_name} placeholders with attribute values.
                line = line.format(**attrs).rstrip('\n')
                output.append(line)
            # Java's OutputStream.write() takes bytes, so pass a bytearray.
            outputStream.write(bytearray('\n'.join(output)))
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    flowFile = session.write(flowFile, TransformCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)
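The per-line transform can be tried outside NiFi. This is a minimal sketch, assuming plain CPython with io.BytesIO standing in for NiFi's Java input and output streams; transform_content is a hypothetical helper that mirrors the body of TransformCallback.process(), not part of any NiFi API:

```python
import io


def transform_content(content, attrs):
    """Hypothetical stand-in for TransformCallback.process().

    io.BytesIO objects play the role of NiFi's InputStream/OutputStream.
    """
    input_stream = io.BytesIO(content)
    output_stream = io.BytesIO()
    output = []
    for raw_line in input_stream.readlines():
        # Decode each line, fill {attribute} placeholders, drop the newline.
        line = raw_line.decode("utf-8").format(**attrs).rstrip("\n")
        output.append(line)
    # Rejoin with newlines; the final trailing newline is not restored.
    output_stream.write("\n".join(output).encode("utf-8"))
    return output_stream.getvalue()


attrs = {"output_table_name": "report_2017", "table_name": "staging"}
print(transform_content(b"DROP TABLE IF EXISTS {output_table_name}\n;\n", attrs))
```

The dropped trailing newline comes from the rstrip('\n') and '\n'.join pair used in the script.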

My real-world use for this is a file on disk that contains a set of SQL
queries to be executed, with attribute names referenced in that flat
file. I run it through this processor to do the string replacements for
me. Here's a sample of what the input would look like:

DROP TABLE IF EXISTS {output_table_name}
;
CREATE TABLE {output_table_name} AS
   SELECT
     am.some_id AS {col_some_id},
     am.some_other_id AS {col_some_other_id},
     input.*
   FROM
     {table_name} input
     LEFT OUTER JOIN some_other_table sot
       ON sot.data_hash = f_hash_data(
            substring(input.{col_data_element_1}, 1, 70),
            substring(input.{col_data_element_2}, 1, 32),
            substring(input.{col_data_element_3}, 1, 2),
            input.{col_data_element_4},
            input.{col_data_element_5})

When a FlowFile is created, I just make sure that the attributes that I 
need are populated correctly and then the SQL script will have the 
references to the correct column names in it.
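Because the whole file goes through str.format(), two Python behaviours apply to the SQL template: a placeholder with no matching attribute raises KeyError (which the callback prints and re-raises), and any literal brace in the SQL must be doubled as {{ or }}. A short CPython sketch with hypothetical attribute values and a hypothetical render helper:

```python
def render(template, attrs):
    # Same per-line str.format(**attrs) step as the script above.
    return "\n".join(line.format(**attrs) for line in template.split("\n"))


attrs = {"output_table_name": "out_t", "col_some_id": "some_id"}

# Placeholders are filled from the attribute dictionary.
print(render("CREATE TABLE {output_table_name} AS", attrs))

# Literal braces must be doubled, or format() treats them as fields.
print(render("SELECT '{{not a field}}' AS {col_some_id}", attrs))

# A placeholder without a matching attribute raises KeyError.
try:
    render("SELECT * FROM {table_name}", attrs)
except KeyError as missing:
    print("missing attribute: %s" % missing)
```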

Hope that helps,
Scott
> James McMahon <mailto:jsmcmahon3@gmail.com>
> Friday, May 5, 2017 1:42 PM
> Matt, here is what I have tried. No error is thrown, but I can't
> reference attribute ABCDE in the callback function. What am I
> misunderstanding?  -Jim
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self, attrs):
>         self.attrs = attrs
>     def process(self, inputStream, outputStream):
>         outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE'))))
>
> class UpdateAttributes(Processor):
>
>     def __init__(self):
>         self.result = {}
>         self.__rel_success = Relationship.Builder().name("success").description("Success").build()
>
>     (I also define initialize, getRelationships, validate,
>     getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>
>     def onTrigger(self, context, sessionFactory):
>         session = sessionFactory.createSession()
>         try:
>             flowfiles = session.get(20)
>             for flowfile in flowfiles:
>                 if flowfile is None:
>                     return
>
>                 flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
>
>                 flowfile = session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
>                 session.transfer(flowfile, self.__rel_success)
>             session.commit()
>         except:
>             session.rollback()
>             raise
>
> processor = UpdateAttributes()
>
>
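The two problems in the callback above can be reproduced in plain Python. In this sketch (CPython, hypothetical names, io.BytesIO standing in for NiFi's streams), set() plays the role of PySet, Jython's implementation of the Python set type: building a set from a mapping keeps only its keys, and the callback instance has no getAttribute() method, so the value has to be read back from the dictionary the constructor stored:

```python
import io


class PyStreamCallback(object):
    """CPython stand-in for the StreamCallback subclass in the message above."""

    def __init__(self, attrs):
        # Keep the mapping as passed; a dict keeps keys *and* values.
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        # Read the value from the dictionary stored on this instance.
        outputStream.write(self.attrs["ABCDE"].encode("utf-8"))


flowfile_attrs = {"ABCDE": "LARRY_CURLEY_MOE", "filename": "input.txt"}

# set() of a mapping keeps only the keys, so the value is lost.
print(set(flowfile_attrs))

# Passing a dict preserves the value for lookup inside process().
out = io.BytesIO()
PyStreamCallback(dict(flowfile_attrs)).process(io.BytesIO(b""), out)
print(out.getvalue())

# The callback object has no getAttribute() method of its own.
print(hasattr(PyStreamCallback({}), "getAttribute"))
```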
> Matt Burgess <mailto:mattyb149@apache.org>
> Tuesday, May 2, 2017 12:26 PM
> If you want to use attributes inside the callback, I recommend 
> building a dictionary from flowfile.getAttributes() and passing that 
> into the PyStreamCallback constructor:
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self, attrs):
>         self.attrs = attrs
>
>
> # ... later on ...
> session.write(flowfile, PyStreamCallback(dict(flowfile.getAttributes())))
>
>  Or something like that.  I thought I'd seen an example somewhere but 
> I can't find it.
>
> Regards,
> Matt
>
>
> Andy LoPresto <mailto:alopresto@apache.org>
> Tuesday, May 2, 2017 12:06 PM
> I am not a Python expert, but if you set self.result["x"] in one
> class, can you reference it in a separate class? What is the exception
> you are getting?
>
> Andy LoPresto
> alopresto@apache.org <mailto:alopresto@apache.org>
> alopresto.apache@gmail.com <mailto:alopresto.apache@gmail.com>
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
>
> James McMahon <mailto:jsmcmahon3@gmail.com>
> Tuesday, May 2, 2017 12:01 PM
> Thanks for your reply Matt. I think this cuts to the heart of my 
> failure here. I have tried treating PyStreamCallback() as a second 
> class in my python file, like so:
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream):
>         outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor):
>     def __init__(self):
>         self.result = {}
>         self.__rel_success = Relationship......etc etc
> .
> .
> .
>     def onTrigger(self, context, sessionFactory):
> .
> .
> .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails:       flowfile = session.write(flowfile, PyStreamCallback())
>         # this fails too:   flowfile = session.write(self, flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second
> independent class? Should it be a defined method within class
> UpdateAttributes()?
>
>
> Matt Burgess <mailto:mattyb149@apache.org>
> Tuesday, May 2, 2017 11:45 AM
> Jim, you still can/should use something like a PyStreamCallback().  
> ExecuteScript is basically an onTrigger() body, so you can use the 
> same approach inside your onTrigger() body in 
> InvokeScriptedProcessor.  Pass an instance of your PyStreamCallback 
> into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need 
> variables/data from outside the PyStreamCallback() inside, you can 
> pass them into the constructor or (more dangerously) mess with the 
> scope of the variable(s).
>
> Regards,
> Matt
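Matt's two options (constructor arguments versus reaching into an outer scope) can be contrasted in plain Python. This is a sketch under stated assumptions: CPython, hypothetical class names and values, io.BytesIO in place of NiFi's streams, and a sequence that simulates two flow files being prepared before either callback runs (as could happen with more than one concurrent task). A module-level variable is shared by every callback, while a constructor argument travels with its own instance:

```python
import io

# Riskier: module-level state shared by every callback instance.
current_value = None


class GlobalCallback(object):
    def process(self, inputStream, outputStream):
        # Reads whatever the global holds when process() actually runs.
        outputStream.write(current_value.encode("utf-8"))


class ConstructorCallback(object):
    def __init__(self, value):
        # Each instance carries its own copy of the data it needs.
        self.value = value

    def process(self, inputStream, outputStream):
        outputStream.write(self.value.encode("utf-8"))


def run(callback):
    # Drive a callback with in-memory streams and return what it wrote.
    out = io.BytesIO()
    callback.process(io.BytesIO(b""), out)
    return out.getvalue()


# Prepare callbacks for flow file 1, then let flow file 2 overwrite the global.
current_value = "key-for-flowfile-1"
first_global = GlobalCallback()
first_ctor = ConstructorCallback("key-for-flowfile-1")
current_value = "key-for-flowfile-2"

print(run(first_global))  # sees flow file 2's value
print(run(first_ctor))    # still sees flow file 1's value
```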