nifi-users mailing list archives

From Matt Foley <mfo...@hortonworks.com>
Subject Re: Custom processor is failing for concurrency
Date Sun, 19 Jun 2016 05:47:47 GMT
Answering off-line. --Matt

________________________________
From: Kumiko Yada <Kumiko.Yada@ds-iq.com>
Sent: Friday, June 17, 2016 3:20 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: RE: Custom processor is failing for concurrency

Hello,

I wrote the wrapper class, but I’m getting the following errors.  How can I wrap this method
correctly?

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
(default-compile) on project nifi-azure-dlstore-processors: Compilation failure: Compilation
failure:
[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,130]
';' expected
[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,138]
not a statement
[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,149]
';' expected
[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,151]
not a statement
[ERROR] /E:/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/AzureSDKWrapper.java:[29,165]
';' expected

______
public final class AzureSDKWrapper {

    private static Object lockAB = new Object();

    public static com.microsoft.rest.ServiceResponse<Void> AzureDLCreateFile(
            DataLakeStoreAccountManagementClient adlsClient,
            DataLakeStoreFileSystemManagementClient adlsFileSystemClient,
            String adlsAccountName,
            String path, String contents, boolean force) {
        synchronized(lockAB) {
            byte[] bytesContents = contents.getBytes();

            // Create file with contents
            return adlsFileSystemClient.getFileSystemOperations().create(adlsAccountName, path, bytesContents, force) throws IOException, CloudException;
        }
    }
}
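
The "';' expected" and "not a statement" errors point at the `throws IOException, CloudException` text that follows the `create(...)` call. In Java a `throws` clause can only appear on a method declaration, so it has to move up to the wrapper's signature. Below is a minimal sketch of that shape. It does not use the Azure SDK: `FileOps`, `createFile`, and `demo` are hypothetical stand-ins, the return type is simplified to `String`, and only `IOException` is declared (a real wrapper would also declare `CloudException`, as the Javadoc below lists). Using UTF-8 for `getBytes` is also an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class CreateFileWrapperSketch {

    /** Hypothetical stand-in for the SDK's file system operations; not the real Azure type. */
    interface FileOps {
        String create(String accountName, String path, byte[] contents, Boolean overwrite)
                throws IOException;
    }

    // One lock shared by every caller of the wrapper.
    private static final Object LOCK_AB = new Object();

    // Checked exceptions are declared on the wrapper's own signature,
    // never after the call expression inside the body.
    static String createFile(FileOps ops, String accountName, String path,
                             String contents, boolean force) throws IOException {
        synchronized (LOCK_AB) {
            byte[] bytes = contents.getBytes(StandardCharsets.UTF_8);
            return ops.create(accountName, path, bytes, force);
        }
    }

    /** Calls the wrapper with a fake FileOps that just reports what it received. */
    static String demo() {
        FileOps fake = (account, path, bytes, overwrite) -> account + path + ":" + bytes.length;
        try {
            return createFile(fake, "acct", "/dir/file.txt", "hello", true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Callers then either catch the declared exceptions or declare them in turn, as `demo()` does with a try/catch.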


https://azure.github.io/azure-sdk-for-java/com/microsoft/azure/management/datalake/store/FileSystemOperations.html#create-java.lang.String-java.lang.String-byte:A-java.lang.Boolean-
create
com.microsoft.rest.ServiceResponse<Void> create(String accountName,
                                                String directFilePath,
                                                byte[] streamContents,
                                                Boolean overwrite)
                                         throws com.microsoft.azure.CloudException,
                                                IOException,
                                                IllegalArgumentException
Creates a file with optionally specified content.
Parameters:
accountName - The Azure Data Lake Store account to execute filesystem operations on.
directFilePath - The Data Lake Store path (starting with '/') of the file to create.
streamContents - The file contents to include when creating the file. This parameter is optional,
resulting in an empty file if not specified.
overwrite - The indication of if the file should be overwritten.
Returns:
the ServiceResponse object if successful.
Throws:
com.microsoft.azure.CloudException - exception thrown from REST call
IOException - exception thrown from serialization/deserialization
IllegalArgumentException - exception thrown from invalid parameters

Thanks
Kumiko

From: Matt Foley [mailto:mfoley@hortonworks.com]
Sent: Friday, June 10, 2016 12:53 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>; Ki Kang <Ki.Kang@ds-iq.com>
Subject: Re: Custom processor is failing for concurrency


Hi Kumiko,

since the Azure SDK calls are controlling an external resource (not part of your PutFileAzureDLStore
class) in a non-thread-safe way, the safest thing to do is lock on an object of static scope,
hence guaranteed to be shared by all threads wherever spawned, rather than "this".  (Using
"this" as the lock object is only useful when all the threads are sharing a single instance
object referenced by "this".  If the instances of "this" are themselves spawned per-thread,
you don't get the desired mutex.)
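
A minimal sketch of the difference between the two lock scopes, under stated assumptions: `unsafeCall` is a hypothetical stand-in for the non-thread-safe SDK call, each worker thread gets its own instance (the situation described above), and the class and method names are invented for illustration. The sketch records the highest number of threads that were inside the call at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LockScopeSketch {
    // Static scope: one lock object shared by every instance and every thread.
    private static final Object SHARED_LOCK = new Object();
    private static final AtomicInteger inside = new AtomicInteger();
    private static final AtomicInteger maxInside = new AtomicInteger();

    // Hypothetical stand-in for a non-thread-safe SDK call.
    private static void unsafeCall() {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        inside.decrementAndGet();
    }

    // Locks only this instance; other instances use other locks.
    void callWithInstanceLock() { synchronized (this) { unsafeCall(); } }

    // Locks the single shared object, whichever instance is calling.
    void callWithStaticLock() { synchronized (SHARED_LOCK) { unsafeCall(); } }

    /** Runs one call per thread, each on its own instance, and returns the peak overlap. */
    static int maxConcurrency(boolean useStaticLock, int threads) {
        inside.set(0);
        maxInside.set(0);
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            LockScopeSketch own = new LockScopeSketch();
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (useStaticLock) own.callWithStaticLock(); else own.callWithInstanceLock();
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxInside.get();
    }
}
```

`maxConcurrency(true, 4)` returns 1 because the static lock admits one thread at a time. With `false`, the per-instance locks do not exclude each other, so the value is typically greater than 1, although the exact number depends on scheduling.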



Furthermore, you may find that you want to make other SDK calls from classes other than PutFileAzureDLStore,
and they may interact in a non-thread-safe way too.  Therefore I would wrap all offending
calls in a separate class of static methods, callable from any other classes, like this:

// Suppose SDK.methodA and SDK.methodB interact with each other non-thread-safely,
// while SDK.methodC is non-thread-safe but doesn't interact with methodA or methodB.

import com.ms.azure.SDK.*;

public final class WrapSDK {

    private static Object lockAB = new Object();
    private static Object lockC = new Object();

    public static <returntypeA> methodA( <signatureA> ... ) {
        synchronized(lockAB) {
            return SDK.methodA( <argvals> ... );
        }
    }

    public static <returntypeB> methodB( <signatureB> ... ) {
        synchronized(lockAB) {
            return SDK.methodB( <argvals> ... );
        }
    }

    public static <returntypeC> methodC( <signatureC> ... ) {
        synchronized(lockC) {
            return SDK.methodC( <argvals> ... );
        }
    }

}



Now, wherever you would have called SDK.methodA(), just call WrapSDK.methodA() instead, with
the same args.  All the complexity stays in the wrapper class.



This approach requires you to explicitly wrap every method signature you wish to use.  The
alternative is complex and almost certainly not worth the effort for your use case.  (See
http://stackoverflow.com/questions/14714368/java-reflection-with-object-as-a-parameter for
a hint, if you really want to pursue it.)
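
For readers curious about the alternative, here is one possible shape of it, as a sketch only. It uses `java.lang.reflect.Proxy`, which works only when the calls go through an interface; whether the SDK objects in question can be used that way is an assumption, and `Ops`, `synchronizedView`, and `demo` are hypothetical names. Every method call on the returned proxy runs under one shared lock, so no per-method wrapper has to be written.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

final class SynchronizedProxySketch {
    private static final Object LOCK = new Object();

    /** Hypothetical interface standing in for an SDK operations interface. */
    interface Ops {
        String create(String path);
    }

    /** Returns a proxy that runs every interface method of target under one shared lock. */
    @SuppressWarnings("unchecked")
    static <T> T synchronizedView(Class<T> iface, T target) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] { iface },
                (proxy, method, args) -> {
                    synchronized (LOCK) {
                        try {
                            return method.invoke(target, args);
                        } catch (InvocationTargetException e) {
                            // Rethrow the exception the target itself threw.
                            throw e.getCause();
                        }
                    }
                });
    }

    /** Wraps a trivial Ops implementation and calls it through the proxy. */
    static String demo() {
        Ops raw = path -> "created " + path;
        Ops guarded = synchronizedView(Ops.class, raw);
        return guarded.create("/dir/file.txt");
    }
}
```

The trade-off is that a single lock covers every method, so unrelated calls (methodC in the example above) are serialized along with the ones that actually interact.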

​

Hope this helps,

--Matt

​

________________________________
From: Kumiko Yada <Kumiko.Yada@ds-iq.com>
Sent: Thursday, June 09, 2016 1:54 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: RE: Custom processor is failing for concurrency

Here is the code:  https://github.com/kyada1/PutFileAzureDLStore/blob/master/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/PutFileAzureDLStore.java

The problems are :
Line 209 - SetupClients(creds);
Line 217 - CreateFile(_path + _filename, value.get(), true);

To wrap the calls in a synchronized method, do these look correct?  Or for line 217, does
the method have to return something?

For the line 209:
        synchronized (this) {
            SetupClients(creds);
            if (creds!= null) {
                return creds;
            }


For the line 217:
        synchronized {
CreateFile(_path + _filename, value.get(), true);
        }

Thanks
Kumiko

From: Matt Foley [mailto:mfoley@hortonworks.com]
Sent: Thursday, June 9, 2016 12:27 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>; Ki Kang <Ki.Kang@ds-iq.com>
Subject: Re: Custom processor is failing for concurrency


If there are multiple SDK calls that share the same problematic code (that is, multiple
SDK methods that would interact with each other in a non-thread-safe way), then one must synchronize
their calls to a shared lock object, which only requires a couple more lines of code.

________________________________
From: Oleg Zhurakousky <ozhurakousky@hortonworks.com>
Sent: Thursday, June 09, 2016 12:15 PM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

+1, was just responding with the same.

On Jun 9, 2016, at 3:11 PM, Matt Foley <mfoley@hortonworks.com> wrote:

Kumiko, would it be sufficient to just wrap your call to the non-thread-safe MS SDK routine
in a 'synchronized' method?  You could then use the standard NiFi thread management and avoid
a lot of complexity.  And the result should be at least as efficient as having a dedicated thread
to manage the problem action.
--Matt F
________________________________
From: Kumiko Yada <Kumiko.Yada@ds-iq.com>
Sent: Thursday, June 09, 2016 11:49 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

Hi Bryan,

Does this mean that even if I create multiple threads in onTrigger, I will still hit the Microsoft
SDK issue where it's not thread-safe?  It sounds like what I am trying to do and
creating multiple threads via the UI might basically be the same thing.

Thanks
Kumiko
________________________________
From: Bryan Bende <bbende@gmail.com>
Sent: Thursday, June 9, 2016 11:26:10 AM
To: users@nifi.apache.org
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency

Kumiko,

In general you shouldn't have to create threads in your processors, with the exception of
some special cases.
The framework has a thread pool and it takes one of those threads and calls the onTrigger
method of your processor.

If you want multiple threads to call onTrigger, then each processor has a Concurrent Tasks
property in the UI on the scheduling tab,
which equates to the number of threads that will concurrently call onTrigger.

A processor developer needs to only worry about the business logic in the onTrigger method,
and needs to ensure
thread-safe access to any member variables or state stored in the processor.
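
A minimal sketch of what thread-safe member state can look like when several threads call onTrigger at once. It does not use the NiFi API: `CountingProcessor` is a hypothetical stand-in for a processor, and a fixed thread pool plays the role of the framework calling onTrigger with the configured number of Concurrent Tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class ConcurrentTasksSketch {

    /** Hypothetical processor-like class; a real one would extend a NiFi processor base class. */
    static final class CountingProcessor {
        // Member state shared by every thread that calls onTrigger.
        private final AtomicLong filesCreated = new AtomicLong();

        void onTrigger() {
            // Atomic increment: safe without explicit locking.
            filesCreated.incrementAndGet();
        }

        long filesCreated() {
            return filesCreated.get();
        }
    }

    /** Simulates the framework calling onTrigger from several pool threads. */
    static long run(int concurrentTasks, int triggersPerTask) {
        CountingProcessor processor = new CountingProcessor();
        ExecutorService pool = Executors.newFixedThreadPool(concurrentTasks);
        for (int t = 0; t < concurrentTasks; t++) {
            pool.submit(() -> {
                for (int i = 0; i < triggersPerTask; i++) {
                    processor.onTrigger();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processor.filesCreated();
    }
}
```

With a plain `long` field and `filesCreated++` instead of the `AtomicLong`, some increments could be lost when the threads interleave.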

Hope that helps.

-Bryan


On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:
Microsoft found that this is an issue with the SDK.  They are working on a fix but do not have
an ETA for it.  To work around this issue, I’m trying to create multiple threads
using AbstractSessionFactoryProcessor and handle the file creation in a single thread.
I’m having a problem that the single thread is not working correctly: the processor is
still acting like a single thread.

When I create a thread to handle the file creation, do I have to call this method using java.util.concurrent.ExecutorService?

Are there any sample processors that I can take a look at?

Thanks
Kumiko

From: Kumiko Yada [mailto:Kumiko.Yada@ds-iq.com]
Sent: Sunday, June 5, 2016 6:28 PM
To: users@nifi.apache.org
Cc: Ki Kang <Ki.Kang@ds-iq.com>; Kevin Verhoeven <Kevin.Verhoeven@ds-iq.com>
Subject: RE: Custom processor is failing for concurrency

Thank you, Bryan.  I’m working with Microsoft on this issue.  Will keep you guys updated.

Thanks
Kumiko

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: Friday, June 3, 2016 2:32 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for concurrency

It is hard to say for sure, but I think your NiFi processor is generally ok regarding thread
safety, but I think there could be a problem in the Azure SDK code...

RequestFactory has an instance of BaseUrl and every time RequestFactory.create() is called,
it calls BaseUrl.url().

The implementation of BaseUrl is the following (according to my IntelliJ attaching the sources...):

public class AutoRestBaseUrl implements BaseUrl {
    /** A template based URL with variables wrapped in {}s. */
    private String template;
    /** a mapping from {} wrapped variables in the template and their actual values. */
    private Map<CharSequence, String> mappings;

    @Override
    public HttpUrl url() {
        String url = template;
        for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
            url = url.replace(entry.getKey(), entry.getValue());
        }
        mappings.clear();
        return HttpUrl.parse(url);
    }

    /**
     * Creates an instance of a template based URL.
     *
     * @param url the template based URL to use.
     */
    public AutoRestBaseUrl(String url) {
        this.template = url;
        this.mappings = new HashMap<>();
    }

    /**
     * Sets the value for the {} wrapped variables in the template URL.
     * @param matcher the {} wrapped variable to replace.
     * @param value the value to set for the variable.
     */
    public void set(CharSequence matcher, String value) {
        this.mappings.put(matcher, value);
    }
}

The exception is coming from the line where it is looping over the entryset:

for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {

Right after that loop it calls mappings.clear(), so if the RequestFactory is shared by multiple
threads (which I think it is), then one thread could be iterating over the set while another
calls mappings.clear().
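
The failure mode can be reproduced on a single thread, which makes it deterministic: clearing a HashMap between two steps of its iterator is what a second thread's `mappings.clear()` would do to the first thread's loop. The sketch below is an illustration under assumptions, not the SDK code. `TemplateUrl` is a simplified stand-in for the class quoted above (returning a String instead of HttpUrl), and `buildUrl` with its shared lock is a hypothetical helper showing that the whole set-then-url sequence has to be serialized, since the values set by one thread must not be consumed by another thread's `url()` call.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

final class BaseUrlRaceSketch {

    /** Simplified stand-in for the quoted class; not the SDK type itself. */
    static final class TemplateUrl {
        private final String template;
        private final Map<CharSequence, String> mappings = new HashMap<>();

        TemplateUrl(String template) {
            this.template = template;
        }

        void set(CharSequence matcher, String value) {
            mappings.put(matcher, value);
        }

        String url() {
            String url = template;
            for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
                url = url.replace(entry.getKey(), entry.getValue());
            }
            mappings.clear();
            return url;
        }
    }

    /** Clears the map in the middle of an iteration, as a second thread's url() would. */
    static boolean clearDuringIterationFails() {
        Map<CharSequence, String> mappings = new HashMap<>();
        mappings.put("{accountName}", "acct");
        mappings.put("{suffix}", "example.net");
        try {
            for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
                mappings.clear(); // stands in for the other thread
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    private static final Object LOCK = new Object();

    /** Serializes the whole set-then-url sequence on one shared lock. */
    static String buildUrl(TemplateUrl shared, String account, String suffix) {
        synchronized (LOCK) {
            shared.set("{accountName}", account);
            shared.set("{suffix}", suffix);
            return shared.url();
        }
    }
}
```

Since the real class lives inside the SDK, the lock cannot be added there by the processor author; serializing the SDK calls from the outside achieves the same effect.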


On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <ozhurakousky@hortonworks.com> wrote:
Kumiko

It appears that the current state of the source you linked is not in sync with what is
in the stack trace. Perhaps you have made some code modifications (e.g., line 218 is an empty
line in the code while the stack trace points to it).
In any event, from what I can see the error is coming from the Azure libraries (not NiFi). Specifically,
‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be doing some iteration during which, I
presume, a remove is called. Perhaps it is not a thread-safe class after all. What does the Microsoft
documentation say? Have you looked at the source to see what’s going on there? If it's open,
please link it and we can take a look.

Cheers
Oleg

On Jun 3, 2016, at 4:58 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:

Here is the code, https://github.com/kyada1/PutFileAzureDLStore.

Thanks
Kumiko

From: Bryan Bende [mailto:bbende@gmail.com]
Sent: Friday, June 3, 2016 12:57 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for the custom processor

Hello,

Would you be able to share your code for PutFileAzureDLStore so we can help identify if there
is a concurrency problem?

-Bryan

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <Kumiko.Yada@ds-iq.com> wrote:
Hello,

I wrote the following custom service control and processor.  When the custom processor is
running concurrently, it often fails with several different errors.  Is there any special
handling for concurrency that I need to add to the custom processor?  I wrote a sample
Java program that does the same thing as the custom processor (authenticate every time a
file is created, create the file, and run this on 2 concurrent threads), and it works fine.
The custom processor also works fine when it is not running concurrently.

Custom service control – set the properties for the Microsoft Azure Datalake Store
Custom processor – authenticate, then create a file in Microsoft Azure Datalake Store

Error1:
2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] c.m.aad.adal4j.AuthenticationAuthority [Correlation
ID: 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] n.a.d.processors.PutFileAzureDLStore
java.util.ConcurrentModificationException: null
                at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
                at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[na:1.8.0_77]
                at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[na:1.8.0_77]
                at com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
                at retrofit2.RequestFactory.create(RequestFactory.java:50) ~[na:na]
                at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) ~[na:na]
                at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
~[na:na]
                at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
~[na:na]
                at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
~[na:na]
                at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_77]
                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_77]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_77]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_77]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_77]
                at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

Error2:
2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
Administratively Yielding PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due
to uncaught Exception: com.microsoft.rest.ServiceException: Invalid status code 403
2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
com.microsoft.rest.ServiceException: Invalid status code 403
                at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_77]
                at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[na:1.8.0_77]
                at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[na:1.8.0_77]
                at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]
                at com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
~[na:na]
                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
~[na:na]
                at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
~[na:na]
                at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
~[na:na]
                at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
~[na:na]
                at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_77]
                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_77]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_77]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_77]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_77]

Thanks
Kumiko




