nifi-users mailing list archives

From "Carlos Manuel Fernandes (DSI)" <carlos.antonio.fernan...@cgd.pt>
Subject RE: Dynamic Mapping
Date Thu, 04 Oct 2018 18:32:12 GMT
Hi Ryan,

If I understand correctly, you have a Jolt mapping specification for each user. In that case you can
store the guid / map_spec pairs in a table in an external database.
Then, in your flow before JoltTransformJSON, add a custom processor that reads the configuration
from the table and puts map_spec into a flow file attribute, which you can reference in the Jolt
Specification property of the JoltTransformJSON processor using Expression Language.
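To make the idea concrete, here is a minimal, self-contained Groovy sketch of the lookup step only, not of the processor itself. It assumes, hypothetically, that each request carries its GUID in a flow file attribute named guid, that the external table is replaced by an in-memory map, and that each map_spec is stored as Jolt JSON text. JoltTransformJSON is not invoked here.

```groovy
import groovy.json.JsonSlurper

// Hypothetical stand-in for the external table: guid -> map_spec (Jolt spec as JSON text).
def mappingTable = [
    'guid-001': '[{"operation":"shift","spec":{"name":"fullName"}}]',
    'guid-002': '[{"operation":"shift","spec":{"name":"displayName","email":"contact.email"}}]'
]

// Given a flow file's attributes, return them plus a map_spec attribute holding the
// Jolt spec registered for the request's guid (attribute names are illustrative).
Map<String, String> withMappingSpec(Map<String, String> table, Map<String, String> attributes) {
    String spec = table[attributes['guid']]
    if (spec == null) {
        throw new IllegalArgumentException("No mapping spec for guid '${attributes['guid']}'")
    }
    new JsonSlurper().parseText(spec)   // fail early if the stored spec is not valid JSON
    return attributes + [map_spec: spec]
}

def attrs = withMappingSpec(mappingTable, [guid: 'guid-002'])
assert attrs.map_spec == mappingTable['guid-002']
```

In this approach the Jolt Specification property of JoltTransformJSON would then be set to ${map_spec}, so every flow file is transformed with the spec of the user that sent it, while the flow itself stays the same for all users.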

I created a simple, general-purpose processor that reads from a relational database and turns the
returned fields into attributes. I can share it with you:



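In the pdaPool property, put the name of your DBCPConnectionPool. The query property holds the SQL
statement to run; it is evaluated with Expression Language against the incoming flow file.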

The code of the custom processor, executeSqlOnFlowV1.groovy:

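// Script for a scripting processor such as ExecuteScript: session, context, log, REL_SUCCESS,
// REL_FAILURE and the dynamic properties pdaPool and query are provided as script bindings
import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql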


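// Find the controller service (e.g. a DBCPConnectionPool) whose name is dbServiceName
// and borrow a JDBC connection from it
def getConnFromPool(dbServiceName) {
       def lookup = context.controllerServiceLookup
       def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
              lookup.getControllerServiceName(cs) == dbServiceName
       }
       if (dbcpServiceId == null) {
              throw new IllegalStateException("Controller service '${dbServiceName}' not found")
       }
       return lookup.getControllerService(dbcpServiceId).getConnection()
}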


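// Replace the flow file content with the given text, line by line, encoded as UTF-8
def writeFlowFile(flowFile, str) {
       flowFile = session.write(flowFile, { outputStream ->
              str.eachLine { line ->
                     outputStream.write(line.getBytes(StandardCharsets.UTF_8))
                     outputStream.write('\n'.getBytes(StandardCharsets.UTF_8))
              }
       } as OutputStreamCallback)

       return flowFile
}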

//Main Program
def pda
def flowFile = session.get()
if (!flowFile) return

try {
       // Wrap a connection borrowed from the pool named in the pdaPool property
       pda = new Sql(getConnFromPool(pdaPool.value))
       // Evaluate Expression Language in the query property against the incoming flow file
       def stmt = query.evaluateAttributeExpressions(flowFile).value

       log.info("executeQuery: ${stmt}")

       // Create one attribute per column of the first row returned: the attribute name is
       // the column name and the attribute value is the column value (null becomes "null")
       def row = pda.firstRow(stmt)
       row?.each { col, value ->
              flowFile = session.putAttribute(flowFile, col, "${value}".toString())
       }

       session.transfer(flowFile, REL_SUCCESS)
}
catch (Exception e) {
       log.error('executeQuery: error:', e)
       // Replace the content with the error message and stack trace, then route to failure
       flowFile = writeFlowFile(flowFile, "Error executing query\n${e}\n${e.stackTrace.join('\n')}")
       session.transfer(flowFile, REL_FAILURE)
}
finally {
       // Closing the Sql object closes the borrowed connection, returning it to the pool
       pda?.close()
}
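Note that the attribute names come straight from the column names the JDBC driver reports. The following Groovy sketch replays the script's row-to-attribute rule on a plain map standing in for the first row; the table name user_mapping, the upper-case column names, and the guid attribute in the sample query are assumptions for illustration.

```groovy
// Hypothetical first row, as a driver might return it for a query like
//   SELECT guid, map_spec FROM user_mapping WHERE guid = '${guid}'
// Some databases report unquoted column names in upper case, as assumed here.
def firstRow = [GUID: 'guid-001', MAP_SPEC: '[{"operation":"shift","spec":{"name":"fullName"}}]']

// Same rule as the script: one attribute per column, named after the column,
// with the value rendered as a String (a null value becomes the text "null").
Map<String, String> rowToAttributes(Map<String, Object> row) {
    Map<String, String> attrs = [:]
    row?.each { col, value -> attrs[col] = "${value}".toString() }
    return attrs
}

def attrs = rowToAttributes(firstRow)
assert attrs.keySet() == ['GUID', 'MAP_SPEC'] as Set
```

If your database reports names in upper case, the Expression Language reference in the Jolt Specification property must use the same case, for example ${MAP_SPEC} rather than ${map_spec}.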


I hope this helps.

Carlos


From: Ryan H [mailto:ryan.howell.development@gmail.com]
Sent: Thursday, 4 October 2018 18:22
To: users@nifi.apache.org
Subject: Dynamic Mapping

Hi All,

I have been working on an integration between two systems with NiFi in the middle as the integration
point. Basically, when an event happens on System-A, such as a record update, those changes
need to be propagated to System-B. For the first use case, I have set up a data flow that
listens for incoming requests from System-A, performs a mapping, then sends the mapped data
to System-B.

Generalized Flow for "Create_Event" (dumbed down significantly):
System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event"

This works great for the first case with a predefined mapping in JoltTransformJSON. Now I
want to generalize it so that the same data flow can be used for all Create_Events on System-A.

Here is where the issue comes in. There is a base schema for System-A that has a base mapping
to the base schema in System-B. Users of the system can "extend" the base schema to add/remove
fields and modify the base mapping. So each time a Create_Event happens, the mapping that is used
should be the unique mapping spec associated with that user (identified by a GUID that comes
along with the request).

The data flow is the exact same for all Create_Events, except for the mapping, which will
be unique to the user.

Does anyone know of a way to load up a different mapping to be used on a per-request basis?
I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec
and can be modified to meet the needs of whatever would work for this.

I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could
be applied to this situation.

Any help is, as always, greatly appreciated.


Cheers,

Ryan H.