From commits-return-4273-apmail-drill-commits-archive=drill.apache.org@drill.apache.org Tue Oct 18 23:45:23 2016 Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 08A4819F5F for ; Tue, 18 Oct 2016 23:45:23 +0000 (UTC) Received: (qmail 56852 invoked by uid 500); 18 Oct 2016 23:45:22 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 56761 invoked by uid 500); 18 Oct 2016 23:45:22 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 56318 invoked by uid 99); 18 Oct 2016 23:45:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Oct 2016 23:45:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0CCADE0FC4; Tue, 18 Oct 2016 23:45:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: parthc@apache.org To: commits@drill.apache.org Date: Tue, 18 Oct 2016 23:45:29 -0000 Message-Id: <04cfb39c3b5e447aaa1410d6bde18d29@git.apache.org> In-Reply-To: <817bb67ea86343ecbecfd5a59f901225@git.apache.org> References: <817bb67ea86343ecbecfd5a59f901225@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/11] drill git commit: DRILL-4726: Dynamic UDF Support DRILL-4726: Dynamic UDF Support 1) Configuration / parsing / options / protos 2) Zookeeper integration 3) Registration / unregistration / lazy-init 4) Unit tests This closes #574 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/89f2633f Tree: 
http://git-wip-us.apache.org/repos/asf/drill/tree/89f2633f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/89f2633f Branch: refs/heads/master Commit: 89f2633f612a645666de8f51dcb19c6f8044a95e Parents: 8461d10 Author: Arina Ielchiieva Authored: Mon Aug 22 13:29:30 2016 +0000 Committer: Parth Chandra Committed: Tue Oct 18 10:47:52 2016 -0700 ---------------------------------------------------------------------- .../apache/drill/common/config/DrillConfig.java | 9 + .../drill/common/scanner/RunTimeScan.java | 21 + distribution/src/resources/drill-config.sh | 18 + distribution/src/resources/sqlline.bat | 5 + exec/java-exec/src/main/codegen/data/Parser.tdd | 7 +- .../src/main/codegen/includes/parserImpls.ftl | 40 + .../org/apache/drill/exec/ExecConstants.java | 27 +- .../drill/exec/coord/zk/ZkEphemeralStore.java | 17 +- .../drill/exec/coord/zk/ZookeeperClient.java | 101 +- .../exception/FunctionNotFoundException.java | 27 + .../exception/FunctionValidationException.java | 28 + .../exec/exception/JarValidationException.java | 28 + .../exception/VersionMismatchException.java | 33 + .../drill/exec/expr/fn/DrillFuncHolder.java | 28 + .../exec/expr/fn/DrillFunctionRegistry.java | 221 --- .../exec/expr/fn/DrillSimpleFuncHolder.java | 6 +- .../drill/exec/expr/fn/FunctionConverter.java | 4 +- .../expr/fn/FunctionImplementationRegistry.java | 342 ++++- .../drill/exec/expr/fn/FunctionInitializer.java | 21 +- .../exec/expr/fn/registry/FunctionHolder.java | 54 + .../fn/registry/FunctionRegistryHolder.java | 377 +++++ .../drill/exec/expr/fn/registry/JarScan.java | 53 + .../expr/fn/registry/LocalFunctionRegistry.java | 329 ++++ .../fn/registry/RemoteFunctionRegistry.java | 269 ++++ .../org/apache/drill/exec/ops/QueryContext.java | 5 + .../exec/planner/sql/DrillOperatorTable.java | 24 +- .../drill/exec/planner/sql/DrillSqlWorker.java | 27 +- .../drill/exec/planner/sql/SqlConverter.java | 9 + .../sql/handlers/CreateFunctionHandler.java | 328 ++++ 
.../sql/handlers/DropFunctionHandler.java | 167 ++ .../sql/parser/CompoundIdentifierConverter.java | 2 + .../planner/sql/parser/SqlCreateFunction.java | 79 + .../planner/sql/parser/SqlDropFunction.java | 79 + .../rpc/user/InboundImpersonationManager.java | 6 +- .../org/apache/drill/exec/server/Drillbit.java | 1 + .../drill/exec/server/DrillbitContext.java | 5 + .../exec/server/options/OptionValidator.java | 14 + .../server/options/SystemOptionManager.java | 3 +- .../exec/server/options/TypeValidators.java | 51 +- .../exec/store/sys/BasePersistentStore.java | 18 +- .../drill/exec/store/sys/PersistentStore.java | 21 + .../exec/store/sys/store/DataChangeVersion.java | 32 + .../sys/store/ZookeeperPersistentStore.java | 36 +- .../exec/testing/store/NoWriteLocalStore.java | 61 +- .../org/apache/drill/exec/util/JarUtil.java | 33 + .../src/main/resources/drill-module.conf | 29 + .../java/org/apache/drill/BaseTestQuery.java | 14 - .../org/apache/drill/TestDynamicUDFSupport.java | 801 ++++++++++ .../java/org/apache/drill/exec/ExecTest.java | 22 +- .../exec/coord/zk/TestZookeeperClient.java | 49 +- .../fn/registry/FunctionRegistryHolderTest.java | 279 ++++ .../exec/physical/impl/TestSimpleFunctions.java | 20 +- .../resources/jars/DrillUDF-1.0-sources.jar | Bin 0 -> 1892 bytes .../src/test/resources/jars/DrillUDF-1.0.jar | Bin 0 -> 3146 bytes .../resources/jars/DrillUDF-2.0-sources.jar | Bin 0 -> 1891 bytes .../src/test/resources/jars/DrillUDF-2.0.jar | Bin 0 -> 3142 bytes .../jars/DrillUDF_Copy-1.0-sources.jar | Bin 0 -> 1892 bytes .../test/resources/jars/DrillUDF_Copy-1.0.jar | Bin 0 -> 3185 bytes .../jars/DrillUDF_DupFunc-1.0-sources.jar | Bin 0 -> 1888 bytes .../resources/jars/DrillUDF_DupFunc-1.0.jar | Bin 0 -> 3201 bytes .../jars/DrillUDF_Empty-1.0-sources.jar | Bin 0 -> 536 bytes .../test/resources/jars/DrillUDF_Empty-1.0.jar | Bin 0 -> 1863 bytes .../jars/DrillUDF_NoMarkerFile-1.0-sources.jar | Bin 0 -> 1715 bytes .../jars/DrillUDF_NoMarkerFile-1.0.jar | Bin 0 -> 3084 
bytes .../resources/jars/v2/DrillUDF-1.0-sources.jar | Bin 0 -> 1899 bytes .../src/test/resources/jars/v2/DrillUDF-1.0.jar | Bin 0 -> 3215 bytes .../org/apache/drill/jdbc/ITTestShadedJar.java | 19 + .../drill/exec/proto/SchemaUserBitShared.java | 231 +++ .../apache/drill/exec/proto/UserBitShared.java | 1439 +++++++++++++++++- .../org/apache/drill/exec/proto/beans/Jar.java | 195 +++ .../apache/drill/exec/proto/beans/Registry.java | 175 +++ protocol/src/main/protobuf/UserBitShared.proto | 20 + 72 files changed, 5986 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/config/DrillConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java index 43d05c3..6828718 100644 --- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java +++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java @@ -164,6 +164,15 @@ public class DrillConfig extends NestedConfig { } /** + * Creates a drill configuration using the provided config file. 
+ * @param config custom configuration file + * @return {@link DrillConfig} instance + */ + public static DrillConfig create(Config config) { + return new DrillConfig(config.resolve(), true); + } + + /** * @param overrideFileResourcePathname * see {@link #create(String)}'s {@code overrideFileResourcePathname} * @param overriderProps http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java index 1d95b04..7faa0fb 100644 --- a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java +++ b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java @@ -20,7 +20,9 @@ package org.apache.drill.common.scanner; import java.net.URL; import java.util.Collection; import java.util.List; +import java.util.Set; +import com.google.common.collect.Lists; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.persistence.ScanResult; @@ -75,4 +77,23 @@ public class RunTimeScan { } } + /** + * Scans packages retrieved from config. + * Returns scan result with list of packages, classes and annotations found. + * Is used to scan specific jars not associated with classpath at runtime. 
+ * + * @param config to retrieve the packages to scan + * @param markedPath list of paths where to scan + * @return the scan result with list of packages, classes and annotations found + */ + public static ScanResult dynamicPackageScan(DrillConfig config, Set markedPath) { + List packagePrefixes = ClassPathScanner.getPackagePrefixes(config); + return ClassPathScanner.scan( + markedPath, + packagePrefixes, + Lists.newArrayList(), + PRESCANNED.getScannedAnnotations(), + ClassPathScanner.emptyResult()); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/drill-config.sh ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh index 737be36..7a72e27 100644 --- a/distribution/src/resources/drill-config.sh +++ b/distribution/src/resources/drill-config.sh @@ -324,6 +324,22 @@ if [ -n "$DRILL_CLASSPATH" ]; then CP="$CP:$DRILL_CLASSPATH" fi +# Drill temporary directory is used as base for temporary storage of Dynamic UDF jars. +# If tmp dir is given, it must exist. +if [ -n "$DRILL_TMP_DIR" ]; then + if [[ ! -d "$DRILL_TMP_DIR" ]]; then + fatal_error "Temporary dir does not exist:" $DRILL_TMP_DIR + fi +else + # Otherwise, use the default + DRILL_TMP_DIR="/tmp" +fi + +mkdir -p "$DRILL_TMP_DIR" +if [[ ! -d "$DRILL_TMP_DIR" || ! 
-w "$DRILL_TMP_DIR" ]]; then + fatal_error "Temporary directory does not exist or is not writable: $DRILL_TMP_DIR" +fi + # Test for cygwin is_cygwin=false case "`uname`" in @@ -371,6 +387,7 @@ if $is_cygwin; then DRILL_HOME=`cygpath -w "$DRILL_HOME"` DRILL_CONF_DIR=`cygpath -w "$DRILL_CONF_DIR"` DRILL_LOG_DIR=`cygpath -w "$DRILL_LOG_DIR"` + DRILL_TMP_DIR=`cygpath -w "$DRILL_TMP_DIR"` CP=`cygpath -w -p "$CP"` if [ -z "$HADOOP_HOME" ]; then export HADOOP_HOME=${DRILL_HOME}/winutils @@ -391,6 +408,7 @@ export is_cygwin export DRILL_HOME export DRILL_CONF_DIR export DRILL_LOG_DIR +export DRILL_TMP_DIR export CP # DRILL-4870: Don't export JAVA_HOME. Java can find it just fine from the java # command. If we attempt to work it out, we do so incorrectly for the Mac. http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/sqlline.bat ---------------------------------------------------------------------- diff --git a/distribution/src/resources/sqlline.bat b/distribution/src/resources/sqlline.bat index a0efdf1..f008604 100755 --- a/distribution/src/resources/sqlline.bat +++ b/distribution/src/resources/sqlline.bat @@ -114,6 +114,11 @@ if "test%DRILL_LOG_DIR%" == "test" ( set DRILL_LOG_DIR=%DRILL_HOME%\log ) +@rem Drill temporary directory is used as base for temporary storage of Dynamic UDF jars. 
+if "test%DRILL_TMP_DIR%" == "test" ( + set DRILL_TMP_DIR=%TEMP% +) + rem ---- rem Deal with Hadoop JARs, if HADOOP_HOME was specified rem ---- http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/data/Parser.tdd ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd index ce3ee4c..6c23808 100644 --- a/exec/java-exec/src/main/codegen/data/Parser.tdd +++ b/exec/java-exec/src/main/codegen/data/Parser.tdd @@ -38,7 +38,8 @@ "REFRESH", "METADATA", "DATABASE", - "IF" + "IF", + "JAR" ] # List of methods for parsing custom SQL statements. @@ -53,7 +54,9 @@ "SqlShowFiles()", "SqlCreateTable()", "SqlDropTable()", - "SqlRefreshMetadata()" + "SqlRefreshMetadata()", + "SqlCreateFunction()", + "SqlDropFunction()" ] # List of methods for parsing custom literals. http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl index 9901098..0017446 100644 --- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl +++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl @@ -297,4 +297,44 @@ SqlNode SqlDescribeSchema() : { return new SqlDescribeSchema(pos, schema); } +} + +/** +* Parse create UDF statement +* CREATE FUNCTION USING JAR 'jar_name' +*/ +SqlNode SqlCreateFunction() : +{ + SqlParserPos pos; + SqlNode jar; +} +{ + { pos = getPos(); } + + + + jar = StringLiteral() + { + return new SqlCreateFunction(pos, jar); + } +} + +/** +* Parse drop UDF statement +* DROP FUNCTION USING JAR 'jar_name' +*/ +SqlNode SqlDropFunction() : +{ + SqlParserPos pos; + SqlNode jar; +} +{ + { pos = getPos(); } + + + + jar = StringLiteral() + { + return new SqlDropFunction(pos, 
jar); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index d6a210a..0f2321b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -20,7 +20,6 @@ package org.apache.drill.exec; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.rpc.user.InboundImpersonationManager; import org.apache.drill.exec.server.options.OptionValidator; -import org.apache.drill.exec.server.options.TypeValidators.AdminOptionValidator; import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator; import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator; @@ -106,10 +105,23 @@ public interface ExecConstants { String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS = "drill.exec.debug.return_error_for_failure_in_cancelled_fragments"; + String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types"; + /** + * Configuration properties connected with dynamic UDFs support + */ + String UDF_RETRY_ATTEMPTS = "drill.exec.udf.retry-attempts"; + String UDF_DIRECTORY_FS = "drill.exec.udf.directory.fs"; + String UDF_DIRECTORY_ROOT = "drill.exec.udf.directory.root"; + String UDF_DIRECTORY_LOCAL = "drill.exec.udf.directory.local"; + String UDF_DIRECTORY_STAGING = "drill.exec.udf.directory.staging"; + String UDF_DIRECTORY_REGISTRY = "drill.exec.udf.directory.registry"; + String UDF_DIRECTORY_TMP = "drill.exec.udf.directory.tmp"; - - String CLIENT_SUPPORT_COMPLEX_TYPES = 
"drill.client.supports-complex-types"; + /** + * Local temporary directory is used as base for temporary storage of Dynamic UDF jars. + */ + String DRILL_TMP_DIR = "drill.tmp-dir"; String OUTPUT_FORMAT_OPTION = "store.format"; OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); @@ -296,15 +308,13 @@ public interface ExecConstants { * such as changing system options. */ String ADMIN_USERS_KEY = "security.admin.users"; - StringValidator ADMIN_USERS_VALIDATOR = - new AdminOptionValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName()); + StringValidator ADMIN_USERS_VALIDATOR = new StringValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName(), true); /** * Option whose value is a comma separated list of admin usergroups. */ String ADMIN_USER_GROUPS_KEY = "security.admin.user_groups"; - StringValidator ADMIN_USER_GROUPS_VALIDATOR = new AdminOptionValidator(ADMIN_USER_GROUPS_KEY, ""); - + StringValidator ADMIN_USER_GROUPS_VALIDATOR = new StringValidator(ADMIN_USER_GROUPS_KEY, "", true); /** * Option whose value is a string representing list of inbound impersonation policies. 
* @@ -337,4 +347,7 @@ public interface ExecConstants { String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms"; OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR = new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000); + + String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; + BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java index 94e03ad..f485e9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java @@ -88,16 +88,17 @@ public class ZkEphemeralStore extends BaseTransientStore { @Override public V putIfAbsent(final String key, final V value) { - final V old = get(key); - if (old == null) { - try { - final byte[] bytes = config.getSerializer().serialize(value); - getClient().put(key, bytes); - } catch (final IOException e) { - throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); + try { + final InstanceSerializer serializer = config.getSerializer(); + final byte[] bytes = serializer.serialize(value); + final byte[] data = getClient().putIfAbsent(key, bytes); + if (data == null) { + return null; } + return serializer.deserialize(data); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); } - return old; } @Override 
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java index 2debf43..610a2b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java @@ -31,8 +31,12 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.data.Stat; /** * A namespace aware Zookeeper client. @@ -133,13 +137,52 @@ public class ZookeeperClient implements AutoCloseable { * the check is eventually consistent. * * @param path target path + * @param consistent consistency flag */ public byte[] get(final String path, final boolean consistent) { + return get(path, consistent, null); + } + + /** + * Returns the value corresponding to the given key, null otherwise. + * + * The check is consistent as it is made against Zookeeper directly. + * + * Passes version holder to get data change version. + * + * @param path target path + * @param version version holder + */ + public byte[] get(final String path, DataChangeVersion version) { + return get(path, true, version); + } + + /** + * Returns the value corresponding to the given key, null otherwise. 
+ * + * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise, + * the check is eventually consistent. + * + * If consistency flag is set to true and version holder is not null, passes version holder to get data change version. + * The data change version is retrieved from the {@link Stat} object; it increases each time the znode data is changed. + * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes + * + * @param path target path + * @param consistent consistency check + * @param version version holder + */ + public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) { Preconditions.checkNotNull(path, "path is required"); final String target = PathUtils.join(root, path); if (consistent) { try { + if (version != null) { + Stat stat = new Stat(); + final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target); + version.setVersion(stat.getVersion()); + return bytes; + } return curator.getData().forPath(target); } catch (final Exception ex) { throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex); @@ -179,6 +222,26 @@ public class ZookeeperClient implements AutoCloseable { * @param data data to store */ public void put(final String path, final byte[] data) { + put(path, data, null); + } + + /** + * Puts the given byte sequence into the given path. + * + * If the path does not exist, this call creates it. + * + * If version holder is not null and path already exists, passes given version for comparison. + * Zookeeper maintains a stat structure that holds a version number which increases each time znode data change is performed. + * If we pass a version that doesn't match the actual version of the data, + * the update will fail with {@link org.apache.zookeeper.KeeperException.BadVersionException}. + * We catch such an exception and re-throw it as {@link VersionMismatchException}.
+ * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes + * + * @param path target path + * @param data data to store + * @param version version holder + */ + public void put(final String path, final byte[] data, DataChangeVersion version) { Preconditions.checkNotNull(path, "path is required"); Preconditions.checkNotNull(data, "data is required"); @@ -199,9 +262,45 @@ public class ZookeeperClient implements AutoCloseable { } } if (hasNode) { - curator.setData().forPath(target, data); + if (version != null) { + try { + curator.setData().withVersion(version.getVersion()).forPath(target, data); + } catch (final KeeperException.BadVersionException e) { + throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e); + } + } else { + curator.setData().forPath(target, data); + } } getCache().rebuildNode(target); + } catch (final VersionMismatchException e) { + throw e; + } catch (final Exception e) { + throw new DrillRuntimeException("unable to put ", e); + } + } + + /** + * Puts the given byte sequence into the given path if the path does not exist.
+ * + * @param path target path + * @param data data to store + * @return null if path was created, else data stored for the given path + */ + public byte[] putIfAbsent(final String path, final byte[] data) { + Preconditions.checkNotNull(path, "path is required"); + Preconditions.checkNotNull(data, "data is required"); + + final String target = PathUtils.join(root, path); + try { + try { + curator.create().withMode(mode).forPath(target, data); + getCache().rebuildNode(target); + return null; + } catch (NodeExistsException e) { + // do nothing + } + return curator.getData().forPath(target); } catch (final Exception e) { throw new DrillRuntimeException("unable to put ", e); } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java new file mode 100644 index 0000000..0d59cc8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class FunctionNotFoundException extends DrillRuntimeException { + + public FunctionNotFoundException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java new file mode 100644 index 0000000..7475e24 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class FunctionValidationException extends DrillRuntimeException { + + public FunctionValidationException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java new file mode 100644 index 0000000..a6fa407 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class JarValidationException extends DrillRuntimeException { + + public JarValidationException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java new file mode 100644 index 0000000..796f410 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class VersionMismatchException extends DrillRuntimeException { + + public VersionMismatchException(String message, int expectedVersion, Throwable cause) { + super(message + ". Expected version : " + expectedVersion, cause); + } + + public VersionMismatchException(String message, int expectedVersion) { + super(message + ". Expected version : " + expectedVersion); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java index 869a4ac..fc51d03 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java @@ -132,6 +132,34 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder { return attributes.isDeterministic(); } + /** + * Generates string representation of function input parameters: + * PARAMETER_TYPE_1-PARAMETER_MODE_1,PARAMETER_TYPE_2-PARAMETER_MODE_2 + * Example: VARCHAR-REQUIRED,VARCHAR-OPTIONAL + * Returns empty string if function has no input parameters. 
+ * + * @return string representation of function input parameters + */ + public String getInputParameters() { + StringBuilder builder = new StringBuilder(); + for (ValueReference ref : parameters) { + final MajorType type = ref.getType(); + // separate entries with a comma without emitting a leading one + if (builder.length() > 0) { + builder.append(","); + } + builder.append(type.getMinorType().toString()); + builder.append("-"); + builder.append(type.getMode().toString()); + } + return builder.toString(); + } + + /** + * @return class loader used to load the function, as held by its initializer + */ + public ClassLoader getClassLoader() { + return initializer.getClassLoader(); + } + protected JVar[] declareWorkspaceVariables(ClassGenerator g) { JVar[] workspaceJVars = new JVar[workspaceVars.length]; for (int i = 0; i < workspaceVars.length; i++) { http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java deleted file mode 100644 index f58d5a5..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.expr.fn; - -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.calcite.sql.SqlOperator; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor; -import org.apache.drill.common.scanner.persistence.ScanResult; -import org.apache.drill.exec.planner.logical.DrillConstExecutor; -import org.apache.drill.exec.planner.sql.DrillOperatorTable; -import org.apache.drill.exec.planner.sql.DrillSqlAggOperator; -import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference; -import org.apache.drill.exec.planner.sql.DrillSqlOperator; - -import com.google.common.collect.ArrayListMultimap; -import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference; - -/** - * Registry of Drill functions. - */ -public class DrillFunctionRegistry { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFunctionRegistry.class); - - // key: function name (lowercase) value: list of functions with that name - private final ArrayListMultimap registeredFunctions = ArrayListMultimap.create(); - - private static final ImmutableMap> registeredFuncNameToArgRange = ImmutableMap.> builder() - // CONCAT is allowed to take [1, infinity) number of arguments.
- // Currently, this flexibility is offered by DrillOptiq to rewrite it as - // a nested structure - .put("CONCAT", Pair.of(1, Integer.MAX_VALUE)) - - // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as - // another function based on the second argument (encodingType) - .put("LENGTH", Pair.of(1, 2)) - - // Dummy functions - .put("CONVERT_TO", Pair.of(2, 2)) - .put("CONVERT_FROM", Pair.of(2, 2)) - .put("FLATTEN", Pair.of(1, 1)).build(); - - public DrillFunctionRegistry(ScanResult classpathScan) { - FunctionConverter converter = new FunctionConverter(); - List providerClasses = classpathScan.getAnnotatedClasses(); - - // Hash map to prevent registering functions with exactly matching signatures - // key: Function Name + Input's Major Type - // value: Class name where function is implemented - // - final Map functionSignatureMap = new HashMap<>(); - for (AnnotatedClassDescriptor func : providerClasses) { - DrillFuncHolder holder = converter.getHolder(func); - if (holder != null) { - // register handle for each name the function can be referred to - String[] names = holder.getRegisteredNames(); - - // Create the string for input types - String functionInput = ""; - for (DrillFuncHolder.ValueReference ref : holder.parameters) { - functionInput += ref.getType().toString(); - } - for (String name : names) { - String functionName = name.toLowerCase(); - registeredFunctions.put(functionName, holder); - String functionSignature = functionName + functionInput; - String existingImplementation; - if ((existingImplementation = functionSignatureMap.get(functionSignature)) != null) { - throw new AssertionError( - String.format( - "Conflicting functions with similar signature found.
Func Name: %s, Class name: %s " + - " Class name: %s", functionName, func.getClassName(), existingImplementation)); - } else if (holder.isAggregating() && !holder.isDeterministic() ) { - logger.warn("Aggregate functions must be deterministic, did not register function {}", func.getClassName()); - } else { - functionSignatureMap.put(functionSignature, func.getClassName()); - } - } - } else { - logger.warn("Unable to initialize function for class {}", func.getClassName()); - } - } - if (logger.isTraceEnabled()) { - StringBuilder allFunctions = new StringBuilder(); - for (DrillFuncHolder method: registeredFunctions.values()) { - allFunctions.append(method.toString()).append("\n"); - } - logger.trace("Registered functions: [\n{}]", allFunctions); - } - } - - public int size(){ - return registeredFunctions.size(); - } - - /** Returns functions with given name. Function name is case insensitive. */ - public List getMethods(String name) { - return this.registeredFunctions.get(name.toLowerCase()); - } - - public void register(DrillOperatorTable operatorTable) { - registerOperatorsWithInference(operatorTable); - registerOperatorsWithoutInference(operatorTable); - } - - private void registerOperatorsWithInference(DrillOperatorTable operatorTable) { - final Map map = Maps.newHashMap(); - final Map mapAgg = Maps.newHashMap(); - for (Entry> function : registeredFunctions.asMap().entrySet()) { - final ArrayListMultimap, DrillFuncHolder> functions = ArrayListMultimap.create(); - final ArrayListMultimap aggregateFunctions = ArrayListMultimap.create(); - final String name = function.getKey().toUpperCase(); - boolean isDeterministic = true; - for (DrillFuncHolder func : function.getValue()) { - final int paramCount = func.getParamCount(); - if(func.isAggregating()) { - aggregateFunctions.put(paramCount, func); - } else { - final Pair argNumberRange; - if(registeredFuncNameToArgRange.containsKey(name)) { - argNumberRange = registeredFuncNameToArgRange.get(name); - } else { -
argNumberRange = Pair.of(func.getParamCount(), func.getParamCount()); - } - functions.put(argNumberRange, func); - } - - if(!func.isDeterministic()) { - isDeterministic = false; - } - } - for (Entry, Collection> entry : functions.asMap().entrySet()) { - final Pair range = entry.getKey(); - final int max = range.getRight(); - final int min = range.getLeft(); - if(!map.containsKey(name)) { - map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder() - .setName(name)); - } - - final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name); - drillSqlOperatorBuilder - .addFunctions(entry.getValue()) - .setArgumentCount(min, max) - .setDeterministic(isDeterministic); - } - for (Entry> entry : aggregateFunctions.asMap().entrySet()) { - if(!mapAgg.containsKey(name)) { - mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name)); - } - - final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name); - drillSqlAggOperatorBuilder - .addFunctions(entry.getValue()) - .setArgumentCount(entry.getKey(), entry.getKey()); - } - } - - for(final Entry entry : map.entrySet()) { - operatorTable.addOperatorWithInference( - entry.getKey(), - entry.getValue().build()); - } - - for(final Entry entry : mapAgg.entrySet()) { - operatorTable.addOperatorWithInference( - entry.getKey(), - entry.getValue().build()); - } - } - - private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable) { - SqlOperator op; - for (Entry> function : registeredFunctions.asMap().entrySet()) { - Set argCounts = Sets.newHashSet(); - String name = function.getKey().toUpperCase(); - for (DrillFuncHolder func : function.getValue()) { - if (argCounts.add(func.getParamCount())) { - if (func.isAggregating()) { - op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount()); - } else { - boolean isDeterministic; - // prevent Drill from folding constant functions with types that cannot be materialized - //
into literals - if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) { - isDeterministic = false; - } else { - isDeterministic = func.isDeterministic(); - } - op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic); - } - operatorTable.addOperatorWithoutInference(function.getKey(), op); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java index 78e4c62..655f571 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java @@ -40,10 +40,14 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSimpleFuncHolder.class); private final String drillFuncClass; + // each function should be wrapped in a unique class loader associated with its jar + // to prevent classpath collisions during loading and unloading jars + private final ClassLoader classLoader; public DrillSimpleFuncHolder(FunctionAttributes functionAttributes, FunctionInitializer initializer) { super(functionAttributes, initializer); drillFuncClass = checkNotNull(initializer.getClassName()); + classLoader = checkNotNull(initializer.getClassLoader()); } private String setupBody() { @@ -65,7 +69,7 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder { } public DrillSimpleFunc createInterpreter() throws Exception { - return (DrillSimpleFunc)Class.forName(drillFuncClass).newInstance(); + return (DrillSimpleFunc)Class.forName(drillFuncClass, true,
classLoader).newInstance(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java index 00be7aa..2f606e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java @@ -50,7 +50,7 @@ import com.google.common.collect.Lists; public class FunctionConverter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionConverter.class); - public DrillFuncHolder getHolder(AnnotatedClassDescriptor func) { + // classLoader is passed on to the FunctionInitializer built below, which loads the function class with it + public DrillFuncHolder getHolder(AnnotatedClassDescriptor func, ClassLoader classLoader) { FunctionTemplate template = func.getAnnotationProxy(FunctionTemplate.class); if (template == null) { return failure("Class does not declare FunctionTemplate annotation.", func); @@ -173,7 +173,7 @@ public class FunctionConverter { return failure("This function declares zero output fields.
A function must declare one output field.", func); } - FunctionInitializer initializer = new FunctionInitializer(func.getClassName()); + FunctionInitializer initializer = new FunctionInitializer(func.getClassName(), classLoader); try{ // return holder ValueReference[] ps = params.toArray(new ValueReference[params.size()]); http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index 5d26325..ede255a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -17,38 +17,69 @@ */ package org.apache.drill.exec.expr.fn; +import java.io.File; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.net.JarURLConnection; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLConnection; +import java.util.Enumeration; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import com.typesafe.config.ConfigFactory; +import org.apache.commons.io.FileUtils; +import org.apache.drill.common.config.CommonConstants; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.fn.CastFunctions; import
org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.common.scanner.RunTimeScan; import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.coord.store.TransientStoreListener; +import org.apache.drill.exec.exception.FunctionValidationException; +import org.apache.drill.exec.exception.JarValidationException; +import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry; +import org.apache.drill.exec.expr.fn.registry.JarScan; +import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; import org.apache.drill.exec.planner.sql.DrillOperatorTable; +import org.apache.drill.exec.proto.UserBitShared.Jar; import org.apache.drill.exec.resolver.FunctionResolver; import org.apache.drill.exec.server.options.OptionManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import org.apache.drill.exec.util.JarUtil; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * This class offers the registry for functions. 
Notably, in addition to Drill its functions - * (in {@link DrillFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry}) + * (in {@link LocalFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry}) * is also registered in this class */ -public class FunctionImplementationRegistry implements FunctionLookupContext { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class); +public class FunctionImplementationRegistry implements FunctionLookupContext, AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class); - private DrillFunctionRegistry drillFuncRegistry; + private final LocalFunctionRegistry localFunctionRegistry; + private final RemoteFunctionRegistry remoteFunctionRegistry; + // directory that receives local copies of remote UDF jars (binary and source) + private final Path localUdfDir; + // true only when getTmpDir() generated the temporary directory itself; close() then deletes it + private boolean deleteTmpDir = false; + private File tmpDir; private List pluggableFuncRegistries = Lists.newArrayList(); private OptionManager optionManager = null; @@ -61,7 +92,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { Stopwatch w = Stopwatch.createStarted(); logger.debug("Generating function registry."); - drillFuncRegistry = new DrillFunctionRegistry(classpathScan); + localFunctionRegistry = new LocalFunctionRegistry(classpathScan); Set> registryClasses = classpathScan.getImplementations(PluggableFunctionRegistry.class); @@ -85,7 +116,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { break; } } - logger.info("Function registry loaded. {} functions loaded in {} ms.", drillFuncRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS)); + logger.info("Function registry loaded.
{} functions loaded in {} ms.", localFunctionRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS)); + // the listener unregisters a jar's functions and deletes its local copies when the jar is unregistered + this.remoteFunctionRegistry = new RemoteFunctionRegistry(new UnregistrationListener()); + this.localUdfDir = getLocalUdfDir(config); } public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan, OptionManager optionManager) { @@ -99,7 +132,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { */ public void register(DrillOperatorTable operatorTable) { // Register Drill functions first and move to pluggable function registries. - drillFuncRegistry.register(operatorTable); + localFunctionRegistry.register(operatorTable); for(PluggableFunctionRegistry registry : pluggableFuncRegistries) { registry.register(operatorTable); @@ -109,14 +142,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { /** * Using the given functionResolver find Drill function implementation for given * functionCall - * - * @param functionResolver - * @param functionCall - * @return + * If no implementation is found and Dynamic UDF Support is enabled, + * loads all missing remote functions and tries to find the implementation one more time.
+ */ @Override public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) { - return functionResolver.getBestMatch(drillFuncRegistry.getMethods(functionReplacement(functionCall)), functionCall); + return findDrillFunction(functionResolver, functionCall, true); + } + + /** + * @param retry if true and Dynamic UDF Support is enabled, loads missing remote functions + * and searches once more when no implementation is found + */ + private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) { + AtomicLong version = new AtomicLong(); + DrillFuncHolder holder = functionResolver.getBestMatch( + localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall); + if (holder == null && retry) { + if (optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + if (loadRemoteFunctions(version.get())) { + // return the second lookup's result; discarding it would hide the newly loaded functions + return findDrillFunction(functionResolver, functionCall, false); + } + } + } + return holder; } // Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name @@ -138,18 +183,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { /** * Find the Drill function implementation that matches the name, arg types and return type. - * @param name - * @param argTypes - * @param returnType - * @return + * If no exact implementation is found and Dynamic UDF Support is enabled, + * loads all missing remote functions and tries to find the implementation one more time.
+ */ public DrillFuncHolder findExactMatchingDrillFunction(String name, List argTypes, MajorType returnType) { - for (DrillFuncHolder h : drillFuncRegistry.getMethods(name)) { + return findExactMatchingDrillFunction(name, argTypes, returnType, true); + } + + /** + * @param retry if true and Dynamic UDF Support is enabled, loads missing remote functions + * and searches once more when no exact match is found + */ + private DrillFuncHolder findExactMatchingDrillFunction(String name, List argTypes, MajorType returnType, boolean retry) { + AtomicLong version = new AtomicLong(); + for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) { if (h.matches(returnType, argTypes)) { return h; } } + if (retry && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + if (loadRemoteFunctions(version.get())) { + // return the second lookup's result; discarding it would hide the newly loaded functions + return findExactMatchingDrillFunction(name, argTypes, returnType, false); + } + } return null; } @@ -177,7 +230,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { // Method to find if the output type of a drill function if of complex type public boolean isFunctionComplexOutput(String name) { - List methods = drillFuncRegistry.getMethods(name); + List methods = localFunctionRegistry.getMethods(name); for (DrillFuncHolder holder : methods) { if (holder.getReturnValue().isComplexWriter()) { return true; @@ -186,4 +239,257 @@ public class FunctionImplementationRegistry implements FunctionLookupContext { return false; } + /** + * @return remote function registry consulted to find jars missing from the local registry + */ + public RemoteFunctionRegistry getRemoteFunctionRegistry() { + return remoteFunctionRegistry; + } + + /** + * Using given local path to jar creates unique class loader for this jar. + * Class loader is closed to release opened connection to jar when validation is finished. + * Scans jar content to receive list of all scanned classes + * and starts validation process against local function registry. + * Checks if received list of validated functions is not empty.
+ * + * @param path local path to jar we need to validate + * @return list of validated function signatures + * @throws IOException if the jar URL cannot be built or reading the jar fails + * @throws FunctionValidationException if the jar does not contain any functions + * @throws JarValidationException if the jar does not contain the Drill marker file + */ + public List validate(Path path) throws IOException { + URL url = path.toUri().toURL(); + URL[] urls = {url}; + // try-with-resources closes the class loader, releasing the connection to the jar + try (URLClassLoader classLoader = new URLClassLoader(urls)) { + ScanResult jarScanResult = scan(classLoader, path, urls); + List functions = localFunctionRegistry.validate(path.getName(), jarScanResult); + if (functions.isEmpty()) { + throw new FunctionValidationException(String.format("Jar %s does not contain functions", path.getName())); + } + return functions; + } + } + + /** + * Attempts to load and register functions from remote function registry. + * First checks whether there are any missing jars. + * If there are, enters a synchronized block to prevent others from loading the same jars. + * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock). + * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar. + * Jar registration timestamp represented in milliseconds is used as suffix. + * NOTE(review): no timestamp suffix is applied in this method itself; confirm where it is added. + * Then registers all jars at the same time. Returns true when finished. + * In case of any errors during jar copying or registration, logs errors and proceeds.
+ * + * @param version local function registry version + * @return true if new jars were registered or local function registry version is different, false otherwise + */ + public boolean loadRemoteFunctions(long version) { + List missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); + if (!missingJars.isEmpty()) { + synchronized (this) { + // re-check under the lock: another thread may have loaded these jars meanwhile + missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); + List jars = Lists.newArrayList(); + for (String jarName : missingJars) { + Path binary = null; + Path source = null; + URLClassLoader classLoader = null; + try { + binary = copyJarToLocal(jarName, remoteFunctionRegistry); + source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry); + URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()}; + classLoader = new URLClassLoader(urls); + ScanResult scanResult = scan(classLoader, binary, urls); + localFunctionRegistry.validate(jarName, scanResult); + jars.add(new JarScan(jarName, scanResult, classLoader)); + } catch (Exception e) { + deleteQuietlyLocalJar(binary); + deleteQuietlyLocalJar(source); + if (classLoader != null) { + try { + classLoader.close(); + } catch (Exception ex) { + // report the close failure itself; the original load failure (e) is logged below + logger.warn("Problem during closing class loader for {}", jarName, ex); + } + } + logger.error("Problem during remote functions load from {}", jarName, e); + } + } + if (!jars.isEmpty()) { + localFunctionRegistry.register(jars); + return true; + } + } + } + return version != localFunctionRegistry.getVersion(); + } + + /** + * First finds path to marker file url, otherwise throws {@link JarValidationException}. + * Then scans jar classes according to list indicated in marker files. + * Additional logic is added to close the jar file behind the marker file {@link URL} after {@link ConfigFactory#parseURL(URL)}. + * This is extremely important for Windows users, where the system does not allow deleting a file while it is in use.
+ * + * @param classLoader unique class loader for jar + * @param path local path to jar + * @param urls urls associated with the jar (ex: binary and source) + * @return scan result of packages, classes, annotations found in jar + * @throws IOException if marker file resources cannot be enumerated or the jar cannot be opened or closed + * @throws JarValidationException if no marker file belonging to the jar is found + */ + private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException { + Enumeration markerFileEnumeration = classLoader.getResources( + CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); + while (markerFileEnumeration.hasMoreElements()) { + URL markerFile = markerFileEnumeration.nextElement(); + if (markerFile.getPath().contains(path.toUri().getPath())) { + URLConnection markerFileConnection = null; + try { + markerFileConnection = markerFile.openConnection(); + DrillConfig drillConfig = DrillConfig.create(ConfigFactory.parseURL(markerFile)); + return RunTimeScan.dynamicPackageScan(drillConfig, Sets.newHashSet(urls)); + } finally { + if (markerFileConnection instanceof JarURLConnection) { + // close the jar file via the connection already obtained instead of opening a second connection + ((JarURLConnection) markerFileConnection).getJarFile().close(); + } + } + } + } + throw new JarValidationException(String.format("Marker file %s is missing in %s", + CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName())); + } + + /** + * Returns list of jars that are missing in local function registry + * but present in remote function registry.
+ * + * @param remoteFunctionRegistry remote function registry + * @param localFunctionRegistry local function registry + * @return list of missing jars + */ + private List getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry, + LocalFunctionRegistry localFunctionRegistry) { + List remoteJars = remoteFunctionRegistry.getRegistry().getJarList(); + List localJars = localFunctionRegistry.getAllJarNames(); + List missingJars = Lists.newArrayList(); + for (Jar jar : remoteJars) { + if (!localJars.contains(jar.getName())) { + missingJars.add(jar.getName()); + } + } + return missingJars; + } + + /** + * Creates local udf directory, if it doesn't exist. + * Checks if local udf directory is a directory and if current application has write permission on it. + * Attempts to clean up local udf directory in case jars were left after previous drillbit run. + * Local udf directory path is concatenated from drill temporary directory and ${drill.exec.udf.directory.local}. + * Also stores the resolved temporary directory in {@link #tmpDir}, which {@link #close()} uses. + * + * @param config drill config + * @return path to local udf directory + * @throws IllegalStateException if the directory does not exist, is not a directory or is not writable + * @throws DrillRuntimeException if the directory cannot be cleaned up + */ + private Path getLocalUdfDir(DrillConfig config) { + tmpDir = getTmpDir(config); + File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL)); + // the mkdirs() result is ignored; the checkState calls below verify the directory instead + udfDir.mkdirs(); + String udfPath = udfDir.getPath(); + Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath); + Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath); + Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath); + try { + FileUtils.cleanDirectory(udfDir); + } catch (IOException e) { + throw new DrillRuntimeException("Error during local udf directory clean up", e); + } + return new Path(udfDir.toURI()); + } + + /** + * First tries to get drill temporary directory value from environment variable $DRILL_TMP_DIR, + * then from config ${drill.tmp-dir}.
+ * If value is still missing, generates directory using {@link Files#createTempDir()}. + * If temporary directory was generated, sets {@link #deleteTmpDir} to true + * to delete directory on drillbit exit. + * + * @param config drill config consulted when $DRILL_TMP_DIR is not set + * @return drill temporary directory path + */ + private File getTmpDir(DrillConfig config) { + String drillTempDir = System.getenv("DRILL_TMP_DIR"); + + if (drillTempDir == null && config.hasPath(ExecConstants.DRILL_TMP_DIR)) { + drillTempDir = config.getString(ExecConstants.DRILL_TMP_DIR); + } + + if (drillTempDir == null) { + deleteTmpDir = true; + return Files.createTempDir(); + } + + return new File(drillTempDir); + } + + /** + * Copies jar from remote udf area to local udf area. + * + * @param jarName jar name to be copied + * @param remoteFunctionRegistry remote function registry + * @return local path to jar that was copied + * @throws IOException in case of problems during jar copying process + */ + private Path copyJarToLocal(String jarName, RemoteFunctionRegistry remoteFunctionRegistry) throws IOException { + Path registryArea = remoteFunctionRegistry.getRegistryArea(); + FileSystem fs = remoteFunctionRegistry.getFs(); + Path remoteJar = new Path(registryArea, jarName); + Path localJar = new Path(localUdfDir, jarName); + try { + fs.copyToLocalFile(remoteJar, localJar); + } catch (IOException e) { + String message = String.format("Error during jar [%s] copying from [%s] to [%s]", + jarName, registryArea.toUri().getPath(), localUdfDir.toUri().getPath()); + throw new IOException(message, e); + } + return localJar; + } + + /** + * Deletes quietly local jar but first checks if path to jar is not null. + * + * @param jar path to jar + */ + private void deleteQuietlyLocalJar(Path jar) { + if (jar != null) { + FileUtils.deleteQuietly(new File(jar.toUri().getPath())); + } + } + + /** + * If {@link #deleteTmpDir} is set to true, deletes generated temporary directory. + * Otherwise cleans up {@link #localUdfDir}.
+ */ + @Override + public void close() { + if (deleteTmpDir) { + // the local udf directory lives inside the generated tmp dir, so deleting the tmp dir removes it too + FileUtils.deleteQuietly(tmpDir); + } else { + try { + FileUtils.cleanDirectory(new File(localUdfDir.toUri().getPath())); + } catch (IOException e) { + logger.warn("Problems during local udf directory clean up", e); + } + } + } + + /** + * Fires when jar name is submitted for unregistration. + * Will unregister all functions associated with the jar name + * and delete binary and source associated with the jar from local udf directory + */ + public class UnregistrationListener implements TransientStoreListener { + + @Override + public void onChange(TransientStoreEvent event) { + // the event value is cast to the name of the jar being unregistered + String jarName = (String) event.getValue(); + localFunctionRegistry.unregister(jarName); + String localDir = localUdfDir.toUri().getPath(); + // remove both the binary jar and its companion source jar from the local udf directory + FileUtils.deleteQuietly(new File(localDir, jarName)); + FileUtils.deleteQuietly(new File(localDir, JarUtil.getSourceName(jarName))); + } + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java index 1007afc..4e5ee4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.expr.fn; import java.io.IOException; import java.io.InputStream; import java.io.StringReader; -import java.net.URL; import java.util.List; import java.util.Map; @@ -33,7 +32,6 @@ import org.codehaus.janino.Scanner; import org.mortbay.util.IO; import com.google.common.collect.Maps; -import com.google.common.io.Resources; /** * To avoid the cost of initializing all functions up front, @@ -42,8 +40,8 @@
import com.google.common.io.Resources; public class FunctionInitializer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionInitializer.class); - private String className; - + private final String className; + private final ClassLoader classLoader; private Map functionUnits = Maps.newHashMap(); private Map methods; private List imports; @@ -51,13 +49,21 @@ public class FunctionInitializer { /** * @param className the fully qualified name of the class implementing the function + * @param classLoader class loader associated with the function, is unique for each jar that holds function + * to prevent classpath collisions during loading an unloading jars */ - public FunctionInitializer(String className) { + public FunctionInitializer(String className, ClassLoader classLoader) { super(); this.className = className; + this.classLoader = classLoader; } /** + * @return returns class loader + */ + public ClassLoader getClassLoader() { return classLoader; } + + /** * @return the fully qualified name of the class implementing the function */ public String getClassName() { @@ -94,7 +100,7 @@ public class FunctionInitializer { // get function body. 
try { - final Class clazz = Class.forName(className); + final Class clazz = Class.forName(className, true, classLoader); final CompilationUnit cu = get(clazz); if (cu == null) { @@ -123,8 +129,7 @@ public class FunctionInitializer { return cu; } - URL u = Resources.getResource(c, path); - try (InputStream is = Resources.asByteSource(u).openStream()) { + try (InputStream is = c.getResourceAsStream(path)) { if (is == null) { throw new IOException(String.format( "Failure trying to located source code for Class %s, tried to read on classpath location %s", c.getName(), http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java new file mode 100644 index 0000000..4b93c88 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.fn.registry; + +import org.apache.drill.exec.expr.fn.DrillFuncHolder; + +/** + * Holder class that contains: + *

+ * <ol>
+ *   <li>function name</li>
+ *   <li>function signature which is string representation of function name and its input parameters</li>
+ *   <li>{@link DrillFuncHolder} associated with the function</li>
+ * </ol>
+ */ +public class FunctionHolder { + + private final String name; + private final String signature; + private final DrillFuncHolder holder; + + public FunctionHolder(String name, String signature, DrillFuncHolder holder) { + this.name = name; + this.signature = signature; + this.holder = holder; + } + + public String getName() { + return name; + } + + public DrillFuncHolder getHolder() { + return holder; + } + + public String getSignature() { + return signature; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java new file mode 100644 index 0000000..005c4e5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.fn.registry; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import org.apache.drill.common.concurrent.AutoCloseableLock; +import org.apache.drill.exec.expr.fn.DrillFuncHolder; + +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Function registry holder stores function implementations by jar name, function name. + * Contains two maps that hold data by jars and functions respectively. + * Jars map contains each jar as a key and map of all its functions with collection of function signatures as value. + * Functions map contains function name as key and map of its signatures and function holder as value. + * All maps and collections used are concurrent to guarantee memory consistency effects. + * Such structure is chosen to achieve maximum speed while retrieving data by jar or by function name, + * since we expect infrequent registry changes. + * Holder is designed to allow concurrent reads and single writes to keep data consistent. + * This is achieved by {@link ReadWriteLock} implementation usage. + * Holder has number version which changes every time new jars are added or removed. Initial version number is 0. + * Also version is used when user needs data from registry with version it is based on. 
+ * + * Structure example: + * + * JARS + * built-in -> upper -> upper(VARCHAR-REQUIRED) + * -> lower -> lower(VARCHAR-REQUIRED) + * + * First.jar -> upper -> upper(VARCHAR-OPTIONAL) + * -> custom_upper -> custom_upper(VARCHAR-REQUIRED) + * -> custom_upper(VARCHAR-OPTIONAL) + * + * Second.jar -> lower -> lower(VARCHAR-OPTIONAL) + * -> custom_upper -> custom_upper(VARCHAR-REQUIRED) + * -> custom_upper(VARCHAR-OPTIONAL) + * + * FUNCTIONS + * upper -> upper(VARCHAR-REQUIRED) -> function holder for upper(VARCHAR-REQUIRED) + * -> upper(VARCHAR-OPTIONAL) -> function holder for upper(VARCHAR-OPTIONAL) + * + * lower -> lower(VARCHAR-REQUIRED) -> function holder for lower(VARCHAR-REQUIRED) + * -> lower(VARCHAR-OPTIONAL) -> function holder for lower(VARCHAR-OPTIONAL) + * + * custom_upper -> custom_upper(VARCHAR-REQUIRED) -> function holder for custom_upper(VARCHAR-REQUIRED) + * -> custom_upper(VARCHAR-OPTIONAL) -> function holder for custom_upper(VARCHAR-OPTIONAL) + * + * custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for custom_lower(VARCHAR-REQUIRED) + * -> custom_lower(VARCHAR-OPTIONAL) -> function holder for custom_lower(VARCHAR-OPTIONAL) + * + * where + * First.jar is jar name represented by String + * upper is function name represented by String + * upper(VARCHAR-REQUIRED) is signature name represented by String which consist of function name, list of input parameters + * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} initiated for each function. 
+ * + */ +public class FunctionRegistryHolder { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionRegistryHolder.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); + private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); + private long version = 0; + + // jar name, Map + private final Map>> jars; + + // function name, Map + private final Map> functions; + + public FunctionRegistryHolder() { + this.functions = Maps.newConcurrentMap(); + this.jars = Maps.newConcurrentMap(); + } + + /** + * This is read operation, so several users at a time can get this data. + * @return local function registry version number + */ + public long getVersion() { + try (AutoCloseableLock lock = readLock.open()) { + return version; + } + } + + /** + * Adds jars to the function registry. + * If jar with the same name already exists, it and its functions will be removed. + * Then jar will be added to {@link #jars} + * and each function will be added using {@link #addFunctions(Map, List)}. + * Function version registry will be incremented by 1 if at least one jar was added but not for each jar. + * This is write operation, so one user at a time can call perform such action, + * others will wait till first user completes his action. 
+ * + * @param newJars jars and list of their function holders, each contains function name, signature and holder + */ + public void addJars(Map> newJars) { + try (AutoCloseableLock lock = writeLock.open()) { + for (Map.Entry> newJar : newJars.entrySet()) { + String jarName = newJar.getKey(); + removeAllByJar(jarName); + Map> jar = Maps.newConcurrentMap(); + jars.put(jarName, jar); + addFunctions(jar, newJar.getValue()); + } + if (!newJars.isEmpty()) { + version++; + } + } + } + + /** + * Removes jar from {@link #jars} and all associated with jar functions from {@link #functions} + * If jar was removed, function registry version will be incremented by 1. + * This is write operation, so one user at a time can call perform such action, + * others will wait till first user completes his action. + * + * @param jarName jar name to be removed + */ + public void removeJar(String jarName) { + try (AutoCloseableLock lock = writeLock.open()) { + if (removeAllByJar(jarName)) { + version++; + } + } + } + + /** + * Retrieves list of all jars name present in {@link #jars} + * This is read operation, so several users can get this data. + * + * @return list of all jar names + */ + public List getAllJarNames() { + try (AutoCloseableLock lock = readLock.open()) { + return Lists.newArrayList(jars.keySet()); + } + } + + /** + * Retrieves all function names associated with the jar from {@link #jars}. + * Returns empty list if jar is not registered. + * This is read operation, so several users can perform this operation at the same time. + * + * @param jarName jar name + * @return list of functions names associated from the jar + */ + public List getFunctionNamesByJar(String jarName) { + try (AutoCloseableLock lock = readLock.open()){ + Map> functions = jars.get(jarName); + return functions == null ? Lists.newArrayList() : Lists.newArrayList(functions.keySet()); + } + } + + /** + * Returns list of functions with list of function holders for each functions. 
+ * Uses guava {@link ListMultimap} structure to return data. + * If no functions present, will return empty {@link ListMultimap}. + * If version holder is not null, updates it with current registry version number. + * This is read operation, so several users can perform this operation at the same time. + * + * @param version version holder + * @return all functions which their holders + */ + public ListMultimap getAllFunctionsWithHolders(AtomicLong version) { + try (AutoCloseableLock lock = readLock.open()) { + if (version != null) { + version.set(this.version); + } + ListMultimap functionsWithHolders = ArrayListMultimap.create(); + for (Map.Entry> function : functions.entrySet()) { + functionsWithHolders.putAll(function.getKey(), Lists.newArrayList(function.getValue().values())); + } + return functionsWithHolders; + } + } + + /** + * Returns list of functions with list of function holders for each functions without version number. + * This is read operation, so several users can perform this operation at the same time. + * + * @return all functions which their holders + */ + public ListMultimap getAllFunctionsWithHolders() { + return getAllFunctionsWithHolders(null); + } + + /** + * Returns list of functions with list of function signatures for each functions. + * Uses guava {@link ListMultimap} structure to return data. + * If no functions present, will return empty {@link ListMultimap}. + * This is read operation, so several users can perform this operation at the same time. 
+ * + * @return all functions which their signatures + */ + public ListMultimap getAllFunctionsWithSignatures() { + try (AutoCloseableLock lock = readLock.open()) { + ListMultimap functionsWithSignatures = ArrayListMultimap.create(); + for (Map.Entry> function : functions.entrySet()) { + functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet())); + } + return functionsWithSignatures; + } + } + + /** + * Returns all function holders associated with function name. + * If function is not present, will return empty list. + * If version holder is not null, updates it with current registry version number. + * This is read operation, so several users can perform this operation at the same time. + * + * @param functionName function name + * @param version version holder + * @return list of function holders + */ + public List getHoldersByFunctionName(String functionName, AtomicLong version) { + try (AutoCloseableLock lock = readLock.open()) { + if (version != null) { + version.set(this.version); + } + Map holders = functions.get(functionName); + return holders == null ? Lists.newArrayList() : Lists.newArrayList(holders.values()); + } + } + + /** + * Returns all function holders associated with function name without version number. + * This is read operation, so several users can perform this operation at the same time. + * + * @param functionName function name + * @return list of function holders + */ + public List getHoldersByFunctionName(String functionName) { + return getHoldersByFunctionName(functionName, null); + } + + /** + * Checks is jar is present in {@link #jars}. + * This is read operation, so several users can perform this operation at the same time. 
+ * + * @param jarName jar name + * @return true if jar exists, else false + */ + public boolean containsJar(String jarName) { + try (AutoCloseableLock lock = readLock.open()) { + return jars.containsKey(jarName); + } + } + + /** + * Returns quantity of functions stored in {@link #functions}. + * This is read operation, so several users can perform this operation at the same time. + * + * @return quantity of functions + */ + public int functionsSize() { + try (AutoCloseableLock lock = readLock.open()) { + return functions.size(); + } + } + + /** + * Looks which jar in {@link #jars} contains passed function signature. + * First looks by function name and if found checks if such function has passed function signature. + * Returns jar name if found matching function signature, else null. + * This is read operation, so several users can perform this operation at the same time. + * + * @param functionName function name + * @param functionSignature function signature + * @return jar name + */ + public String getJarNameByFunctionSignature(String functionName, String functionSignature) { + try (AutoCloseableLock lock = readLock.open()) { + for (Map.Entry>> jar : jars.entrySet()) { + Queue functionSignatures = jar.getValue().get(functionName); + if (functionSignatures != null && functionSignatures.contains(functionSignature)) { + return jar.getKey(); + } + } + } + return null; + } + + /** + * Adds all function names and signatures to passed jar, + * adds all function names, their signatures and holders to {@link #functions}. + * + * @param jar jar where function to be added + * @param newFunctions collection of function holders, each contains function name, signature and holder. 
+ */ + private void addFunctions(Map> jar, List newFunctions) { + for (FunctionHolder function : newFunctions) { + final String functionName = function.getName(); + Queue jarFunctions = jar.get(functionName); + if (jarFunctions == null) { + jarFunctions = Queues.newConcurrentLinkedQueue();; + jar.put(functionName, jarFunctions); + } + final String functionSignature = function.getSignature(); + jarFunctions.add(functionSignature); + + Map signatures = functions.get(functionName); + if (signatures == null) { + signatures = Maps.newConcurrentMap(); + functions.put(functionName, signatures); + } + signatures.put(functionSignature, function.getHolder()); + } + } + + /** + * Removes jar from {@link #jars} and all associated with jars functions from {@link #functions} + * Since each jar is loaded with separate class loader before + * removing we need to close class loader to release opened connection to jar. + * All jar functions have the same class loader, so we need to close only one time. + * + * @param jarName jar name to be removed + * @return true if jar was removed, false otherwise + */ + private boolean removeAllByJar(String jarName) { + Map> jar = jars.remove(jarName); + if (jar == null) { + return false; + } + + for (Map.Entry> functionEntry : jar.entrySet()) { + final String function = functionEntry.getKey(); + Map functionHolders = functions.get(function); + Queue functionSignatures = functionEntry.getValue(); + for (Map.Entry entry : functionHolders.entrySet()) { + if (functionSignatures.contains(entry.getKey())) { + ClassLoader classLoader = entry.getValue().getClassLoader(); + if (classLoader instanceof AutoCloseable) { + try { + ((AutoCloseable) classLoader).close(); + } catch (Exception e) { + logger.warn("Problem during closing class loader", e); + } + } + break; + } + } + functionHolders.keySet().removeAll(functionSignatures); + + if (functionHolders.isEmpty()) { + functions.remove(function); + } + } + return true; + } +}