nifi-dev mailing list archives

From olegz <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-1192 added support for dynamic properties ...
Date Mon, 23 Nov 2015 20:16:35 GMT
GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/131

    NIFI-1192 added support for dynamic properties to GetKafka

    Because the current component uses artificial names for properties set via the UI
and then maps those properties to the actual names used by Kafka, we cannot rely on the NiFi
UI to display an error if a user sets a dynamic property that eventually maps
to the same Kafka property. So I've decided that any dynamic property will simply override
an existing property, with a WARNING message displayed. This is consistent with how Kafka
handles it, displaying the overrides in the console. Updated the relevant annotation description.
    It is also worth mentioning that the current code was using an old property from Kafka 0.7
("zk.connectiontimeout.ms") which is no longer present in Kafka 0.8 (WARN Timer-Driven
Process Thread-7 utils.VerifiableProperties:83 - Property zk.connectiontimeout.ms is not valid).
The add/override strategy provides more flexibility for dealing with Kafka's volatile
configuration until things settle down and we can get some sensible defaults in place.
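    The add/override behavior can be sketched roughly as follows (illustrative names only, not the actual GetKafka code): any dynamic property replaces a same-named Kafka property, and the override is surfaced as a warning rather than rejected.

```java
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical sketch of the add/override strategy: dynamic (user-supplied)
// properties win over existing Kafka properties, with a warning per override.
public class DynamicPropertyMerge {

    // Merges 'dynamic' into 'base' and returns the result.
    // Each overridden key is reported through the supplied warning callback.
    public static Properties merge(Properties base, Properties dynamic,
                                   Consumer<String> warn) {
        Properties merged = new Properties();
        merged.putAll(base);
        for (String key : dynamic.stringPropertyNames()) {
            if (merged.containsKey(key)) {
                warn.accept("Overriding existing property '" + key
                        + "' with value '" + dynamic.getProperty(key) + "'");
            }
            merged.setProperty(key, dynamic.getProperty(key));
        }
        return merged;
    }
}
```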
    
    While at it, I addressed the following issues that were discovered while making the modification
and testing:
    ISSUE: When GetKafka starts and there are no messages in the Kafka topic, the onTrigger(..)
method blocks because Kafka's ConsumerIterator.hasNext() blocks. When an attempt was made
to stop GetKafka, it would stop successfully due to the interrupt; however, in the
UI it would appear as an ERROR because the InterruptedException was not handled.
    RESOLUTION: After discussing it with @markap14, the general desire is to let the task
exit as quickly as possible; the whole thread-maintenance logic was there initially
only because there was no way to tell the Kafka consumer to return immediately when there
are no events. In this patch we now use Kafka's 'consumer.timeout.ms' property,
setting its value to 1 millisecond (the default is -1, i.e., always block indefinitely). This ensures
that a task that attempted to read an empty topic exits immediately, to be rescheduled
by NiFi based on the user's configuration.
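    The configuration change amounts to one extra consumer property; a minimal sketch (the helper and parameter names are illustrative, not the actual GetKafka code):

```java
import java.util.Properties;

// Sketch of the consumer configuration described above: setting
// 'consumer.timeout.ms' to 1 makes Kafka's ConsumerIterator.hasNext()
// throw ConsumerTimeoutException almost immediately on an empty topic
// instead of blocking forever (the default of -1).
public class ConsumerTimeoutConfig {

    public static Properties consumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // 1 ms timeout: a task reading an empty topic exits right away,
        // letting NiFi reschedule it per the user's scheduling settings.
        props.setProperty("consumer.timeout.ms", "1");
        return props;
    }
}
```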
    
    ISSUE: Kafka would not release a FlowFile with events if it did not have enough to complete
the batch, since it would block waiting for more messages (the blocking issue described
above).
    RESOLUTION: The invocation of hasNext() results in Kafka's ConsumerTimeoutException,
which is handled in a catch block where the FlowFile with the partial batch is released
to success. Not sure if we need to log a WARN message; in my opinion we should not,
as it may create unnecessary confusion.
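    The partial-batch behavior can be simulated as follows. Note that TimeoutException here is a stand-in for Kafka's ConsumerTimeoutException so the sketch runs without Kafka on the classpath; the drain loop itself is illustrative, not the actual onTrigger(..) code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simulation of the partial-batch release described above: a timeout while
// waiting for more messages releases whatever was collected so far instead
// of blocking until the batch is full.
public class PartialBatchSketch {

    // Stand-in for kafka.consumer.ConsumerTimeoutException.
    static class TimeoutException extends RuntimeException {}

    // Drains up to batchSize messages from the iterator.
    public static List<String> drain(Iterator<String> it, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && it.hasNext()) {
                batch.add(it.next());
            }
        } catch (TimeoutException e) {
            // The partial batch is released to 'success'; intentionally
            // no WARN here, per the discussion above.
        }
        return batch;
    }
}
```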
    
    ISSUE: When configuring a consumer for a topic and specifying multiple concurrent consumers
in 'topicCountMap' based on 'context.getMaxConcurrentTasks()', each consumer binds
to a topic partition. If you have fewer partitions than the value returned by 'context.getMaxConcurrentTasks()',
you essentially allocate Kafka resources that will never get a chance to receive a
single message (see more here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
    RESOLUTION: Logic was added to determine the number of partitions for a topic; in the
event that the 'context.getMaxConcurrentTasks()' value is greater than the number of partitions,
the partition count is used when creating 'topicCountMap' and a WARNING message
is displayed (see code). Unfortunately we can't do anything about the actual tasks, but
given the current state of the code they will exit immediately, only to be rescheduled, and
the process will repeat. NOTE: That is not ideal, as it keeps rescheduling tasks that will
never have a chance to do anything, but at least it can be fixed on the user's side after
reading the warning message.
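    The partition-capping logic can be sketched roughly like this (illustrative names, not the actual GetKafka code): the number of consumer streams requested in 'topicCountMap' is capped at the topic's partition count, since extra streams would never receive a message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the RESOLUTION above: cap the requested stream
// count at the partition count and warn about the mismatch.
public class TopicCountMapSketch {

    public static Map<String, Integer> buildTopicCountMap(
            String topic, int maxConcurrentTasks, int partitionCount,
            Consumer<String> warn) {
        int streams = maxConcurrentTasks;
        if (maxConcurrentTasks > partitionCount) {
            warn.accept("Configured concurrent tasks (" + maxConcurrentTasks
                    + ") exceed the partition count (" + partitionCount
                    + "); using " + partitionCount + " consumer streams.");
            streams = partitionCount;
        }
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, streams);
        return topicCountMap;
    }
}
```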

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-1192B

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #131
    
----
commit f9d1b2811a08c372baf660a8a5e8d0f73e1a23a2
Author: Oleg Zhurakousky <oleg@suitcase.io>
Date:   2015-11-23T20:15:03Z

    NIFI-1192 added support for dynamic properties to GetKafka

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
