nifi-users mailing list archives

From "Giovanni Lanzani" <>
Subject Re: A bag of groovy questions regarding the ExecuteScript processor
Date Wed, 04 Oct 2017 21:36:08 GMT
Hi Matt,

Thanks for the answers.

session.get(N).each) Good to know, I thought a roll-back was inevitable 
with uncaught exceptions;
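
For the archive, a minimal sketch of the per-flow-file try/catch routing discussed in the quoted messages below. It is plain Java with stand-ins so it runs outside NiFi: the `RoutingSketch` class and its two lists are hypothetical and only mimic `session.transfer(flowFile, REL_SUCCESS)` and `session.transfer(flowFile, REL_FAILURE)`; nothing here is NiFi API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the ExecuteScript pattern; not NiFi API.
final class RoutingSketch {
    final List<String> success = new ArrayList<>();
    final List<String> failure = new ArrayList<>();

    // Applies `logic` to each item. An exception only affects the item that
    // raised it, so the remaining items are still processed and routed.
    void process(List<String> batch, Consumer<String> logic) {
        for (String item : batch) {
            try {
                logic.accept(item);
                success.add(item);  // stands in for transfer to REL_SUCCESS
            } catch (RuntimeException e) {
                failure.add(item);  // stands in for transfer to REL_FAILURE
            }
        }
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch();
        r.process(Arrays.asList("a", "bad", "c"), s -> {
            if (s.equals("bad")) {
                throw new IllegalStateException("cannot process " + s);
            }
        });
        // prints: success=[a, c] failure=[bad]
        System.out.println("success=" + r.success + " failure=" + r.failure);
    }
}
```

If the exception were allowed to escape the loop instead, nothing after the failing item would be routed, which is the case where the whole session gets rolled back.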

ScriptTester) Since you're here: I could only get the script to 
download when adding this to the `repositories` in the `.build`

     maven {
         url ''

Is that how it's supposed to work?

fatJar) I actually saw that with Gradle you can easily do something 
like this

shadowJar {
    dependencies {

That way the fat jar will be much smaller but still executable by NiFi. 
Without that, a 15 kB jar ends up being an 8 MB fat jar.

on-the-fly-reload) I'd rather hack the API than do that :) Are there 
any pointers/examples for this `InvokeScriptedProcessor`? It all seems 
rather new and esoteric, judging by its docs.
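
For reference, a minimal sketch of the classloader step Matt describes in the quoted message below: a `URLClassLoader` with the fat JAR as its only URL and the current loader as parent, then swapped in as the thread's context classloader. The `FatJarLoader` class, its method names, and the JAR path are hypothetical, and taking the thread's context classloader as "the current ClassLoader" is my assumption. The file watcher and the NiFi wiring are left out.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper; class and method names are illustrative only.
final class FatJarLoader {

    // Builds a loader whose only URL is the fat JAR and whose parent is the
    // calling thread's current context class loader (an assumption).
    static URLClassLoader loaderFor(File fatJar) throws MalformedURLException {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        URL[] urls = { fatJar.toURI().toURL() };
        return new URLClassLoader(urls, parent);
    }

    // Swaps the thread's context class loader and returns the previous one,
    // so the caller can restore it later.
    static ClassLoader install(ClassLoader replacement) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(replacement);
        return previous;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder path; pass the real fat JAR as the first argument.
        File jar = new File(args.length > 0 ? args[0] : "build/libs/my-logic-all.jar");
        URLClassLoader loader = loaderFor(jar);
        ClassLoader previous = install(loader);
        try {
            System.out.println("Context class loader now reads from " + loader.getURLs()[0]);
        } finally {
            install(previous); // restore the original loader
            loader.close();    // release the JAR file handle
        }
    }
}
```

On a reload, the idea would be to build a fresh loader for the updated JAR, install it, and close the old one, since classes already loaded by a `URLClassLoader` are not refreshed in place.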



On 4 Oct 2017, at 18:33, Matt Burgess wrote:

> Giovanni,
> I second all of Andy's answers, they are spot-on. For the each()
> construct, they are "safe" in the sense that you will be working with
> one flow file at a time, but remember that there is only one
> "session". If you throw an Exception from inside the each(), then it
> will be caught by ExecuteScript (if not caught by your script), and
> the entire session will be rolled back. You are probably better off
> with the approach you outlined where you wrap the logic in the
> try/catch and route to success/failure accordingly... unless an error
> indicates a "retry all", then a rollback is likely what you want.
> For the ScriptTester, I haven't yet added support for setting
> attributes on incoming flow file(s), I am trying to think of a clean
> way to allow them for arbitrary flow files such as when the --input
> switch is specified. Suggestions are welcome :) For the first go-round
> I might allow something such that attributes would be added to all
> flow files, or at least for one coming in via STDIN.
> For the single fat/shaded JAR, you can certainly do things that way,
> but if you are using Groovy, Clojure, or Javascript/Nashorn, you can
> put all the JARs in a single directory (not nested!) and just add the
> directory to your Module Directory property. That might save you a
> build/package step. Doesn't help with reloading though.
> For the on-the-fly reload of an updated fat JAR, you could (at the
> expense of performance) have the script load the JAR. At that point
> you'd probably be better served with InvokeScriptedProcessor so you
> could add a FileWatcher at startup, and reload the JAR from a separate
> thread when changes are detected. In either case I believe you'd be
> looking at creating a URLClassLoader with your fat JAR as the only
> URL, and the current ClassLoader as its parent. Then you can set the
> Thread's context classloader to the new one, and/or you may need to do
> some more classloading voodoo.
> Not sure if I covered all your questions/comments, but if not please
> let me know and I will try again :)
> Regards,
> Matt
> On Wed, Oct 4, 2017 at 3:18 AM, Giovanni Lanzani
> <> wrote:
>> Hi Andy,
>> That's very helpful, thanks! My comments are inline, waiting for Matt to
>> come home :)
>> On 3 Oct 2017, at 22:44, Andy LoPresto wrote:
>> Giovanni,
>> A lot of great questions here. I’ll try to go through them but I hope Matt
>> weighs in as well (he is on vacation for the next few days though).
>> * The only time I am aware the Jars are reloaded is at processor restart (I
>> believe this is the same for the script content if defined by a referenced
>> file as well). The scriptingComponentHelper setup*() methods execute inside
>> ExecuteScript#setup(), which has the @OnScheduled annotation [1].
>> Is there anyone who has written some sort of script (I don't know if it is
>> possible) to query the NiFi API for all the (Groovy ExecuteScript)
>> processors using a particular module directory (we plan to use a single one
>> for everything), so that I could add a new step, after the shadowJar
>> deployment, that restarts all of them?
>> I imagine this would be a fairly common use case. Where I'm currently
>> working we have the following workflow:
>> Have a single jar with all the code that the groovy scripts will need;
>> The groovy scripts will use that code with minimal boilerplate around it, so
>> all the (non-NiFi) related code is in the jar. This makes it very easy to
>> test the logic in the jar. We added some extra code to ensure the functions
>> that the groovy scripts will call are "NiFi compatible" (right now it's just
>> .getBytes(StandardCharsets.UTF_8)). We don't use Matt's framework because we
>> need the incoming flowFile to have attributes, and I couldn't figure out how
>> to do it :)
>> NiFi has a flow to fetch new master updates on the repo and compile the
>> (fat) jar as a result. However, we would need to restart the ExecuteScript
>> processors by hand and... no/no? :) A script would help greatly here (if
>> nobody has one, I will dig into the API to see what's possible. I might just
>> parse the whole xml file if there's no way to do so via the API);
>> * I’m not sure how other users bundle their dependencies, but shadow Jars
>> would be fine for this use case, and Matt has referenced using them in his
>> script-tester article [2].
>> * Yes, while there are small idiosyncrasies with each language flavor, the
>> NiFi-related domain is fairly consistent. In this case, iterating over a
>> number of flowfiles for processing in a single Groovy script is fine.
>> Session.get(int) [3] is delegated to ProcessSession and returns
>> List<FlowFile>, so you can use any of the Groovy collections methods over
>> it.
>> So what happens in this case
>> def n = 0
>> session.get(N).each { flowFile ->
>>     if (n == 0) {
>>         // do something
>>     } else {
>>         throw new Exception()
>>     }
>>     session.transfer(flowFile, REL_SUCCESS)
>>     n += 1
>> }
>> Will the first flowFile be successfully transferred or will a rollback
>> happen? (Note: I usually wrap the logic in try/catch and then, based on the
>> result, transfer the file to REL_SUCCESS/REL_FAILURE.)
>> Thanks again,
>> Giovanni
>> Hopefully this helps you and if Matt or anyone else sees a mistake, they
>> correct it and add their thoughts. Thanks.
>> [1]
>> [2]
>> <>
>> [3]
>> <>
>> Andy LoPresto
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>> On Oct 3, 2017, at 1:09 PM, Giovanni Lanzani
>> <> wrote:
>> I apologize if this is specified elsewhere, but I couldn't find it.
>> I was wondering when the jars used by a particular Groovy script (in the
>> ExecuteScript processor) are reloaded. I.e. if one jar is updated, when
>> will the script pick up the new version? I know that upon restarting the
>> processor, the updated jar is considered, but I was wondering on which other
>> occasions that happens;
>> Do people tend to use fat (shadow) jars for the jars referenced by
>> groovy scripts? I don't think it makes sense to keep track of all the
>> dependencies manually otherwise;
>> When using the {P,J}ython processor, I read Matt's advice to use the
>> following construct in the script:
>> for flowFile in session.get(N):
>>     if flowFile:
>>         # do your thing here
>> Does the same hold for Groovy, i.e. should someone do
>> session.get(N).each { flowFile ->
>>     // do your thing here
>>     if (condition) {
>>         session.transfer(flowFile, REL_SUCCESS)
>>     } else {
>>         session.transfer(flowFile, REL_FAILURE)
>>     }
>> }
>> Is this approach safe in Groovy inside an each? Or is this approach not
>> needed at all in Groovy, while it is needed in {P,J}ython?
>> Thanks in advance!
>> Giovanni
