DRILL-5516: Limit memory usage for Hbase reader
close apache/drill#839
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f98400f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f98400f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f98400f
Branch: refs/heads/master
Commit: 7f98400f949b04eb06415aeb3a3265693629371c
Parents: aa39c66
Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Authored: Mon May 15 15:51:02 2017 +0000
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Fri May 19 10:05:26 2017 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseRecordReader.java | 41 ++++++++++++++++----
1 file changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7f98400f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index b3a7039..3f308ce 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -59,6 +59,10 @@ import com.google.common.collect.Sets;
public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
+ // batch should not exceed this value to avoid OOM on a busy system
+ private static final int MAX_ALLOCATED_MEMORY_PER_BATCH = 64 * 1024 * 1024; // 64 mb in
bytes
+
+ // batch size should not exceed max allowed record count
private static final int TARGET_RECORD_COUNT = 4000;
private OutputMutator outputMutator;
@@ -134,7 +138,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = context;
this.outputMutator = output;
- familyVectorMap = new HashMap<String, MapVector>();
+ familyVectorMap = new HashMap<>();
try {
hTable = connection.getTable(hbaseTableName);
@@ -187,8 +191,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
}
int rowCount = 0;
- done:
- for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
+    // if allocated memory for the first row is larger than allowed max in batch, it will be added anyway
+ do {
Result result = null;
final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
try {
@@ -206,13 +210,17 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
throw new DrillRuntimeException(e);
}
if (result == null) {
- break done;
+ break;
}
// parse the result and populate the value vectors
Cell[] cells = result.rawCells();
if (rowKeyVector != null) {
-        rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
+ rowKeyVector.getMutator().setSafe(
+ rowCount,
+ cells[0].getRowArray(),
+ cells[0].getRowOffset(),
+ cells[0].getRowLength());
}
if (!rowKeyOnly) {
for (final Cell cell : cells) {
@@ -224,7 +232,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
final int qualifierOffset = cell.getQualifierOffset();
final int qualifierLength = cell.getQualifierLength();
final byte[] qualifierArray = cell.getQualifierArray();
-            final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
+ final NullableVarBinaryVector v = getOrCreateColumnVector(mv,
+ new String(qualifierArray, qualifierOffset, qualifierLength));
final int valueOffset = cell.getValueOffset();
final int valueLength = cell.getValueLength();
@@ -232,7 +241,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
}
}
- }
+ rowCount++;
+ } while (canAddNewRow(rowCount));
setOutputRowCount(rowCount);
logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
@@ -289,4 +299,19 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
rowKeyVector.getMutator().setValueCount(count);
}
}
+
+ /**
+ * Checks if new row can be added in batch. Row can be added if:
+ * <ul>
+ * <li>current row count does not exceed max allowed one</li>
+ * <li>allocated memory does not exceed max allowed one</li>
+ * </ul>
+ *
+ * @param rowCount current row count
+ * @return true if new row can be added in batch, false otherwise
+ */
+ private boolean canAddNewRow(int rowCount) {
+ return rowCount < TARGET_RECORD_COUNT &&
+ operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
+ }
}