kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [1/3] kudu git commit: KUDU-2249 Avoid sharing the client between the InputFormat and RecordReader
Date Mon, 08 Jan 2018 19:38:45 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 391e3255d -> 1277f69a1


KUDU-2249 Avoid sharing the client between the InputFormat and RecordReader

This commit prevents a possible race condition between the getSplits() method and
the TableRecordReader in KuduTableInputFormat, when both try to access and
shut down the KuduClient.

Both share the same client and shut it down after use. In some scenarios
the client might still be accessed after it has been shut down, which throws an error.
With this commit, the TableRecordReader gets its own client. This increases
the number of Kudu clients opened by an MR application by at most one (the one
that getSplits() previously shared with a TableRecordReader).
The commit also clarifies the behaviour of MR applications and how many open Kudu
clients one should expect in total.

Change-Id: I24f45ee9253790c5348cabd0afe6c6a4b6d3f3d4
Reviewed-on: http://gerrit.cloudera.org:8080/8921
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <davidralves@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f4157c4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f4157c4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f4157c4

Branch: refs/heads/master
Commit: 7f4157c416c1f17e6326753f2138bd17fa79ad66
Parents: 391e325
Author: Clemens Valiente <clemens.valiente@trivago.com>
Authored: Thu Dec 28 10:34:26 2017 +0100
Committer: David Ribeiro Alves <davidralves@gmail.com>
Committed: Mon Jan 8 18:57:11 2018 +0000

----------------------------------------------------------------------
 .../kudu/mapreduce/KuduTableInputFormat.java    | 66 +++++++++++++-------
 1 file changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7f4157c4/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
index fcbf10e..a018ae2 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -70,9 +70,24 @@ import org.apache.kudu.client.RowResultIterator;
  *
  * <p>
  * Hadoop doesn't have the concept of "closing" the input format so in order to release the
- * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
- * the object won't be used again and the AsyncKuduClient is shut down.
+ * resources (mainly, the Kudu client) we assume that once either
+ * {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link KuduTableInputFormat.TableRecordReader#close()}
+ * have been called that the object won't be used again and the AsyncKuduClient is shut down.
+ *
+ * To prevent a premature shutdown of the client, the KuduTableInputFormat and the
+ * TableRecordReader both get their own client that they don't share.
+ * </p>
+ *
+ * <p>
+ * Default behavior of hadoop is to call {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * in the MRAppMaster and for each inputSplit (in our case, Kudu tablet) will spawn one Mapper
+ * with a TableRecordReader reading one Tablet.
+ *
+ * Therefore, total number of Kudu clients opened over the course of a MR application can be
+ * estimated by (#Tablets +1). To reduce the number of concurrent open clients, it might be
+ * advisable to restrict resources of the MR application or implement the
+ * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} over this InputFormat.
  * </p>
  */
 @InterfaceAudience.Public
@@ -161,15 +176,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
       return splits;
     } finally {
-      shutdownClient();
-    }
-  }
-
-  private void shutdownClient() throws IOException {
-    try {
-      client.shutdown();
-    } catch (Exception e) {
-      throw new IOException(e);
+      shutdownClient(client);
     }
   }
 
@@ -214,12 +221,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
 
     String tableName = conf.get(INPUT_TABLE_KEY);
     String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
-    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
-                                           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
-    this.client = new KuduClient.KuduClientBuilder(masterAddresses)
-                                .defaultOperationTimeoutMs(operationTimeoutMs)
-                                .build();
-    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client);
+    this.client = buildKuduClient();
     this.nameServer = conf.get(NAME_SERVER_KEY);
     this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
     this.isFaultTolerant = conf.getBoolean(FAULT_TOLERANT_SCAN, false);
@@ -263,6 +265,26 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     }
   }
 
+  private KuduClient buildKuduClient() {
+
+    String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
+    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+        AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+    KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)
+        .defaultOperationTimeoutMs(operationTimeoutMs)
+        .build();
+    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(kuduClient);
+    return kuduClient;
+  }
+
+  private void shutdownClient(KuduClient kuduClient) throws IOException {
+    try {
+      kuduClient.shutdown();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
   /**
    * Given a PTR string generated via reverse DNS lookup, return everything
    * except the trailing period. Example for host.example.com., return
@@ -384,6 +406,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     private RowResultIterator iterator;
     private KuduScanner scanner;
     private TableSplit split;
+    private KuduClient kuduClient;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -393,9 +416,10 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
 
       split = (TableSplit) inputSplit;
+      kuduClient = buildKuduClient();
       LOG.debug("Creating scanner for token: {}",
-                KuduScanToken.stringifySerializedToken(split.getScanToken(), client));
-      scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+                KuduScanToken.stringifySerializedToken(split.getScanToken(), kuduClient));
+      scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), kuduClient);
 
       // Calling this now to set iterator.
       tryRefreshIterator();
@@ -452,7 +476,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       } catch (Exception e) {
         throw new IOException(e);
       }
-      shutdownClient();
+      shutdownClient(kuduClient);
     }
   }
 }


Mime
View raw message