Repository: drill
Updated Branches:
refs/heads/master c1b847acd -> 403dc5cdc
DRILL-3147: tpcds-sf1-parquet query 73 causes memory leak
- each time a fragment A sends a "receiver finished" message to fragment B, fragment B's id is
added to the FragmentContext.ignoredSenders list
- refactored UnorderedReceiverBatch.informSenders() and MergingRecordBatch.informSenders()
by moving this method to FragmentContext
- DataServer.send() uses FragmentContext.ignoredSenders to decide if a batch should be
passed to the fragment or discarded right away
- BaseRawBatchBuffer methods enqueue() and kill() are now synchronized
- the TestTpcdsSf1Leaks test reproduces the leak; it is ignored by default because it requires
a large dataset
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/403dc5cd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/403dc5cd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/403dc5cd
Branch: refs/heads/master
Commit: 403dc5cdc6d34b24c1caca7cb574eb9e9727afe4
Parents: c1b847a
Author: adeneche <adeneche@gmail.com>
Authored: Mon May 18 18:02:12 2015 -0700
Committer: Jason Altekruse <altekrusejason@gmail.com>
Committed: Thu Jun 25 12:01:00 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/rpc/data/DataServer.java | 41 +++--------
.../exec/work/batch/BaseRawBatchBuffer.java | 11 ++-
.../exec/work/fragment/RootFragmentManager.java | 2 +-
.../java/org/apache/drill/BaseTestQuery.java | 14 ++--
.../drill/exec/server/TestTpcdsSf1Leaks.java | 75 ++++++++++++++++++++
.../src/test/resources/tpcds-sf1/q73.sql | 27 +++++++
6 files changed, 129 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 80d2d6e..4908c18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -122,7 +122,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
- Pointer<DrillBuf> out = new Pointer<DrillBuf>();
AckSender ack = new AckSender(sender);
// increment so we don't get false returns.
ack.increment();
@@ -139,12 +138,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
}
}else{
- if (targetCount > 1) {
- for (int minor = 0; minor < targetCount; minor++) {
- send(fragmentBatch, (DrillBuf) body, minor, ack, true);
- }
- } else {
- send(fragmentBatch, (DrillBuf) body, 0, ack, false);
+ for (int minor = 0; minor < targetCount; minor++) {
+ send(fragmentBatch, (DrillBuf) body, minor, ack);
}
}
} catch (IOException | FragmentSetupException e) {
@@ -158,34 +153,23 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
// decrement the extra reference we grabbed at the top.
ack.sendOk();
- if(out != null && out.value != null){
- out.value.release();
- }
}
}
- private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int
minor, final AckSender ack,
- final boolean shared)
+ private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int
minor, final AckSender ack)
throws FragmentSetupException, IOException {
- FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+ final FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
if (manager == null) {
return;
}
final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
- final Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+ final Pointer<DrillBuf> out = new Pointer<>();
final boolean withinMemoryEnvelope;
- final DrillBuf submitBody;
-
- if (shared) {
- withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
- submitBody = out.value;
- }else{
- withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body.unwrap());
- submitBody = body;
- }
+
+ withinMemoryEnvelope = allocator.takeOwnership(body, out);
if (!withinMemoryEnvelope) {
// if we over reserved, we need to add poison pill before batch.
@@ -193,14 +177,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
}
ack.increment();
- dataHandler.handle(manager, fragmentBatch, submitBody, ack);
-
- if (shared) {
- // make sure to release the reference count we have to the new buffer.
- // dataHandler.handle should have taken any ownership it needed.
- out.value.release();
- }
+ dataHandler.handle(manager, fragmentBatch, out.value, ack);
+ // make sure to release the reference count we have to the new buffer.
+ // dataHandler.handle should have taken any ownership it needed.
+ out.value.release();
}
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 11b6cc8..fbffd87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -26,9 +26,9 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
- private static enum BufferState {
+ private enum BufferState {
INIT,
STREAMS_FINISHED,
KILLED
@@ -61,7 +61,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
}
@Override
- public void enqueue(final RawFragmentBatch batch) throws IOException {
+ public synchronized void enqueue(final RawFragmentBatch batch) throws IOException {
// if this fragment is already canceled or failed, we shouldn't need any or more stuff.
We do the null check to
// ensure that tests run.
@@ -113,8 +113,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
public void close() {
if (!isTerminated() && context.shouldContinue()) {
final String msg = String.format("Cleanup before finished. %d out of %d strams have
finished", completedStreams(), fragmentCount);
- final IllegalStateException e = new IllegalStateException(msg);
- throw e;
+ throw new IllegalStateException(msg);
}
if (!bufferQueue.isEmpty()) {
@@ -127,7 +126,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
}
@Override
- public void kill(final FragmentContext context) {
+ public synchronized void kill(final FragmentContext context) {
state = BufferState.KILLED;
clearBufferWithBody();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b770a33..f4f76dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
// TODO a lot of this is the same as NonRootFragmentManager
public class RootFragmentManager implements FragmentManager {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 3d09d6a..46186df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
-import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -47,11 +46,8 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.util.TestUtilities;
import org.apache.drill.exec.util.VectorUtil;
-import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.rules.TestRule;
@@ -65,6 +61,7 @@ import com.google.common.io.Resources;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
public class BaseTestQuery extends ExecTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
@@ -393,6 +390,15 @@ public class BaseTestQuery extends ExecTest {
return dir.getAbsolutePath() + File.separator + dirName;
}
+
+ protected static void setSessionOption(final String option, final String value) {
+ try {
+ runSQL(String.format("alter session set `%s` = %s", option, value));
+ } catch(final Exception e) {
+ fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value,
e.toString()));
+ }
+ }
+
private static class SilentListener implements UserResultsListener {
private volatile UserException exception;
private AtomicInteger count = new AtomicInteger();
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
new file mode 100644
index 0000000..ba19e0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.junit.Assert.fail;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+/**
+ * To run this unit class you need to download the following data file:
+ * http://apache-drill.s3.amazonaws.com/files/tpcds-sf1-parquet.tgz
+ * and untar it in a some folder (e.g. /tpcds-sf1-parquet) then add the following workspace
to
+ * exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+ *
+ * ,"tpcds" : {
+ * location: "/tpcds-sf1-parquet",
+ * writable: false
+ * }
+ *
+ */
+@Ignore
+public class TestTpcdsSf1Leaks extends BaseTestQuery {
+
+ @Rule
+ final public TestRule TIMEOUT = new Timeout(0); // wait forever
+
+ @BeforeClass
+ public static void initCluster() {
+ updateTestCluster(3, null);
+ }
+
+ @Test
+ public void test() throws Exception {
+ setSessionOption(SLICE_TARGET, "10");
+ try {
+ final String query = getFile("tpcds-sf1/q73.sql");
+ for (int i = 0; i < 20; i++) {
+ System.out.printf("%nRun #%d%n", i+1);
+
+ try {
+ runSQL(query);
+ } catch (final Exception e) {
+ fail("query failed: " + e.getMessage());
+ }
+ }
+ }finally {
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
new file mode 100644
index 0000000..094ca2b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
@@ -0,0 +1,27 @@
+select c.c_last_name,
+ c.c_first_name,
+ c.c_salutation,
+ c.c_preferred_cust_flag,
+ dj.sstn,
+ dj.cnt
+from (
+ select ss.ss_ticket_number as sstn, ss.ss_customer_sk as sscsk, count(*) cnt
+ from dfs_test.tpcds.store_sales as ss,
+ dfs_test.tpcds.date_dim as d,
+ dfs_test.tpcds.store as s,
+ dfs_test.tpcds.household_demographics as hd
+ where ss.ss_sold_date_sk = d.d_date_sk
+ and ss.ss_store_sk = s.s_store_sk
+ and ss.ss_hdemo_sk = hd.hd_demo_sk
+ and (hd.hd_buy_potential = '>10000' or hd.hd_buy_potential = 'unknown')
+ and hd.hd_vehicle_count > 0
+ and case when hd.hd_vehicle_count > 0 then hd.hd_dep_count / hd.hd_vehicle_count else
null end > 1
+ and s.s_county in ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+ and ss.ss_sold_date_sk between 2451180 and 2451269
+ group by ss.ss_ticket_number, ss.ss_customer_sk
+) dj,
+ dfs_test.tpcds.customer as c
+where dj.sscsk = c.c_customer_sk
+ and dj.cnt between 1 and 5
+order by dj.cnt desc
+limit 1000
\ No newline at end of file
|