nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Madhukar Thota <madhukar.th...@gmail.com>
Subject ExecuteScript Processor Performance
Date Mon, 02 May 2016 14:03:25 GMT
Hi

I am using the ExecuteScript processor (with the Python/Jython script pasted
below) to process the HTTP query string along with user-agent parsing. The
processor is very slow and is not able to handle heavy load. A lot of flow
files are getting queued, waiting for the processor to process them. How can
I improve the performance and processing?

Script:

import simplejson as json
import datetime
import time
from org.apache.nifi.processor.io import StreamCallback
from user_agents import parse
import urllib
import urlparse

def query_dict(querystring):
    """Parse a raw URL query string into a dict of decoded parameters.

    The string is split into ``key=value`` pairs *before* percent-decoding,
    so an encoded ``&`` (``%26``) or ``=`` (``%3D``) inside a value stays
    part of that value instead of being mistaken for a separator.  Only the
    first ``=`` of a pair separates the key from the value; any further
    ``=`` characters are kept verbatim in the value.

    Args:
        querystring: The raw (still percent-encoded) query string, or a
            false value such as ``None`` or ``''``.

    Returns:
        A dict mapping each decoded key to its decoded value.  A pair
        without ``=`` maps the key to ``''``.  When a key occurs more than
        once the last occurrence wins.  A false ``querystring`` yields
        ``{}``.
    """
    if not querystring:
        return {}

    params = {}
    # Literal trailing whitespace (e.g. a newline) is not part of the data.
    for pair in querystring.rstrip().split('&'):
        key, _, value = pair.partition('=')
        params[urllib.unquote(key)] = urllib.unquote(value)
    return params

def starPassword(route):
    """Return *route* with the password of its URL userinfo masked.

    ``scheme://user:secret@host/...`` becomes ``scheme://user:*****@host/...``.
    A value without a network location containing ``@``, or without a
    non-empty password, is returned unchanged.

    Args:
        route: The string to inspect; typically a URL, but any string is
            accepted.

    Returns:
        The input string, with the password replaced by ``*****`` when one
        is present.
    """
    # Fast path: without an '@' there cannot be any userinfo, so the
    # comparatively expensive URL split is skipped for ordinary values.
    if '@' not in route:
        return route

    parsed = urlparse.urlsplit(route)
    if '@' not in parsed.netloc:
        return route

    # The host starts after the LAST '@'.  Splitting on the first one would
    # leave the tail of a password that itself contains '@' in the output.
    userinfo, _, location = parsed.netloc.rpartition('@')
    username, _, password = userinfo.partition(':')
    if not password:
        return route

    masked_netloc = '%s:*****@%s' % (username, location)
    return urlparse.urlunsplit(parsed._replace(netloc=masked_netloc))


def num(s):
    """Convert *s* to a number when it looks like one, else return it as is.

    Args:
        s: The value to convert, normally a string.

    Returns:
        ``int(s)`` if that succeeds; otherwise ``float(s)`` if that succeeds
        and the result is finite; otherwise *s* unchanged.  Strings such as
        ``'nan'`` or ``'inf'`` are returned unchanged because NaN and
        Infinity have no representation in standard JSON.
    """
    try:
        return int(s)
    except ValueError:
        pass

    try:
        value = float(s)
    except ValueError:
        return s

    # NaN is the only float that is not equal to itself.
    if value != value or value in (float('inf'), float('-inf')):
        return s
    return value


class PyStreamCallback(StreamCallback):
    """Stream callback that writes a JSON record for the current flow file.

    The record is built from attributes of the module-level ``flowFile``:
    the parsed ``http.headers.User-Agent`` header, ``http.remote.addr`` and
    every parameter found in ``http.query.string``.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Write the JSON record to *outputStream*.

        *inputStream* is never read.  Nothing is written when the
        ``http.query.string`` attribute is missing or empty.
        """
        query_string = flowFile.getAttribute('http.query.string')
        if not query_string:
            return

        params = query_dict(query_string)

        # Parse the User-Agent header once and reuse the result; parsing it
        # separately for each of the four fields repeated the same work.
        user_agent = parse(flowFile.getAttribute('http.headers.User-Agent'))

        obj = {
            # NOTE(review): ``ltime`` is not defined anywhere in the visible
            # script -- confirm where it is supposed to come from.
            'timestamp': ltime,
            'browser': str(user_agent.browser.family),
            'browser_version': str(user_agent.browser.version_string),
            'os': str(user_agent.os.family),
            'os_version': str(user_agent.os.version_string),
            'client_ip': flowFile.getAttribute('http.remote.addr'),
        }

        # Query parameters are added last, so a parameter whose normalised
        # name equals one of the fixed keys above overrides it.
        for key in params:
            obj[key.replace(".", "_")] = num(starPassword(params[key]))

        outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))


# Script entry point.  ``session`` and ``REL_SUCCESS`` are not defined in this
# script; presumably they are bindings injected by the ExecuteScript
# processor -- confirm against the processor documentation.
#
# The name ``flowFile`` must stay a module-level global: PyStreamCallback.process
# reads it directly instead of receiving it as an argument.
flowFile = session.get()
# session.get() can evidently return None (presumably when nothing is
# queued); in that case there is nothing to do.
if flowFile is not None:
    # Run the callback over the flow file, then route the result to success.
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)

Any help is appreciated.

-Madhu

Mime
View raw message