nifi-users mailing list archives

From James McMahon <jsmcmah...@gmail.com>
Subject Re: Integrate new flowFile generation out of an ExecuteScript processor running a python script
Date Fri, 09 Feb 2018 00:28:21 GMT
Matt, thanks very much for all your help. I tried to embed the
session.create() and the session.transfer() into the loop that crunches
through the files. The ExecuteScript processor that runs my python script
does not throw any error, but no flowFiles result in the REL_SUCCESS output
queue.
I've copied my Python code below; the flowFile calls are easy to spot. What
have I neglected? My goal is that each time I find a file that qualifies, a
flowFile will be sent out. I have executed a version of this script from the
command line that prints out each file that meets the filtering criteria, and
it does indeed find the set of files I expect, so I know it reaches the
portion of the code where I attempt to create my flowFiles.

I believe I may be failing to understand the classes I must import, the
initialization of the session context, or both.
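A possible explanation for the silent run, offered as an assumption rather than a confirmed diagnosis: ExecuteScript evaluates the script body through its script engine instead of launching it as a program, so __name__ may not equal "__main__" there, and everything under the guard would be skipped without raising any error. From the command line the guard is true, which would fit the command-line results described above. If that is the cause, moving the body to the top level of the script (no guard) would let the session calls run. On the import question, ExecuteScript binds session, context, log, REL_SUCCESS and REL_FAILURE as script variables, so none of them needs an import. The sketch below only simulates the guard with plain exec; script_text, run_script and the "__builtin__" value are hypothetical stand-ins, not NiFi's actual behaviour.

```python
# Simulates evaluating a script under different values of __name__.
# The value ExecuteScript's engine actually uses is an assumption here.
script_text = '''
ran = False
if __name__ == "__main__":
    ran = True
'''


def run_script(module_name):
    """Execute script_text with __name__ set to module_name; report whether the guarded block ran."""
    namespace = {"__name__": module_name}
    exec(script_text, namespace)
    return namespace["ran"]


print(run_script("__main__"))     # True: guarded block runs
print(run_script("__builtin__"))  # False: guarded block is skipped
```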

Thanks again, Jim

#!/usr/bin/python
from datetime import datetime
from datetime import timedelta
from tempfile import mkstemp
from shutil import move
from os import remove
import os
import time
import calendar
#from org.apache.commons.io import IOUtils
#from java.nio.charset import StandardCharsets
#from org.apache.nifi.processor.io import StreamCallback

configFile = "/home/nifi/latest/confCustom/ABC.config"
yearList = ["2018", "2017"]
monthList = ["12", "11", "10", "09", "08", "07", "06", "05", "04", "03", "02", "01"]
dayList = ["31", "30", ..., "01"]  # remaining values elided in the original post

def datetime_from_str(datetime_str):
    # Parse a fixed-width timestamp by character position
    # (positions suggest a layout like YYYY-MM-DD HH:MM:SS.ffffff)
    year = int(datetime_str[:4])
    month = int(datetime_str[5:7])
    day = int(datetime_str[8:10])
    hour = int(datetime_str[11:13])
    minute = int(datetime_str[14:16])
    second = int(datetime_str[17:19])
    microsecond = int(datetime_str[20:])
    return datetime(year, month, day, hour, minute, second, microsecond)

if __name__ == "__main__":

    # Read key parameters that drive the generalized file list from the config file...
    with open(configFile, "r+") as f:
        rootDir_line = f.readline().strip()
        rootDir = rootDir_line[8:].strip()
        lastRunDT_str_line = f.readline().strip()
        lastRunDT_str = lastRunDT_str_line[10:].strip()

    lastRunDT = datetime_from_str(lastRunDT_str)

    # We will use the following last run DT to filter out all files with
    # last modified dates that precede or are equal to this last run DT...
    lastRunDT_secondsSinceEpoch = time.mktime(lastRunDT.timetuple())

    # Recurse through directories beneath rootDir for group ABC...
    for years in yearList:
        for months in monthList:
            for days in dayList:
                path = "".join(["/mnt/stuff/", rootDir, "/", years, "/", months, "/", days, "/"])
                if os.path.isdir(path):
                    for root, directories, filenames in os.walk(path):
                        for directory in directories:
                            pass
                        for filename in filenames:
                            fqfname = os.path.join(root, filename)
                            file_mod_time = os.stat(fqfname).st_mtime
                            if (file_mod_time > lastRunDT_secondsSinceEpoch):
                                # The flowFile calls: one content-less flowFile per
                                # qualifying file. session and REL_SUCCESS are bound
                                # by ExecuteScript (undefined on the command line).
                                head, tail = os.path.split(fqfname)
                                flowFile = session.create()
                                flowFile = session.putAttribute(flowFile, "absolute.path", head)
                                flowFile = session.putAttribute(flowFile, "filename", tail)
                                session.transfer(flowFile, REL_SUCCESS)
                            else:
                                pass
                else:
                    pass
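The hand-rolled datetime_from_str slices fixed character positions. A shorter alternative is datetime.strptime with %f, sketched here under the assumption that the lastRun value in ABC.config looks like 2018-02-08 00:28:21.000500 (the separators are inferred from the slice offsets and are a guess). LAST_RUN_FORMAT and cutoff_seconds are hypothetical names. One behavioural difference to be aware of: %f pads a short fraction on the right ("5" becomes 500000 microseconds), while int("5") gives 5, so the two parsers agree only when the file always holds six fractional digits.

```python
from datetime import datetime
import time

# Assumed layout of the lastRun value; the separators are inferred from the
# slice offsets in datetime_from_str and may differ in the real config file.
LAST_RUN_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def datetime_from_str(datetime_str):
    """Parse the lastRun timestamp with strptime instead of slicing by position."""
    return datetime.strptime(datetime_str, LAST_RUN_FORMAT)


def cutoff_seconds(last_run):
    """Seconds since the epoch for a naive local datetime, microseconds included."""
    # time.mktime() reads the timetuple as local time and ignores microseconds,
    # so the fractional part is added back explicitly.
    return time.mktime(last_run.timetuple()) + last_run.microsecond / 1e6


last_run = datetime_from_str("2018-02-08 00:28:21.000500")
print(last_run)  # 2018-02-08 00:28:21.000500
```

Keeping the microseconds only matters when a file's modification time falls within the same second as the last run; with the original time.mktime(lastRunDT.timetuple()) such a file would still count as newer.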

On Wed, Feb 7, 2018 at 3:33 PM, Matt Burgess <mattyb149@apache.org> wrote:

> Jim,
>
> You can use session.create() to create a new FlowFile from within your
> script. You don't need a parent, or to transfer input->output, or even
> write any content to the output FlowFile for your use case. After
> flowFile = session.create(), you can do flowFile =
> session.putAttribute(flowFile, "absolute.path", "/path/to/my/file")
> and then transfer the flowFile(s) to success at the end.
>
> Did I understand your use case correctly? If not please let me know
> and I'll try again :)
>
> Regards,
> Matt
>
>
> On Wed, Feb 7, 2018 at 2:51 PM, James McMahon <jsmcmahon3@gmail.com> wrote:
> > Good afternoon. I have a python script I will execute from an ExecuteScript
> > processor. This script applies complex filtering logic to select certain
> > files from a recursive directory structure. In my filtering loop, I will
> > have the fully qualified path and the filename. The fq path I intend to use
> > as the absolute.path attribute in a flowFile, and the filename I intend to
> > use as the filename attribute. My flowFiles will be passed from
> > ExecuteScript to a FetchFile processor.
> >
> > My question is this: how do I integrate the flowFile creation into my
> > python script, creating an empty flowFile and setting the attributes at the
> > point where I have the qualified file and metadata in my loop? I won't
> > really have an input stream; I'll work through my loop until certain
> > conditions trigger a break. As I find qualified files, I'll create a
> > flowFile with no payload but with the attributes that FetchFile requires to
> > bring the file into my workflow.
> >
> > Thank you. -Jim
>
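Matt's suggestion and the loop-until-a-condition design from the original question can be combined into one sketch. It assumes a generator, qualifying_files, that walks the tree and yields each file modified after the cutoff, plus an optional max_files limit standing in for the unspecified break condition; both names are hypothetical. FakeSession is likewise a hypothetical stand-in for the session object ExecuteScript provides, there only so the logic can run outside NiFi. The attribute names absolute.path and filename match FetchFile's default File to Fetch value, ${absolute.path}/${filename}.

```python
import os
from itertools import islice


def qualifying_files(base_dir, cutoff_epoch):
    """Yield (directory, filename) for files under base_dir modified after cutoff_epoch."""
    for root, _dirs, filenames in os.walk(base_dir):
        for filename in filenames:
            full_path = os.path.join(root, filename)
            if os.stat(full_path).st_mtime > cutoff_epoch:
                yield root, filename


class FakeSession(object):
    """Hypothetical stand-in for the ProcessSession that ExecuteScript binds as `session`."""

    def __init__(self):
        self.transferred = []

    def create(self):
        # An empty FlowFile, modelled here as a dict of attributes.
        return {}

    def putAttribute(self, flowFile, key, value):
        # NiFi FlowFiles are immutable, so the updated FlowFile is returned.
        updated = dict(flowFile)
        updated[key] = value
        return updated

    def transfer(self, flowFile, relationship):
        self.transferred.append((relationship, flowFile))


def emit_flowfiles(session, rel_success, base_dir, cutoff_epoch, max_files=None):
    """Create one content-less FlowFile per qualifying file; stop after max_files if given."""
    count = 0
    # islice(..., None) yields everything; a number acts as the "break" condition.
    for head, tail in islice(qualifying_files(base_dir, cutoff_epoch), max_files):
        flowFile = session.create()
        flowFile = session.putAttribute(flowFile, "absolute.path", head)
        flowFile = session.putAttribute(flowFile, "filename", tail)
        session.transfer(flowFile, rel_success)
        count += 1
    return count
```

Inside ExecuteScript the fake class would be dropped and the call would look something like emit_flowfiles(session, REL_SUCCESS, path, lastRunDT_secondsSinceEpoch) at the top level of the script. Transferring each FlowFile inside the loop, as in the script above, or collecting them and transferring at the end, as Matt describes, should behave the same for this use case.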
