Glad to hear it is working, Kumiko. Matt Foley's suggestion was very good. 

Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69


On Jun 18, 2016, at 14:34, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

Thank you, Andy!  It worked!

 

And also, the workaround “MS SDK routine, in a 'synchronized' method” is working.  I really appreciate everyone's help!

 

Thanks

Kumiko

 

From: Andy LoPresto [mailto:alopresto@apache.org]
Sent: Friday, June 17, 2016 5:49 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>; Ki Kang <Ki.Kang@ds-iq.com>
Subject: Re: Custom processor is failing for concurrency

 

Kumiko,

 

I’m not sure if this is a remnant of the email formatting, but it looks like your “throws IOException, CloudException” declaration from the method contract somehow got inserted in the return value before the closing semicolon. I don’t have all the dependencies, so I trust that your API invocations are correct, but the following is valid Java. Try the below:

 

import java.io.IOException;

import com.microsoft.azure.CloudException;
// Imports for the two DataLakeStore*Client classes are omitted here.

public final class AzureSDKWrapper {

    // Static lock shared by every caller, so all threads contend on the same monitor.
    private static final Object lockAB = new Object();

    public static com.microsoft.rest.ServiceResponse<Void> AzureDLCreateFile(DataLakeStoreAccountManagementClient adlsClient, DataLakeStoreFileSystemManagementClient adlsFileSystemClient, String adlsAccountName, String path, String contents, boolean force) throws IOException, CloudException {
        synchronized (lockAB) {
            byte[] bytesContents = contents.getBytes();

            // Create file with contents
            return adlsFileSystemClient.getFileSystemOperations().create(adlsAccountName, path, bytesContents, force);
        }
    }

}

 

Andy LoPresto

PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

 

On Jun 17, 2016, at 3:20 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

 

Hello,

 

I wrote the wrapper class, but I’m getting the following errors.  How can I wrap this method correctly?

 

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project nifi-azure-dlstore-processors: Compilation failure: Compilation failure:

[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,130] ';' expected

[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,138] not a statement

[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,149] ';' expected

[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,151] not a statement

[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,165] ';' expected

 

______

public final class AzureSDKWrapper {

 

                private static Object lockAB = new Object();

 

                public static com.microsoft.rest.ServiceResponse<Void> AzureDLCreateFile(DataLakeStoreAccountManagementClient adlsClient,

                                                                                                                DataLakeStoreFileSystemManagementClient adlsFileSystemClient,

                                                                                                                String adlsAccountName,

                                                                                                                String path, String contents, boolean force) {

                                synchronized(lockAB) {                

                                                byte[] bytesContents = contents.getBytes();                     

                                               

                                                // Create file with contents

                                                return adlsFileSystemClient.getFileSystemOperations().create(adlsAccountName, path, bytesContents, force) throws IOException, CloudException;

                                }

                }

}

 

 

create

com.microsoft.rest.ServiceResponse<Void> create(String accountName,

                                                String directFilePath,

                                                byte[] streamContents,

                                                Boolean overwrite)

                                         throws com.microsoft.azure.CloudException,

                                                IOException,

                                                IllegalArgumentException

Creates a file with optionally specified content.

Parameters:

accountName - The Azure Data Lake Store account to execute filesystem operations on.

directFilePath - The Data Lake Store path (starting with '/') of the file to create.

streamContents - The file contents to include when creating the file. This parameter is optional, resulting in an empty file if not specified.

overwrite - The indication of if the file should be overwritten.

Returns:

the ServiceResponse object if successful.

Throws:

com.microsoft.azure.CloudException - exception thrown from REST call

IOException - exception thrown from serialization/deserialization

IllegalArgumentException - exception thrown from invalid parameters

 

Thanks

Kumiko

 

From: Matt Foley [mailto:mfoley@hortonworks.com] 
Sent: Friday, June 10, 2016 12:53 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>; Ki Kang <Ki.Kang@ds-iq.com>
Subject: Re: Custom processor is failing for concurrency

 

Hi Kumiko,

since the Azure SDK calls are controlling an external resource (not part of your PutFileAzureDLStore class) in a non-thread-safe way, the safest thing to do is lock on an object of static scope, hence guaranteed to be shared by all threads wherever spawned, rather than "this".  (Using "this" as the lock object is only useful when all the threads are sharing a single instance object referenced by "this".  If the instances of "this" are themselves spawned per-thread, you don't get the desired mutex.)  
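A minimal sketch of the lock-scope point, assuming a plain shared counter stands in for the non-thread-safe external resource. SharedLockDemo and its methods are made-up names for illustration, not part of NiFi or the Azure SDK. Each thread gets its own instance, so only the static lock provides mutual exclusion:

```java
import java.util.ArrayList;
import java.util.List;

final class SharedLockDemo {
    // One lock for the whole class, shared by every instance and every thread.
    private static final Object LOCK = new Object();
    // Stands in for the non-thread-safe external resource (assumption).
    private static int counter = 0;

    // synchronized(this) would not help here: each thread owns a different
    // instance, so each thread would lock a different monitor.
    void incrementGuardedByStaticLock() {
        synchronized (LOCK) {
            counter++;
        }
    }

    // Starts one thread per instance and returns the final counter value.
    static int runThreads(int threads, int incrementsPerThread) {
        synchronized (LOCK) {
            counter = 0;
        }
        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            SharedLockDemo perThreadInstance = new SharedLockDemo();
            Thread t = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    perThreadInstance.incrementGuardedByStaticLock();
                }
            });
            pool.add(t);
            t.start();
        }
        try {
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (LOCK) {
            return counter;
        }
    }
}
```

With the static lock, runThreads(8, 10000) returns exactly 80000. Swapping LOCK for "this" inside the method would let increments from different instances interleave, and the total could come up short.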

 

Furthermore, you may find that you want to make other SDK calls from classes other than PutFileAzureDLStore, and they may interact in a non-thread-safe way too.  Therefore I would wrap all offending calls in a separate class of static methods, callable from any other classes, like this:

 

​// Suppose SDK.methodA and SDK.methodB interact with each other non-thread-safely,

// while SDK.methodC is non-thread-safe but doesn't interact with methodA or methodB.

import com.ms.azure.SDK.*;

public final class WrapSDK {

    private static final Object lockAB = new Object();
    private static final Object lockC = new Object();

    public static <returntypeA> methodA( <signatureA> ... ) {
        synchronized(lockAB) {
            return SDK.methodA( <argvals> ... );
        }
    }

    public static <returntypeB> methodB( <signatureB> ... ) {
        synchronized(lockAB) {
            return SDK.methodB( <argvals> ... );
        }
    }

    public static <returntypeC> methodC( <signatureC> ... ) {
        synchronized(lockC) {
            return SDK.methodC( <argvals> ... );
        }
    }

}

 

Now, wherever you would have called SDK.methodA(), just call WrapSDK.methodA() instead, with the same args.  All the complexity stays in the wrapper class.
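For a compilable version of the same pattern, here is a small sketch. FakeSdk is a hypothetical stand-in for a non-thread-safe SDK (it is not an Azure class), and WrapFakeSdk is the wrapper; all names and signatures are assumptions chosen for illustration. methodA and methodB touch the same HashMap, so they share one lock, while methodC has its own:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical non-thread-safe SDK used only for this sketch.
final class FakeSdk {
    private static final Map<String, String> shared = new HashMap<>();
    private static int callsToC = 0;

    // methodA and methodB both mutate the same unsynchronized map.
    static void methodA(String key, String value) {
        shared.put(key, value);
    }

    static int methodB() {
        int size = shared.size();
        shared.clear();
        return size;
    }

    // methodC is unsafe on its own but never touches the map.
    static int methodC() {
        return ++callsToC;
    }
}

// All callers go through this class instead of calling FakeSdk directly.
final class WrapFakeSdk {
    private static final Object lockAB = new Object();
    private static final Object lockC = new Object();

    private WrapFakeSdk() {
    }

    static void methodA(String key, String value) {
        synchronized (lockAB) {
            FakeSdk.methodA(key, value);
        }
    }

    static int methodB() {
        synchronized (lockAB) {
            return FakeSdk.methodB();
        }
    }

    static int methodC() {
        synchronized (lockC) {
            return FakeSdk.methodC();
        }
    }
}
```

Using two locks means a slow methodC call does not block methodA or methodB. A single lock for everything would also be correct, just more conservative.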

 

This approach requires you to explicitly wrap every method signature you wish to use.  The alternative is complex and almost certainly not worth the effort for your use case.  (See http://stackoverflow.com/questions/14714368/java-reflection-with-object-as-a-parameter for a hint, if you really want to pursue it.)

Hope this helps,

--Matt


From: Kumiko Yada <Kumiko.Yada@ds-iq.com>
Sent: Thursday, June 09, 2016 1:54 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: RE: Custom processor is failing for concurrency

 

 

The problems are :

Line 209 - SetupClients(creds);

Line 217 - CreateFile(_path + _filename, value.get(), true);

 

To wrap the calls in a synchronized method, do these look correct?  Or, for line 217, does the method have to return something?

 

For the line 209:

        synchronized (this) {

            SetupClients(creds);

            if (creds!= null) {

                return creds;

            }

 


For the line 217:

        synchronized {

CreateFile(_path + _filename, value.get(), true);

        }

 

Thanks

Kumiko

 

From: Matt Foley [mailto:mfoley@hortonworks.com] 
Sent: Thursday, June 9, 2016 12:27 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>; Ki Kang <Ki.Kang@ds-iq.com>
Subject: Re: Custom processor is failing for concurrency

 

If there are multiple SDK calls that share the same problematic code (that is, multiple SDK methods that would interact with each other in a non-thread-safe way), then one must synchronize their calls on a shared lock object, which only requires a couple more lines of code.


From: Oleg Zhurakousky <ozhurakousky@hortonworks.com>
Sent: Thursday, June 09, 2016 12:15 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

 

+1, was just responding with the same.

 

On Jun 9, 2016, at 3:11 PM, Matt Foley <mfoley@hortonworks.com> wrote:

 

Kumiko, would it be sufficient to just wrap your call to the non-thread-safe MS SDK routine, in a 'synchronized' method?  You could then use the standard NiFi thread management and avoid a lot of complexity.  And the result should be at least as efficient as having a dedicated thread to manage the problem action.

--Matt F


From: Kumiko Yada <Kumiko.Yada@ds-iq.com>
Sent: Thursday, June 09, 2016 11:49 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

 

Hi Bryan,

Does this mean that even if I create multiple threads in onTrigger, I will still hit the Microsoft SDK issue where it's not thread-safe?  It sounds like what I am trying to do and creating multiple threads via the UI might be the same thing.

Thanks
Kumiko 


From: Bryan Bende <bbende@gmail.com>
Sent: Thursday, June 9, 2016 11:26:10 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

 

Kumiko,

 

In general you shouldn't have to create threads in your processors, with the exception of some special cases.

The framework has a thread pool and it takes one of those threads and calls the onTrigger method of your processor.

 

If you want multiple threads to call onTrigger, then each processor has a Concurrent Tasks property in the UI on the scheduling tab, 

which equates to the number of threads that will concurrently call onTrigger.

 

A processor developer needs to only worry about the business logic in the onTrigger method, and needs to ensure 

thread-safe access to any member variables or state stored in the processor.
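As a rough illustration of that last point, the sketch below uses plain Java only. CountingProcessorSketch is a made-up class that does not extend any NiFi type, and its onTrigger() merely plays the role of the real method. A thread pool simulates several concurrent tasks calling onTrigger on one shared instance, and the member state is kept in an AtomicLong so the concurrent updates stay correct:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a processor; not a NiFi class.
final class CountingProcessorSketch {
    // Member state touched by every onTrigger call, so it uses an atomic type.
    private final AtomicLong filesCreated = new AtomicLong();

    // Plays the role of onTrigger: several pool threads may call it at once.
    void onTrigger() {
        filesCreated.incrementAndGet();
    }

    long filesCreated() {
        return filesCreated.get();
    }

    // Simulates "Concurrent Tasks = tasks" against a single processor instance.
    static long simulate(int tasks, int callsPerTask) {
        CountingProcessorSketch processor = new CountingProcessorSketch();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                for (int j = 0; j < callsPerTask; j++) {
                    processor.onTrigger();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processor.filesCreated();
    }
}
```

A plain long field incremented with ++ in the same position could lose updates, because the read, add, and write are not one atomic step.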

 

Hope that helps.

 

-Bryan

 

 

On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

Microsoft found that this is an issue with the SDK.  They are working on a fix but do not have an ETA for it.  To work around this issue, I’m trying to create multiple threads using AbstractSessionFactoryProcessor and handle the file creation in a single thread.  I’m having a problem: the single thread is not working correctly, and the processor is still acting as if it were single-threaded.

 

When I create a thread to handle the file creation, do I have to call this method using java.util.concurrent.ExecutorService?

 

Are there any sample processors that I can take a look at?

 

Thanks

Kumiko

 

From: Kumiko Yada [mailto:Kumiko.Yada@ds-iq.com] 
Sent: Sunday, June 5, 2016 6:28 PM
To: users@nifi.apache.org
Cc: Ki Kang <Ki.Kang@ds-iq.com>; Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>
Subject: RE: Custom processor is failing for concurrency

 

Thank you, Bryan.  I’m working with Microsoft on this issue.  Will keep you guys updated.

 

Thanks

Kumiko

 

From: Bryan Bende [mailto:bbende@gmail.com] 
Sent: Friday, June 3, 2016 2:32 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for concurrency

 

It is hard to say for sure, but I think your NiFi processor is generally ok regarding thread safety, but I think there could be a problem in the Azure SDK code...

 

RequestFactory has an instance of BaseUrl and every time RequestFactory.create() is called, it calls BaseUrl.url().

 

The implementation of BaseUrl is the following (according to my IntelliJ attaching the sources...):

 

public class AutoRestBaseUrl implements BaseUrl {
    /** A template based URL with variables wrapped in {}s. */
    private String template;
    /** a mapping from {} wrapped variables in the template and their actual values. */
    private Map<CharSequence, String> mappings;

    @Override
    public HttpUrl url() {
        String url = template;
        for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
            url = url.replace(entry.getKey(), entry.getValue());
        }
        mappings.clear();
        return HttpUrl.parse(url);
    }

    /**
     * Creates an instance of a template based URL.
     *
     * @param url the template based URL to use.
     */
    public AutoRestBaseUrl(String url) {
        this.template = url;
        this.mappings = new HashMap<>();
    }

    /**
     * Sets the value for the {} wrapped variables in the template URL.
     * @param matcher the {} wrapped variable to replace.
     * @param value the value to set for the variable.
     */
    public void set(CharSequence matcher, String value) {
        this.mappings.put(matcher, value);
    }
}

 

The exception is coming from the line where it is looping over the entryset:

 

for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {

 

Right after that loop it calls mappings.clear(), so if the RequestFactory is shared by multiple threads (which I think it is), then one thread could be iterating over the set while another calls mappings.clear().
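The failure mode can be reproduced deterministically without threads. In this sketch, ClearWhileIteratingDemo is a made-up class and the two map keys are invented placeholder names. The clear() inside the loop stands in for a second thread reaching mappings.clear() while the first is still iterating:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

final class ClearWhileIteratingDemo {
    // Returns true if clearing the map mid-iteration raises
    // ConcurrentModificationException on the next iterator step.
    static boolean clearDuringIterationFails() {
        Map<CharSequence, String> mappings = new HashMap<>();
        // Placeholder keys and values, invented for this example.
        mappings.put("{accountName}", "myaccount");
        mappings.put("{dnsSuffix}", "example.net");
        try {
            for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
                // Simulates another thread calling clear() during the loop.
                mappings.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

HashMap iterators are fail-fast: they compare the map's modification count on each step and throw when it has changed, which matches the HashMap$HashIterator.nextNode frame at the top of the reported stack trace. With real threads the timing is unpredictable, so the exception shows up only some of the time.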

 

 

On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <ozhurakousky@hortonworks.com> wrote:

Kumiko 

 

It appears that the current state of the source you linked in is not in sync with what is in the stack trace. Perhaps you have made some code modifications (e.g., line 218 is an empty line in code while it has a pointer in the stack trace).

In any event, from what I can see the error is coming from the Azure libraries (not NiFi). Specifically, ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be doing some iteration where I presume the remove is called. Perhaps it is not a thread-safe class after all. What does the Microsoft documentation say? Have you looked at the source to see what’s going on there? If it’s open, please link to it and we can take a look.

 

Cheers

Oleg

 

On Jun 3, 2016, at 4:58 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

 

 

Thanks

Kumiko

 

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: Friday, June 3, 2016 12:57 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for the custom processor

 

Hello,

 

Would you be able to share your code for PutFileAzureDLStore so we can help identify if there is a concurrency problem?

 

-Bryan

 

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

Hello,

 

I wrote the following custom service control and processor.  When the custom processor runs concurrently, it often fails with several different errors.  Is there any special handling for concurrency that I need to add to the custom processor?  I wrote a sample Java program that does the same thing as the custom processor (authenticate every time a file is created, create a file, create 2 threads and run them concurrently), and it works fine.  The custom processor also works fine when it is not running concurrently.

 

Custom service control – set the properties for the Microsoft Azure Datalake Store

Custom processor – authenticate, then create a file in Microsoft Azure Datalake Store

 

Error1:

2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful

2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] n.a.d.processors.PutFileAzureDLStore

java.util.ConcurrentModificationException: null

                at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]

                at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[na:1.8.0_77]

                at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[na:1.8.0_77]

                at com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]

                at retrofit2.RequestFactory.create(RequestFactory.java:50) ~[na:na]

                at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) ~[na:na]

                at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]

                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]

                at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]

                at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]

                at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]

                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]

                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]

                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]

                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]

                at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

 

Error2:

2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught Exception: com.microsoft.rest.ServiceException: Invalid status code 403

2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask

com.microsoft.rest.ServiceException: Invalid status code 403

                at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_77]

                at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_77]

                at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_77]

                at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]

                at com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147) ~[na:na]

                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491) ~[na:na]

                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]

                at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]

                at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]

                at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]

                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]

                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]

                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]

                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]

                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]

 

Thanks

Kumiko