flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
Date Mon, 20 May 2019 00:48:06 GMT
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog
and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285403193
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+	private static final String DEFAULT_DB = "default";
 
-	public HiveCatalog(String catalogName, String hivemetastoreURI) {
-		super(catalogName, hivemetastoreURI);
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+	private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic";
 
-		LOG.info("Created HiveCatalog '{}'", catalogName);
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	private final String defaultDatabase;
+	protected IMetaStoreClient client;
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
 	}
 
 	public HiveCatalog(String catalogName, HiveConf hiveConf) {
-		super(catalogName, hiveConf);
+		this(catalogName, DEFAULT_DB, hiveConf);
+	}
+
+	public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be
null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot
be null or empty");
+		this.catalogName = catalogName;
+		this.defaultDatabase = defaultDatabase;
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
 
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI
cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		if (client == null) {
+			client = getMetastoreClient(hiveConf);
+			LOG.info("Connected to Hive metastore");
+		}
+
+		if (!databaseExists(defaultDatabase)) {
+			throw new CatalogException(String.format("Configured default database %s doesn't exist
in catalog %s.",
+				defaultDatabase, catalogName));
+		}
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		if (client != null) {
+			client.close();
+			client = null;
+			LOG.info("Close connection to Hive metastore");
+		}
+	}
+
 	// ------ databases ------
 
+	public String getDefaultDatabase() throws CatalogException {
+		return defaultDatabase;
+	}
+
 	@Override
-	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-		return new HiveCatalogDatabase(
-			hiveDatabase.getParameters(),
-			hiveDatabase.getLocationUri(),
-			hiveDatabase.getDescription());
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException,
CatalogException {
+		Database hiveDatabase = getHiveDatabase(databaseName);
+
+		Map<String, String> properties = hiveDatabase.getParameters();
+		boolean isGeneric = Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
 
 Review comment:
   Shall we remove the `flink.is_generic` entry (`GENERC_META_PROPERTY_KEY`) from the properties before returning them to users?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message