flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
Date Mon, 20 May 2019 00:48:07 GMT
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285404214
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+	private static final String DEFAULT_DB = "default";
 
-	public HiveCatalog(String catalogName, String hivemetastoreURI) {
-		super(catalogName, hivemetastoreURI);
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between
different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+	private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic";
 
-		LOG.info("Created HiveCatalog '{}'", catalogName);
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	private final String defaultDatabase;
+	protected IMetaStoreClient client;
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
 	}
 
 	public HiveCatalog(String catalogName, HiveConf hiveConf) {
-		super(catalogName, hiveConf);
+		this(catalogName, DEFAULT_DB, hiveConf);
+	}
+
+	public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be
null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot
be null or empty");
+		this.catalogName = catalogName;
+		this.defaultDatabase = defaultDatabase;
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
 
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI
cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		if (client == null) {
+			client = getMetastoreClient(hiveConf);
+			LOG.info("Connected to Hive metastore");
+		}
+
+		if (!databaseExists(defaultDatabase)) {
+			throw new CatalogException(String.format("Configured default database %s doesn't exist
in catalog %s.",
+				defaultDatabase, catalogName));
+		}
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		if (client != null) {
+			client.close();
+			client = null;
+			LOG.info("Close connection to Hive metastore");
+		}
+	}
+
 	// ------ databases ------
 
+	public String getDefaultDatabase() throws CatalogException {
+		return defaultDatabase;
+	}
+
 	@Override
-	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-		return new HiveCatalogDatabase(
-			hiveDatabase.getParameters(),
-			hiveDatabase.getLocationUri(),
-			hiveDatabase.getDescription());
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException,
CatalogException {
+		Database hiveDatabase = getHiveDatabase(databaseName);
+
+		Map<String, String> properties = hiveDatabase.getParameters();
+		boolean isGeneric = Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+		return !isGeneric ? new HiveCatalogDatabase(properties, hiveDatabase.getLocationUri(),
hiveDatabase.getDescription()) :
+			new GenericCatalogDatabase(retrieveFlinkProperties(properties), hiveDatabase.getDescription());
 	}
 
 	@Override
-	protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase)
{
-		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) catalogDatabase;
+	public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be
null or empty");
+		checkNotNull(database, "database cannot be null");
+
+		Database hiveDatabase;
+		if (database instanceof HiveCatalogDatabase) {
+			hiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) database);
+		} else if (database instanceof GenericCatalogDatabase) {
+			hiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) database);
+		} else {
+			throw new CatalogException(String.format("Unsupported catalog database type %s", database.getClass()),
null);
+		}
 
-		return new Database(
-			databaseName,
-			catalogDatabase.getComment(),
-			hiveCatalogDatabase.getLocation(),
-			hiveCatalogDatabase.getProperties());
+		createHiveDatabase(hiveDatabase, ignoreIfExists);
 	}
 
-	// ------ tables and views------
+	private static Database instantiateHiveDatabase(String databaseName, HiveCatalogDatabase
database) {
+		return new Database(databaseName,
+			database.getComment(),
+			database.getLocation(),
+			database.getProperties());
+	}
+
+	private static Database instantiateHiveDatabase(String databaseName, GenericCatalogDatabase
database) {
+		// Add a property to make it as a generic catalog database
+		Map<String, String> properties = database.getProperties();
+
+		return new Database(databaseName,
+			database.getComment(),
+			// HDFS location URI which GenericCatalogDatabase shouldn't care
+			null,
+			maskFlinkProperties(properties));
+	}
 
 	@Override
-	protected void validateCatalogBaseTable(CatalogBaseTable table)
-			throws CatalogException {
-		if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView))
{
+	public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be
null or empty");
+		checkNotNull(newDatabase, "newDatabase cannot be null");
+
+		if (newDatabase instanceof HiveCatalogDatabase) {
+			alterHiveDatabase(databaseName, instantiateHiveDatabase(databaseName, (HiveCatalogDatabase)
newDatabase), ignoreIfNotExists);
+		} else if (newDatabase instanceof GenericCatalogDatabase) {
+			alterHiveDatabase(databaseName, instantiateHiveDatabase(databaseName, (GenericCatalogDatabase)
newDatabase), ignoreIfNotExists);
+		} else {
+			throw new CatalogException(String.format("Unsupported catalog database type %s", newDatabase.getClass()),
null);
+		}
+	}
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		try {
+			return client.getAllDatabases();
+		} catch (TException e) {
 			throw new CatalogException(
-				"HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
+				String.format("Failed to list all databases in %s", catalogName), e);
 		}
 	}
 
 	@Override
-	protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
-		// Table schema
-		TableSchema tableSchema =
-			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		try {
+			return client.getDatabase(databaseName) != null;
+		} catch (NoSuchObjectException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to determine whether database %s exists or not", databaseName),
e);
+		}
+	}
+
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException,
+			DatabaseNotEmptyException, CatalogException {
+		try {
+			client.dropDatabase(name, true, ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, name);
+			}
+		} catch (InvalidOperationException e) {
+			throw new DatabaseNotEmptyException(catalogName, name);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to drop database %s", name), e);
+		}
+	}
+
+	private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+		try {
+			return client.getDatabase(databaseName);
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+		}
+	}
+
+	private void createHiveDatabase(Database hiveDatabase, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		try {
+			client.createDatabase(hiveDatabase);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()),
e);
+		}
+	}
+
+	private void alterHiveDatabase(String name, Database newHiveDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		try {
+			if (databaseExists(name)) {
+				client.alterDatabase(name, newHiveDatabase);
+			} else if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, name);
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter database %s", name), e);
+		}
+	}
+
+	// ------ tables ------
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException
{
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		Table hiveTable = getHiveTable(tablePath);
+		return instantiateHiveCatalogTable(hiveTable);
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkNotNull(table, "table cannot be null");
+
+		if (!databaseExists(tablePath.getDatabaseName())) {
+			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+		}
+
+		Table hiveTable = instantiateHiveTable(tablePath, table);
+
+		try {
+			client.createTable(hiveTable);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new TableAlreadyExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()),
e);
+		}
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+			throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName cannot be
null or empty");
+
+		try {
+			// alter_table() doesn't throw a clear exception when target table doesn't exist.
+			// Thus, check the table existence explicitly
+			if (tableExists(tablePath)) {
+				ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+				// alter_table() doesn't throw a clear exception when new table already exists.
+				// Thus, check the table existence explicitly
+				if (tableExists(newPath)) {
+					throw new TableAlreadyExistException(catalogName, newPath);
+				} else {
+					Table table = getHiveTable(tablePath);
+					table.setTableName(newTableName);
+					client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+				}
+			} else if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to rename table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
+
+		if (!tableExists(tablePath)) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+			return;
+		}
+
+		Table oldTable = getHiveTable(tablePath);
+		TableType oldTableType = TableType.valueOf(oldTable.getTableType());
+
+		if (oldTableType == TableType.VIRTUAL_VIEW) {
+			if (!(newCatalogTable instanceof CatalogView)) {
+				throw new CatalogException(
+					String.format("Table types don't match. The existing table is a view, but the new catalog
base table is not."));
+			}
+			// Else, do nothing
+		} else if ((oldTableType == TableType.MANAGED_TABLE)) {
+			if (!(newCatalogTable instanceof CatalogTable)) {
+				throw new CatalogException(
+					String.format("Table types don't match. The existing table is a table, but the new catalog
base table is not."));
+			}
+			// Else, do nothing
+		} else {
+			throw new CatalogException(
+				String.format("Hive table type '%s' is not supported yet.",
+					oldTableType.name()));
+		}
+
+		Table newTable = instantiateHiveTable(tablePath, newCatalogTable);
+
+		// client.alter_table() requires a valid location
+		// thus, if new table doesn't have that, it reuses location of the old table
+		if (!newTable.getSd().isSetLocation()) {
+			newTable.getSd().setLocation(oldTable.getSd().getLocation());
+		}
+
+		try {
+			client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to rename table %s", tablePath.getFullName()),
e);
+		}
+	}
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException,
CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed
later if necessary
+				true,
+				ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException,
CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be
null or empty");
+
+		try {
+			return client.getAllTables(databaseName);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list tables in database %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public List<String> listViews(String databaseName) throws DatabaseNotExistException,
CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be
null or empty");
+
+		try {
+			return client.getTables(
+				databaseName,
+				null, // table pattern
+				TableType.VIRTUAL_VIEW);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list views in database %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		try {
+			return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (UnknownDBException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()),
e);
+		}
+	}
+
+	private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+		try {
+			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (NoSuchObjectException e) {
+			throw new TableNotExistException(catalogName, tablePath);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()),
e);
+		}
+	}
+
+	private static CatalogBaseTable instantiateHiveCatalogTable(Table hiveTable) {
+		boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
 
 		// Table properties
 		Map<String, String> properties = hiveTable.getParameters();
-
-		// Table comment
+		boolean isGeneric = Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+		if (isGeneric) {
+			properties = retrieveFlinkProperties(properties);
+		}
 		String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
 
+		// Table schema
+		TableSchema tableSchema =
+			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
-
 		if (!hiveTable.getPartitionKeys().isEmpty()) {
-			partitionKeys = hiveTable.getPartitionKeys().stream()
-				.map(fs -> fs.getName())
-				.collect(Collectors.toList());
+			partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
 		}
 
-		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
-			return new HiveCatalogView(
-				hiveTable.getViewOriginalText(),
-				hiveTable.getViewExpandedText(),
-				tableSchema,
-				properties,
-				comment
-			);
+		if (isView) {
+			if (isGeneric) {
+				return new GenericCatalogView(
+					hiveTable.getViewOriginalText(),
+					hiveTable.getViewExpandedText(),
+					tableSchema,
+					properties,
+					comment
+				);
+			} else {
+				return new HiveCatalogView(
+					hiveTable.getViewOriginalText(),
+					hiveTable.getViewExpandedText(),
+					tableSchema,
+					properties,
+					comment
+				);
+			}
 		} else {
-			return new HiveCatalogTable(
-				tableSchema, partitionKeys, properties, comment);
+			if (isGeneric) {
+				return new GenericCatalogTable(tableSchema, partitionKeys, properties, comment);
+			} else {
+				return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
+			}
 		}
 	}
 
-	@Override
-	protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-		Map<String, String> properties = new HashMap<>(table.getProperties());
-
-		// Table comment
-		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
+	private  static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table)
{
 		Table hiveTable = new Table();
 		hiveTable.setDbName(tablePath.getDatabaseName());
 		hiveTable.setTableName(tablePath.getObjectName());
 		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+		// Table comment
+		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+		if (table instanceof GenericCatalogTable || table instanceof GenericCatalogView) {
+			properties = maskFlinkProperties(properties);
 
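 Review comment:
   Do we need to add the "flink.is_generic" key here? When the table is read back, instantiateHiveCatalogTable() uses that key to tell generic tables apart from Hive tables. The generic branch above only calls maskFlinkProperties(), so unless that helper adds the key, a generic table would be read back as a Hive table.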

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message