knox-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smol...@apache.org
Subject [knox] branch master updated: KNOX-2186 - Advanced service discovery configuration handling (#238)
Date Fri, 17 Jan 2020 15:35:02 GMT
This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b08af  KNOX-2186 - Advanced service discovery configuration handling (#238)
69b08af is described below

commit 69b08afa9080c58ea83ee331f1e6e6b3c4be1392
Author: Sandor Molnar <smolnar@apache.org>
AuthorDate: Fri Jan 17 16:34:53 2020 +0100

    KNOX-2186 - Advanced service discovery configuration handling (#238)
    
    * KNOX-2186 - Added support for services without url/version/parameters
    
    * KNOX-2186 - Advanced service discovery configuration handling
---
 .../ClouderaManagerIntegrationMessages.java        |  6 ++
 .../ClouderaManagerDescriptorMonitor.java          | 25 ++++--
 .../ClouderaManagerDescriptorParser.java           | 94 +++++++++++++++-------
 ...vanceServiceDiscoveryConfigurationMessages.java | 36 +++++++++
 .../advanced/AdvancedServiceDiscoveryConfig.java   | 72 +++++++++++++++++
 ...vancedServiceDiscoveryConfigChangeListener.java | 28 +++++++
 ...vancedServiceDiscoveryConfigurationMonitor.java | 91 +++++++++++++++++++++
 .../ClouderaManagerDescriptorParserTest.java       | 61 ++++++++++++--
 .../src/test/resources/testDescriptor.xml          |  7 +-
 ...tDescriptorConfigurationWithWrongDescriptor.xml |  2 +-
 .../org/apache/knox/gateway/GatewayServer.java     |  8 +-
 .../gateway/config/impl/GatewayConfigImpl.java     |  7 ++
 .../apache/knox/gateway/config/GatewayConfig.java  |  5 ++
 .../org/apache/knox/gateway/GatewayTestConfig.java |  5 ++
 .../topology/simple/SimpleDescriptorImpl.java      |  5 ++
 15 files changed, 404 insertions(+), 48 deletions(-)

diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
index 1312bc8..4909533 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
@@ -44,4 +44,10 @@ public interface ClouderaManagerIntegrationMessages {
 
   @Message(level = MessageLevel.ERROR, text = "Error while producing Knox descriptor: {0}")
   void failedToProduceKnoxDescriptor(String errorMessage, @StackTrace(level = MessageLevel.DEBUG)
Exception e);
+
+  @Message(level = MessageLevel.WARN, text = "Service {0} is disabled. It will NOT be added
in {1}")
+  void serviceDisabled(String serviceName, String descriptorName);
+
+  @Message(level = MessageLevel.INFO, text = "Updated advanced service discovery configuration.")
+  void updatedAdvanceServiceDiscoverytConfiguration();
 }
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
index a727839..efc1471 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.FileTime;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -34,22 +35,25 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.knox.gateway.ClouderaManagerIntegrationMessages;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigChangeListener;
 import org.apache.knox.gateway.util.JsonUtils;
 
 /**
  * Monitoring KNOX_DESCRIPTOR_DIR for *.cm files - which is a Hadoop XML configuration -
and processing those files if they were modified
  * since the last time it they were processed
  */
-public class ClouderaManagerDescriptorMonitor {
+public class ClouderaManagerDescriptorMonitor implements AdvancedServiceDiscoveryConfigChangeListener
{
 
   private static final String CM_DESCRIPTOR_FILE_EXTENSION = ".cm";
   private static final ClouderaManagerIntegrationMessages LOG = MessagesFactory.get(ClouderaManagerIntegrationMessages.class);
   private final String descriptorsDir;
   private final long monitoringInterval;
   private final ScheduledExecutorService executorService;
+  private final ClouderaManagerDescriptorParser cmDescriptorParser;
   private FileTime lastReloadTime;
 
-  public ClouderaManagerDescriptorMonitor(GatewayConfig gatewayConfig) {
+  public ClouderaManagerDescriptorMonitor(GatewayConfig gatewayConfig, ClouderaManagerDescriptorParser
cmDescriptorParser) {
+    this.cmDescriptorParser = cmDescriptorParser;
     this.descriptorsDir = gatewayConfig.getGatewayDescriptorsDir();
     this.monitoringInterval = gatewayConfig.getClouderaManagerDescriptorsMonitoringInterval();
     this.executorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("ClouderaManagerDescriptorMonitor-%d").build());
@@ -57,23 +61,23 @@ public class ClouderaManagerDescriptorMonitor {
 
   public void setupMonitor() {
     if (monitoringInterval > 0) {
-      executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(), 0,
monitoringInterval, TimeUnit.MILLISECONDS);
+      executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(false),
0, monitoringInterval, TimeUnit.MILLISECONDS);
       LOG.monitoringClouderaManagerDescriptor(descriptorsDir);
     }
   }
 
-  private void monitorClouderaManagerDescriptors() {
+  private void monitorClouderaManagerDescriptors(boolean force) {
     final File[] clouderaManagerDescriptorFiles = new File(descriptorsDir).listFiles((FileFilter)
new SuffixFileFilter(CM_DESCRIPTOR_FILE_EXTENSION));
     for (File clouderaManagerDescriptorFile : clouderaManagerDescriptorFiles) {
-      monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()));
+      monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()),
force);
     }
   }
 
-  private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile) {
+  private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile, boolean
force) {
     try {
       if (Files.isReadable(clouderaManagerDescriptorFile)) {
         final FileTime lastModifiedTime = Files.getLastModifiedTime(clouderaManagerDescriptorFile);
-        if (lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0)
{
+        if (force || lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime)
< 0) {
           lastReloadTime = lastModifiedTime;
           processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString());
         }
@@ -86,7 +90,7 @@ public class ClouderaManagerDescriptorMonitor {
   }
 
   private void processClouderaManagerDescriptor(String descriptorFilePath) {
-    ClouderaManagerDescriptorParser.parse(descriptorFilePath).forEach(simpleDescriptor ->
{
+    cmDescriptorParser.parse(descriptorFilePath).forEach(simpleDescriptor -> {
       try {
         final File knoxDescriptorFile = new File(descriptorsDir, simpleDescriptor.getName()
+ ".json");
         FileUtils.writeStringToFile(knoxDescriptorFile, JsonUtils.renderAsJsonString(simpleDescriptor),
StandardCharsets.UTF_8);
@@ -95,4 +99,9 @@ public class ClouderaManagerDescriptorMonitor {
       }
     });
   }
+
+  @Override
+  public void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration)
{
+    monitorClouderaManagerDescriptors(true);
+  }
 }
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
index 44075fa..2986136 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
@@ -19,18 +19,21 @@ package org.apache.knox.gateway.cm.descriptor;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.LinkedHashSet;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.knox.gateway.ClouderaManagerIntegrationMessages;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfig;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigChangeListener;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl.ApplicationImpl;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl.ServiceImpl;
 
-public class ClouderaManagerDescriptorParser {
+public class ClouderaManagerDescriptorParser implements AdvancedServiceDiscoveryConfigChangeListener
{
   private static final ClouderaManagerIntegrationMessages log = MessagesFactory.get(ClouderaManagerIntegrationMessages.class);
   private static final String CONFIG_NAME_DISCOVERY_TYPE = "discoveryType";
   private static final String CONFIG_NAME_DISCOVERY_ADDRESS = "discoveryAddress";
@@ -42,6 +45,12 @@ public class ClouderaManagerDescriptorParser {
   private static final String CONFIG_NAME_SERVICE_URL = "url";
   private static final String CONFIG_NAME_SERVICE_VERSION = "version";
 
+  private AdvancedServiceDiscoveryConfig advancedServiceDiscoveryConfig;
+
+  public ClouderaManagerDescriptorParser() {
+    advancedServiceDiscoveryConfig = new AdvancedServiceDiscoveryConfig();
+  }
+
   /**
    * Produces a set of {@link SimpleDescriptor}s from the specified file.
    *
@@ -49,7 +58,7 @@ public class ClouderaManagerDescriptorParser {
    *          The path to the configuration file which holds descriptor information in a
pre-defined format.
    * @return A SimpleDescriptor based on the contents of the given file.
    */
-  public static Set<SimpleDescriptor> parse(String path) {
+  public Set<SimpleDescriptor> parse(String path) {
     try {
       log.parseClouderaManagerDescriptor(path);
       final Configuration xmlConfiguration = new Configuration(false);
@@ -64,7 +73,7 @@ public class ClouderaManagerDescriptorParser {
     }
   }
 
-  private static Set<SimpleDescriptor> parseXmlConfig(Configuration xmlConfiguration)
{
+  private Set<SimpleDescriptor> parseXmlConfig(Configuration xmlConfiguration) {
     final Set<SimpleDescriptor> descriptors = new LinkedHashSet<>();
     xmlConfiguration.forEach(xmlDescriptor -> {
       SimpleDescriptor descriptor = parseXmlDescriptor(xmlDescriptor.getKey(), xmlDescriptor.getValue());
@@ -75,7 +84,7 @@ public class ClouderaManagerDescriptorParser {
     return descriptors;
   }
 
-  private static SimpleDescriptor parseXmlDescriptor(String name, String xmlValue) {
+  private SimpleDescriptor parseXmlDescriptor(String name, String xmlValue) {
     try {
       final SimpleDescriptorImpl descriptor = new SimpleDescriptorImpl();
       descriptor.setName(name);
@@ -111,13 +120,29 @@ public class ClouderaManagerDescriptorParser {
           break;
         }
       }
+      if (advancedServiceDiscoveryConfig.getExpectedTopologyNames().contains(name)) {
+        addEnabledServices(descriptor);
+      }
       return descriptor;
-    } catch(Exception e) {
+    } catch (Exception e) {
       log.failedToParseDescriptor(name, e.getMessage(), e);
       return null;
     }
   }
 
+  /*
+   * Adds any enabled service which is not listed in the CM descriptor
+   */
+  private void addEnabledServices(SimpleDescriptorImpl descriptor) {
+    advancedServiceDiscoveryConfig.getEnabledServiceNames().forEach(enabledServiceName ->
{
+      if(descriptor.getService(enabledServiceName) == null) {
+        ServiceImpl service = new ServiceImpl();
+        service.setName(enabledServiceName);
+        descriptor.addService(service);
+      }
+    });
+  }
+
   /**
    * An application consists of the following parts: <code>app:$APPLICATION_NAME[:$PARAMETER_NAME=$PARAMETER_VALUE]</code>.
Parameters are
    * optional. <br>
@@ -127,7 +152,7 @@ public class ClouderaManagerDescriptorParser {
    * <li>app:knoxauth:param1.name=param1.value</li>
    * </ul>
    */
-  private static void parseApplication(SimpleDescriptorImpl descriptor, String configurationPair)
{
+  private void parseApplication(SimpleDescriptorImpl descriptor, String configurationPair)
{
     final String[] applicationParts = configurationPair.split(":");
     final String applicationName = applicationParts[1].trim();
     ApplicationImpl application = (ApplicationImpl) descriptor.getApplication(applicationName);
@@ -148,6 +173,7 @@ public class ClouderaManagerDescriptorParser {
   /**
    * A service consists of the following parts:
    * <ul>
+   * <li><code>$SERVICE_NAME</code></li>
    * <li><code>$SERVICE_NAME:url=$URL</code></li>
    * <li><code>$SERVICE_NAME:version=$VERSION</code> (optional)</li>
    * <li><code>$SERVICE_NAME[:$PARAMETER_NAME=$PARAMETER_VALUE] (optional)</code></li>
@@ -159,32 +185,44 @@ public class ClouderaManagerDescriptorParser {
    * <li>HIVE:param1.name=param1.value</li>
    * </ul>
    */
-  private static void parseService(SimpleDescriptorImpl descriptor, String configurationPair)
{
+  private void parseService(SimpleDescriptorImpl descriptor, String configurationPair) {
     final String[] serviceParts = configurationPair.split(":");
     final String serviceName = serviceParts[0].trim();
-    ServiceImpl service = (ServiceImpl) descriptor.getService(serviceName);
-    if (service == null) {
-      service = new ServiceImpl();
-      service.setName(serviceName);
-      descriptor.addService(service);
-    }
+    if (advancedServiceDiscoveryConfig.isServiceEnabled(serviceName)) {
+      ServiceImpl service = (ServiceImpl) descriptor.getService(serviceName);
+      if (service == null) {
+        service = new ServiceImpl();
+        service.setName(serviceName);
+        descriptor.addService(service);
+      }
 
-    // configuration value may contain ":" (for instance http://host:port) -> considering
a configuration name/value pair everything after '$SERVICE_NAME:'
-    final String serviceConfiguration = configurationPair.substring(serviceName.length()
+ 1).trim();
-    final String[] serviceConfigurationParts = serviceConfiguration.split("=", 2);
-    final String serviceConfigurationName = serviceConfigurationParts[0].trim();
-    final String serviceConfigurationValue = serviceConfigurationParts[1].trim();
-    switch (serviceConfigurationName) {
-    case CONFIG_NAME_SERVICE_URL:
-      service.addUrl(serviceConfigurationValue);
-      break;
-    case CONFIG_NAME_SERVICE_VERSION:
-      service.setVersion(serviceConfigurationValue);
-      break;
-    default:
-      service.addParam(serviceConfigurationName, serviceConfigurationValue);
-      break;
+      if (serviceParts.length > 1) {
+        // configuration value may contain ":" (for instance http://host:port) -> considering
a configuration name/value pair everything after '$SERVICE_NAME:'
+        final String serviceConfiguration = configurationPair.substring(serviceName.length()
+ 1).trim();
+        final String[] serviceConfigurationParts = serviceConfiguration.split("=", 2);
+        final String serviceConfigurationName = serviceConfigurationParts[0].trim();
+        final String serviceConfigurationValue = serviceConfigurationParts[1].trim();
+        switch (serviceConfigurationName) {
+        case CONFIG_NAME_SERVICE_URL:
+          service.addUrl(serviceConfigurationValue);
+          break;
+        case CONFIG_NAME_SERVICE_VERSION:
+          service.setVersion(serviceConfigurationValue);
+          break;
+        default:
+          service.addParam(serviceConfigurationName, serviceConfigurationValue);
+          break;
+        }
+      }
+    } else {
+      log.serviceDisabled(serviceName, descriptor.getName());
     }
   }
 
+  @Override
+  public void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration)
{
+    advancedServiceDiscoveryConfig = new AdvancedServiceDiscoveryConfig(newConfiguration);
+    log.updatedAdvanceServiceDiscoverytConfiguration();
+  }
+
 }
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java
new file mode 100644
index 0000000..e51f0d2
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import org.apache.knox.gateway.i18n.messages.Message;
+import org.apache.knox.gateway.i18n.messages.MessageLevel;
+import org.apache.knox.gateway.i18n.messages.Messages;
+import org.apache.knox.gateway.i18n.messages.StackTrace;
+
+@Messages(logger = "org.apache.knox.gateway.topology.discovery.advanced")
+public interface AdvanceServiceDiscoveryConfigurationMessages {
+
+  @Message(level = MessageLevel.INFO, text = "Monitoring {0} for changes.")
+  void monitorStarted(String path);
+
+  @Message(level = MessageLevel.ERROR, text = "Error while monitoring CM advanced configuration:
{1}")
+  void failedToMonitorClouderaManagerAdvancedConfiguration(String errorMessage, @StackTrace(level
= MessageLevel.DEBUG) Exception e);
+
+  @Message(level = MessageLevel.INFO, text = "Notifying listeners due to advanced service
discovery configuration changes...")
+  void notifyListeners();
+
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java
new file mode 100644
index 0000000..11fc56e
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.stream.Stream;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Wrapper class providing useful methods on properties coming from
+ * <code>$KNOX_CONF_DIR/auto-discovery-advanced-configuration.properties</code>
+ */
+public class AdvancedServiceDiscoveryConfig {
+
+  public static final String PARAMETER_NAME_PREFIX_ENABLED_SERVICE = "gateway.auto.discovery.enabled.";
+  public static final String PARAMETER_NAME_EXPECTED_TOPOLOGIES = "gateway.auto.discovery.expected.topology.names";
+
+  private final Properties properties;
+
+  public AdvancedServiceDiscoveryConfig() {
+    this(null);
+  }
+
+  public AdvancedServiceDiscoveryConfig(Properties properties) {
+    this.properties = properties == null ? new Properties() : properties;
+  }
+
+  public boolean isServiceEnabled(String serviceName) {
+    return Boolean.valueOf(getPropertyIgnoreCase(PARAMETER_NAME_PREFIX_ENABLED_SERVICE +
serviceName, "true"));
+  }
+
+  public Set<String> getEnabledServiceNames() {
+    return properties.entrySet().stream().filter(keyValuePair -> Boolean.valueOf((String)
keyValuePair.getValue()))
+        .map(keyValuePair -> ((String) keyValuePair.getKey()).substring(PARAMETER_NAME_PREFIX_ENABLED_SERVICE.length()).toUpperCase(Locale.getDefault())).collect(toSet());
+  }
+
+  public Set<String> getExpectedTopologyNames() {
+    return Stream.of(properties.getProperty(PARAMETER_NAME_EXPECTED_TOPOLOGIES, "").split(",")).map(expectedToplogyName
-> expectedToplogyName.trim()).collect(toSet());
+  }
+
+  private String getPropertyIgnoreCase(String propertyName, String defaultValue) {
+    final String property = properties.getProperty(propertyName);
+    if (property != null) {
+      return property;
+    } else {
+      for (Entry<Object, Object> entry : properties.entrySet()) {
+        if (propertyName.equalsIgnoreCase((String) entry.getKey())) {
+          return (String) entry.getValue();
+        }
+      }
+      return defaultValue;
+    }
+  }
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java
new file mode 100644
index 0000000..21061ec
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import java.util.Properties;
+
+/**
+ * The listener interface for receiving service discovery configuration events.
+ */
+public interface AdvancedServiceDiscoveryConfigChangeListener {
+
+  void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration);
+
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java
new file mode 100644
index 0000000..2f17a06
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+/**
+ * Monitoring <code>$KNOX_CONF_DIR/auto-discovery-advanced-configuration.properties</code>
(if exists) and notifies any
+ * {@link AdvancedServiceDiscoveryConfigChangeListener} if the file is changed since the
last time it was loaded
+ *
+ */
+public class AdvancedServiceDiscoveryConfigurationMonitor {
+
+  private static final String ADVANCED_CONFIGURATION_FILE_NAME = "auto-discovery-advanced-configuration.properties";
+  private static final AdvanceServiceDiscoveryConfigurationMessages LOG = MessagesFactory.get(AdvanceServiceDiscoveryConfigurationMessages.class);
+
+  private final List<AdvancedServiceDiscoveryConfigChangeListener> listeners;
+
+  private ScheduledExecutorService executorService;
+  private FileTime lastReloadTime;
+
+  public AdvancedServiceDiscoveryConfigurationMonitor(GatewayConfig gatewayConfig) {
+    final long monitoringInterval = gatewayConfig.getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval();
+    if (monitoringInterval > 0) {
+      this.executorService = newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("AdvancedServiceDiscoveryConfigurationMonitor-%d").build());
+      final Path resourcePath = Paths.get(gatewayConfig.getGatewayConfDir(), ADVANCED_CONFIGURATION_FILE_NAME);
+      executorService.scheduleAtFixedRate(() -> monitorAdvancedServiceConfiguration(resourcePath),
0, monitoringInterval, TimeUnit.MILLISECONDS);
+      LOG.monitorStarted(resourcePath.toString());
+    }
+
+    listeners = new ArrayList<>();
+  }
+
+  public void registerListener(AdvancedServiceDiscoveryConfigChangeListener listener) {
+    listeners.add(listener);
+  }
+
+  private void monitorAdvancedServiceConfiguration(Path resourcePath) {
+    try {
+      if (Files.exists(resourcePath) && Files.isReadable(resourcePath)) {
+        FileTime lastModifiedTime = Files.getLastModifiedTime(resourcePath);
+        if (lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0)
{
+          lastReloadTime = lastModifiedTime;
+          try (InputStream advanceconfigurationFileInputStream = Files.newInputStream(resourcePath))
{
+            Properties properties = new Properties();
+            properties.load(advanceconfigurationFileInputStream);
+            notifyListeners(properties);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.failedToMonitorClouderaManagerAdvancedConfiguration(e.getMessage(), e);
+    }
+  }
+
+  private void notifyListeners(Properties properties) {
+    LOG.notifyListeners();
+    listeners.forEach(listener -> listener.onAdvancedServiceDiscoveryConfigurationChange(properties));
+  }
+
+}
diff --git a/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java b/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
index 84d825f..023d7be 100644
--- a/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
+++ b/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
@@ -25,21 +25,31 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfig;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptor.Application;
 import org.apache.knox.gateway.topology.simple.SimpleDescriptor.Service;
+import org.junit.Before;
 import org.junit.Test;
 
 public class ClouderaManagerDescriptorParserTest {
 
+  private ClouderaManagerDescriptorParser cmDescriptorParser;
+
+  @Before
+  public void setUp() {
+    cmDescriptorParser = new ClouderaManagerDescriptorParser();
+  }
+
   @Test
-  public void testXmlParser() throws Exception {
+  public void testCMDescriptorParser() throws Exception {
     final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
-    final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+    final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
     assertEquals(2, descriptors.size());
     final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
     validateTopology1(descriptorsIterator.next());
@@ -47,21 +57,55 @@ public class ClouderaManagerDescriptorParserTest {
   }
 
   @Test
-  public void testXmlParserWrongDescriptorContent() throws Exception {
+  public void testCMDescriptorParserWrongDescriptorContent() throws Exception {
     final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithWrongDescriptor.xml").getPath();
-    final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+    final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
     assertEquals(1, descriptors.size());
     final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
     validateTopology1(descriptorsIterator.next());
   }
 
   @Test
-  public void testXmlParserWrongXMLContent() throws Exception {
+  public void testCMDescriptorParserWrongXMLContent() throws Exception {
     final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithNonHadoopStyleConfiguration.xml").getPath();
-    final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+    final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
     assertTrue(descriptors.isEmpty());
   }
 
+  @Test
+  public void testCMDescriptorParserWithNotEnabledServices() throws Exception {
+    final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
+    final Properties advancedConfiguration = new Properties();
+    advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_PREFIX_ENABLED_SERVICE + "HIVE", "false");
+    cmDescriptorParser.onAdvancedServiceDiscoveryConfigurationChange(advancedConfiguration);
+    final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
+    assertEquals(2, descriptors.size());
+    final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
+    SimpleDescriptor descriptor = descriptorsIterator.next();
+    assertNotNull(descriptor);
+    // topology1 comes with HIVE which is disabled
+    assertTrue(descriptor.getServices().isEmpty());
+  }
+
+  @Test
+  public void testCMDescriptorParserWithEnabledNotListedServiceInTopology1() throws Exception {
+    final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
+    final Properties advancedConfiguration = new Properties();
+    advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_PREFIX_ENABLED_SERVICE + "OOZIE", "true");
+    advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_EXPECTED_TOPOLOGIES, "topology1, topology100");
+    cmDescriptorParser.onAdvancedServiceDiscoveryConfigurationChange(advancedConfiguration);
+    final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
+    final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
+    SimpleDescriptor descriptor = descriptorsIterator.next();
+    assertNotNull(descriptor);
+    // topology1 comes without OOZIE but it's enabled and topology1 is expected -> OOZIE should be added without any url/version/parameter
+    assertService(descriptor, "OOZIE", null, null, null);
+
+    descriptor = descriptorsIterator.next();
+    validateTopology2(descriptor);
+    assertNull(descriptor.getService("OOZIE"));
+  }
+
   private void validateTopology1(SimpleDescriptor descriptor) {
     assertEquals("topology1", descriptor.getName());
     assertEquals("ClouderaManager", descriptor.getDiscoveryType());
@@ -90,7 +134,8 @@ public class ClouderaManagerDescriptorParserTest {
 
     final Map<String, String> expectedServiceParameters = Stream.of(new String[][] { { "httpclient.connectionTimeout", "5m" }, { "httpclient.socketTimeout", "100m" }, })
         .collect(Collectors.toMap(data -> data[0], data -> data[1]));
-    assertService(descriptor, "HDFS", null, Collections.singletonList("http://localhost:456"), expectedServiceParameters);
+    assertService(descriptor, "ATLAS-API", null, Collections.singletonList("http://localhost:456"), expectedServiceParameters);
+    assertService(descriptor, "NIFI", null, null, null);
   }
 
   private void assertApplication(SimpleDescriptor descriptor, String expectedApplicationName, Map<String, String> expectedParams) {
@@ -114,6 +159,8 @@ public class ClouderaManagerDescriptorParserTest {
 
     if (expectedUrls != null) {
       assertTrue(service.getURLs().containsAll(expectedUrls));
+    } else {
+      assertNull(service.getURLs());
     }
 
     if (expectedParams != null) {
diff --git a/gateway-cm-integration/src/test/resources/testDescriptor.xml b/gateway-cm-integration/src/test/resources/testDescriptor.xml
index c853600..a2593df 100644
--- a/gateway-cm-integration/src/test/resources/testDescriptor.xml
+++ b/gateway-cm-integration/src/test/resources/testDescriptor.xml
@@ -39,9 +39,10 @@ limitations under the License.
         discoveryAddress=http://host:456;
         cluster=Cluster 2;
         providerConfigRef=topology2-provider;
-        HDFS:url=http://localhost:456;
-        HDFS:httpclient.connectionTimeout=5m;
-        HDFS:httpclient.socketTimeout=100m
+        ATLAS-API:url=http://localhost:456;
+        ATLAS-API:httpclient.connectionTimeout=5m;
+        ATLAS-API:httpclient.socketTimeout=100m;
+        NIFI
     </value>
   </property>
 </configuration>
\ No newline at end of file
diff --git a/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml b/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
index ffa8c25..a249162 100644
--- a/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
+++ b/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
@@ -40,7 +40,7 @@ limitations under the License.
         cluster=Cluster 2;
         providerConfigRef=topology2-provider;
         HDFS:url=http://localhost:456;
-        HDFS <!-- can not be parsed -->
+        HDFS:noValueParam <!-- can not be parsed -->
     </value>
   </property>
 </configuration>
\ No newline at end of file
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
index 82c8bbb..1e7e727 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
@@ -28,6 +28,7 @@ import org.apache.knox.gateway.audit.api.Auditor;
 import org.apache.knox.gateway.audit.api.ResourceType;
 import org.apache.knox.gateway.audit.log4j.audit.AuditConstants;
 import org.apache.knox.gateway.cm.descriptor.ClouderaManagerDescriptorMonitor;
+import org.apache.knox.gateway.cm.descriptor.ClouderaManagerDescriptorParser;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.config.GatewayConfigurationException;
 import org.apache.knox.gateway.config.impl.GatewayConfigImpl;
@@ -48,6 +49,7 @@ import org.apache.knox.gateway.topology.Application;
 import org.apache.knox.gateway.topology.Topology;
 import org.apache.knox.gateway.topology.TopologyEvent;
 import org.apache.knox.gateway.topology.TopologyListener;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigurationMonitor;
 import org.apache.knox.gateway.trace.AccessHandler;
 import org.apache.knox.gateway.trace.KnoxErrorHandler;
 import org.apache.knox.gateway.trace.TraceHandler;
@@ -622,8 +624,12 @@ public class GatewayServer {
         "org.eclipse.jetty.webapp.JettyWebXmlConfiguration",
         "org.eclipse.jetty.annotations.AnnotationConfiguration" );
 
-    final ClouderaManagerDescriptorMonitor cmDescriptorMonitor = new ClouderaManagerDescriptorMonitor(config);
+    final ClouderaManagerDescriptorParser cmDescriptorParser = new ClouderaManagerDescriptorParser();
+    final ClouderaManagerDescriptorMonitor cmDescriptorMonitor = new ClouderaManagerDescriptorMonitor(config, cmDescriptorParser);
     cmDescriptorMonitor.setupMonitor();
+    final AdvancedServiceDiscoveryConfigurationMonitor advancedServiceDiscoveryConfigurationMonitor = new AdvancedServiceDiscoveryConfigurationMonitor(config);
+    advancedServiceDiscoveryConfigurationMonitor.registerListener(cmDescriptorParser);
+    advancedServiceDiscoveryConfigurationMonitor.registerListener(cmDescriptorMonitor);
 
     // Load the current topologies.
     // Redeploy autodeploy topologies.
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index 6c1ccaa..36ba1ee 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -242,6 +242,8 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
 
   private static final String CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.descriptors.monitor.interval";
   private static final long DEFAULT_CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL = 30000L;
+  private static final String CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.advanced.service.discovery.config.monitor.interval";
+  private static final long DEFAULT_CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL = 30000L;
 
   public GatewayConfigImpl() {
     init();
@@ -1102,4 +1104,9 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public long getClouderaManagerDescriptorsMonitoringInterval() {
     return getLong(CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL, DEFAULT_CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL);
   }
+
+  @Override
+  public long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval() {
+    return getLong(CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL, DEFAULT_CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL);
+  }
 }
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 94e7642..4aa7192 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -647,4 +647,9 @@ public interface GatewayConfig {
    * @return the monitoring interval (in milliseconds) of Cloudera Manager descriptors
    */
   long getClouderaManagerDescriptorsMonitoringInterval();
+
+  /**
+   * @return the monitoring interval (in milliseconds) of Cloudera Manager advanced service discovery configuration
+   */
+  long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval();
 }
diff --git a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 6170550..f7df9a3 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -769,4 +769,9 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
   public long getClouderaManagerDescriptorsMonitoringInterval() {
     return 0;
   }
+
+  @Override
+  public long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval() {
+    return 0;
+  }
 }
diff --git a/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java b/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
index e8ed738..41e7f9b 100644
--- a/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
+++ b/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
@@ -20,6 +20,7 @@ package org.apache.knox.gateway.topology.simple;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -122,6 +123,10 @@ public class SimpleDescriptorImpl implements SimpleDescriptor {
       services.add(service);
     }
 
+    public void setServices(Collection<Service> services) {
+      this.services = new ArrayList<>(services);
+    }
+
     @Override
     public List<Service> getServices() {
         List<Service> result = new ArrayList<>();


Mime
View raw message