atlas-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Nixon Rodrigues (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (ATLAS-1944) NotificationHookConsumer throws exception while shutting down the consumer thread
Date Tue, 01 Aug 2017 11:13:00 GMT

     [ https://issues.apache.org/jira/browse/ATLAS-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Nixon Rodrigues updated ATLAS-1944:
-----------------------------------
    Attachment: ATLAS-1944.1.patch

Background: Stoping NotificationHookConsumer was throwing ConcurrentModificationException
exception while shutting down the consumer thread. The stop method of thread was calling the
kafkacosumer.stop method while in run method of HookConsumer which is busy in consuming the
kafka message by polling kafka server. This simultaneous act of stopping and polling causes
ConcurrentModificationException.

Fix:- 

Added try finally block in run method which wraps the while loop and when loop ends the finally
blocks calls the consumer close method which ensures that close is called after consuming
is ended.
Replaced the ShutdownThread interface which was delaying the shutdown process of consumer
with Runnable interface used earlier.


Reference : - https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Review Request
https://reviews.apache.org/r/61274/diff/1#index_header

> NotificationHookConsumer throws exception while shutting down the consumer thread
> ---------------------------------------------------------------------------------
>
>                 Key: ATLAS-1944
>                 URL: https://issues.apache.org/jira/browse/ATLAS-1944
>             Project: Atlas
>          Issue Type: Bug
>          Components:  atlas-core
>    Affects Versions: 0.9-incubating
>            Reporter: Ayub Pathan
>            Assignee: Ashutosh Mestry
>            Priority: Critical
>             Fix For: 0.9-incubating, 0.8.1-incubating
>
>         Attachments: ATLAS-1944.1.patch, ATLAS-1944.patch
>
>
> NotificationHookConsumer throws below exception while shutting down the consumer thread,
this issue is possibly after this commit.
> https://github.com/apache/incubator-atlas/commit/0e7f8ea4603c858cc295259bbd1a22314b732f62
> CC [~nixonrodrigues]
> {noformat}
> 2017-07-12 01:26:09,743 WARN  - [pool-1-thread-1:] ~ Error stopping service org.apache.atlas.notification.NotificationHookConsumer
(Services:69)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded
access
>         at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1467)
>         at org.apache.atlas.kafka.AtlasKafkaConsumer.close(AtlasKafkaConsumer.java:88)
>         at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.stop(NotificationHookConsumer.java:384)
>         at org.apache.atlas.notification.NotificationHookConsumer.stopConsumerThreads(NotificationHookConsumer.java:172)
>         at org.apache.atlas.notification.NotificationHookConsumer.stop(NotificationHookConsumer.java:155)
>         at org.apache.atlas.service.Services.stop(Services.java:67)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:325)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:154)
>         at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:253)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:578)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:554)
>         at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:961)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:523)
>         at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:968)
>         at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1033)
>         at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1009)
>         at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:961)
>         at org.springframework.web.context.ContextLoader.closeWebApplicationContext(ContextLoader.java:583)
>         at org.springframework.web.context.ContextLoaderListener.contextDestroyed(ContextLoaderListener.java:116)
>         at org.eclipse.jetty.server.handler.ContextHandler.callContextDestroyed(ContextHandler.java:808)
>         at org.eclipse.jetty.servlet.ServletContextHandler.callContextDestroyed(ServletContextHandler.java:457)
>         at org.eclipse.jetty.server.handler.ContextHandler.doStop(ContextHandler.java:842)
>         at org.eclipse.jetty.servlet.ServletContextHandler.doStop(ServletContextHandler.java:215)
>         at org.eclipse.jetty.webapp.WebAppContext.doStop(WebAppContext.java:529)
>         at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
>         at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:143)
>         at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:162)
>         at org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:73)
>         at org.eclipse.jetty.server.Server.doStop(Server.java:456)
>         at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message