hive-issues mailing list archives

From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Work logged] (HIVE-24918) Handle failover case during Repl Dump
Date Tue, 06 Jul 2021 00:52:00 GMT

     [ https://issues.apache.org/jira/browse/HIVE-24918?focusedWorklogId=618812&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-618812 ]

ASF GitHub Bot logged work on HIVE-24918:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jul/21 00:51
            Start Date: 06/Jul/21 00:51
    Worklog Time Spent: 10m 
      Work Description: hmangla98 commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r664170277



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc
" +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\",
" +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    // This dump is not failover ready, as the target db can be used for replication only after the first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /** Open transactions can be of two types:
+     Case 1) Txns that belong to a different db or have not acquired HIVE locks: these
+     txns are captured in the _failover_metadata file.
+     Case 2) Txns that belong to the db under replication: these txns are aborted as
+     part of the dump operation.
+     */
+    // Open 3 txns for a database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of the secondary db for 3 txns
+    // t1=3 and t2=3
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of the primary db for 2 txns
+    // t1=3 and t2=4
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    // Only txnsForPrimaryDb would have been aborted by the dump operation; txnsForSecDb and txnsWithNoLocks remain open.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START
+ "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    // Since the failover start config is disabled and the previous valid dump directory
+    // contains the _failover_ready marker file, this dump iteration will perform a
+    // bootstrap dump instead of an incremental one.
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  private long getLatestDumpDirModifTime(Path dumpRoot) throws Exception {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    long latestModifTime = -1;
+    if (fs.exists(dumpRoot)) {
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() > latestModifTime) {
+          latestModifTime = status.getModificationTime();
+        }
+      }
+    }
+    return latestModifTime;
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc
" +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\",
" +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    // This dump is not failover ready, as the target db can be used for replication only after the first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData previousFmd = new FailoverMetaData(dumpPath, conf);
+    Long failoverEventId = previousFmd.getFailoverEventId();
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()));
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));

Review comment:
       failoverEventId is captured prior to the insert operation, so the dump operation will
dump only the events up to the eventId recorded in the _failover_metadata file.
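
For illustration, a minimal, self-contained sketch of the cutoff rule described in this
comment (hypothetical names and event ids; not the actual Hive dump code): events whose
ids exceed the failover event id recorded in the _failover_metadata file are excluded
from a failover-ready dump.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical sketch: a failover-ready dump carries only the events whose ids are
    // at or below the failover event id captured in _failover_metadata, so events
    // generated afterwards (e.g. by a later insert) are left out of that dump.
    public class FailoverCutoffSketch {
      public static void main(String[] args) {
        long failoverEventId = 105L; // id captured when the failover metadata was written
        List<Long> pendingEventIds = Arrays.asList(101L, 103L, 105L, 108L, 110L);

        List<Long> dumpedEventIds = pendingEventIds.stream()
            .filter(id -> id <= failoverEventId)
            .collect(Collectors.toList());

        System.out.println(dumpedEventIds); // prints [101, 103, 105]
      }
    }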




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 618812)
    Time Spent: 2.5h  (was: 2h 20m)

> Handle failover case during Repl Dump
> -------------------------------------
>
>                 Key: HIVE-24918
>                 URL: https://issues.apache.org/jira/browse/HIVE-24918
>             Project: Hive
>          Issue Type: New Feature
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> To handle:
>  a) Whenever the user wants to go ahead with failover, the next or subsequent repl dump
> operation, upon confirming that there are no pending open transaction events, should
> create a _failover_ready marker file in the dump dir. This marker file would contain the
> scheduled query name that has generated this dump.
>  b) Skip the next repl dump instances once we have the marker file placed.
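
A minimal, self-contained sketch of the gating rule in (b), assuming a hypothetical dump
layout and reusing the _failover_ready marker name from the tests above (not the actual
Hive implementation): once the marker exists in the latest dump directory, subsequent repl
dump instances are skipped.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Hypothetical sketch: once a _failover_ready marker exists in the latest dump
    // directory, further repl dump instances are skipped and the existing dump is reused.
    public class FailoverMarkerGate {
      static final String FAILOVER_READY_MARKER = "_failover_ready";

      static boolean shouldSkipDump(Path latestDumpDir) {
        return Files.exists(latestDumpDir.resolve(FAILOVER_READY_MARKER));
      }

      public static void main(String[] args) {
        Path latestDumpDir = Paths.get("/tmp/repl/dumps/latest"); // hypothetical location
        if (shouldSkipDump(latestDumpDir)) {
          System.out.println("Previous dump is failover ready; skipping this repl dump.");
        } else {
          System.out.println("No failover-ready marker found; proceeding with repl dump.");
        }
      }
    }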



