metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [metron] mmiklavc commented on a change in pull request #1365: Metron-2050: Automatically populate a list of enrichments from HBase
Date Fri, 05 Apr 2019 22:44:34 GMT
mmiklavc commented on a change in pull request #1365: Metron-2050: Automatically populate a
list of enrichments from HBase
URL: https://github.com/apache/metron/pull/1365#discussion_r272763803
 
 

 ##########
 File path: metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessor.java
 ##########
 @@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.coprocessor;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle collecting a list of enrichment coprocessors.
+ * <p>
+ * Configuration supplied via Metron global config from Zookeeper. Requires one property on startup
+ * - "zookeeperUrl" - which can be provided via HBase shell or hbase-site.xml. The typical installation
+ * mechanism provided by Metron will leverage the HBase shell.
+ * <p>
+ * <b>Note:</b> We need to be careful of our exception handling so as not to inadvertently get our
+ * coprocessor disabled by the RegionServer. From the HBase documentation:
+ * <p>
+ * For all functions, exception handling is done as follows:
+ * <ul>
+ *   <li>Exceptions of type IOException are reported back to client.</li>
+ *   <li>For any other kind of exception:</li>
+ *   <ul>
+ *     <li>If the configuration CoprocessorHost.ABORT_ON_ERROR_KEY is set to true, then the server aborts.</li>
+ *     <li>Otherwise, coprocessor is removed from the server and DoNotRetryIOException is returned to the client.</li>
+ *   </ul>
+ * </ul>
+ *
+ * @see <a href="https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html">https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html</a>
+ * @see EnrichmentConfigurations Available options.
+ */
+public class EnrichmentCoprocessor extends BaseRegionObserver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // pass in via coprocessor config options - via hbase shell or hbase-site.xml
+  // see more here - https://hbase.apache.org/1.1/book.html#load_coprocessor_in_shell
+  public static final String ZOOKEEPER_URL = "zookeeperUrl";
+  public static final String COLUMN_QUALIFIER = "v";
+  private Cache<String, String> cache;
+  private GlobalConfigService globalConfigService;
+  private RegionCoprocessorEnvironment coprocessorEnv;
+  private Map<String, Object> globalConfig;
+
+  /**
+   * HBase requires a no-arg constructor.
+   */
+  public EnrichmentCoprocessor() {
+  }
+
+  /**
+   * Allow test dep injection.
+   */
+  public EnrichmentCoprocessor(CacheWriter<String, String> cacheWriter,
+      GlobalConfigService globalConfigService) {
+    this.cache = Caffeine.newBuilder().writer(cacheWriter).build();
+    this.globalConfigService = globalConfigService;
+  }
+
+  /**
+   * Allow test dep injection.
+   */
+  public EnrichmentCoprocessor(GlobalConfigService globalConfigService) {
+    this.globalConfigService = globalConfigService;
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment ce) throws IOException {
+    LOG.info("Starting enrichment coprocessor");
+    if (ce instanceof RegionCoprocessorEnvironment) {
+      this.coprocessorEnv = (RegionCoprocessorEnvironment) ce;
+    } else {
+      throw new CoprocessorException("Enrichment coprocessor must be loaded on a table region.");
+    }
+    LOG.info("Checking if internal cache initialized");
+    if (null == this.cache) {
+      LOG.info("Cache null, initializing");
+      LOG.info("Getting global config from Zookeeper");
+      String zkUrl = getZookeeperUrl(this.coprocessorEnv.getConfiguration());
+      if (null == globalConfigService) {
+        globalConfigService = getGlobalConfigService(zkUrl);
+      }
+      globalConfig = globalConfigService.get();
+      Configuration config = this.coprocessorEnv.getConfiguration();
+      CacheWriter<String, String> cacheWriter = null;
+      try {
+        String hbaseTableProviderName = (String) globalConfig
+            .get(EnrichmentConfigurations.TABLE_PROVIDER);
+        String tableName = (String) globalConfig.get(EnrichmentConfigurations.TABLE_NAME);
+        String columnFamily = (String) globalConfig.get(EnrichmentConfigurations.COLUMN_FAMILY);
+        cacheWriter = new HBaseCacheWriter(config, TableProvider
+            .create(hbaseTableProviderName, HTableProvider::new), tableName, columnFamily,
+            COLUMN_QUALIFIER);
+      } catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
+        throw new IOException("Unable to instantiate cache writer", e);
+      }
+      this.cache = Caffeine.newBuilder().writer(cacheWriter).build();
+      LOG.info("Finished initializing cache");
+    }
+    LOG.info("Finished starting enrichment coprocessor");
+  }
+
+  private String getZookeeperUrl(Configuration config) {
+    String zkUrl = config.get(ZOOKEEPER_URL);
+    if (null == zkUrl) {
+      throw new IllegalStateException(
+          "Enrichment coprocessor requires property '" + ZOOKEEPER_URL
+              + "' to be provided at startup.");
+    }
+    return zkUrl;
+  }
+
+  private GlobalConfigService getGlobalConfigService(String zkUrl) {
 
 Review comment:
   I had considered that, but I wanted to be careful because, as you mentioned, this is a
one-time load, and there could be adverse side effects unless it is explicitly changed to handle
changes to the configured parameters. The other thing is that the ZKConfigurationsCache abstraction
is tightly coupled to the concept of our topology types, e.g.:
   * ENRICHMENT
   * PARSER
   * INDEXING
   * PROFILER
   
   Any reference to the global config has to be through one of those configuration types.
There is probably some refactoring that could be done there if we wanted, but it felt a bit
ham-fisted to shoehorn that in for what amounts to about 10 fairly concise lines of code.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message