flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10637) Start MiniCluster with random REST port
Date Wed, 24 Oct 2018 14:20:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662340#comment-16662340 ] 

ASF GitHub Bot commented on FLINK-10637:
----------------------------------------

asfgit closed pull request #6899: [FLINK-10637] Use MiniClusterResource for tests in flink-runtime
URL: https://github.com/apache/flink/pull/6899
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub no longer shows
its diff once the request is merged, so the complete diff is reproduced below:

diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 86821a5dd2f..56c64aef9ed 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -24,6 +24,8 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -32,8 +34,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 72962696945..d4feac177f2 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -25,8 +25,8 @@
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestingSecurityContext;
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 3fd687397a1..9a11e7f4834 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,8 +32,7 @@
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -70,7 +70,7 @@
 	private static Properties standardProps;
 
 	@ClassRule
-	public static MiniClusterResource flink = new MiniClusterResource(
+	public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 05307acfe59..1a20d7e82ad 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,10 +24,10 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -81,7 +81,7 @@
 	protected static Properties standardProps;
 
 	@ClassRule
-	public static MiniClusterResource flink = new MiniClusterResource(
+	public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getFlinkConfiguration())
 			.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index ab8d6b99b4e..d5698661852 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.test.examples.windowing;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -41,7 +41,7 @@
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@ClassRule
-	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
+	public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(1)
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
index 4c8440df933..79ffbe0c624 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
@@ -22,10 +22,10 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -71,7 +71,7 @@
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(1)
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 194b657ec62..5ef129ad7d3 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.scalatest.{BeforeAndAfter, Suite}
 
 /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
@@ -50,11 +51,11 @@ import org.scalatest.{BeforeAndAfter, Suite}
 trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
-  var cluster: Option[MiniClusterResource] = None
+  var cluster: Option[MiniClusterWithClientResource] = None
   val parallelism = 4
 
   before {
-    val cl = new MiniClusterResource(
+    val cl = new MiniClusterWithClientResource(
       new MiniClusterResourceConfiguration.Builder()
         .setNumberTaskManagers(1)
         .setNumberSlotsPerTaskManager(parallelism)
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index eb8b56a4dc4..0ec76054633 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -30,6 +30,7 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
@@ -40,8 +41,7 @@
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
@@ -83,7 +83,7 @@
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 179cf9c6de8..915c02b644d 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -37,8 +37,8 @@
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -62,7 +62,7 @@
 public class JMXJobManagerMetricTest extends TestLogger {
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberSlotsPerTaskManager(1)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index ff84775f162..9755b52e2f1 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -55,7 +55,7 @@
 
 	private static TestingServer zkServer;
 
-	private static MiniClusterResource miniClusterResource;
+	private static MiniClusterWithClientResource miniClusterResource;
 
 	@Override
 	protected AbstractStateBackend createStateBackend() throws Exception {
@@ -68,7 +68,7 @@ public static void setup() throws Exception {
 
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
-		miniClusterResource = new MiniClusterResource(
+		miniClusterResource = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(getConfig())
 				.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 828123ed2c9..94baaed3745 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -55,7 +55,7 @@
 
 	private static TestingServer zkServer;
 
-	private static MiniClusterResource miniClusterResource;
+	private static MiniClusterWithClientResource miniClusterResource;
 
 	@Override
 	protected AbstractStateBackend createStateBackend() throws Exception {
@@ -68,7 +68,7 @@ public static void setup() throws Exception {
 
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
-		miniClusterResource = new MiniClusterResource(
+		miniClusterResource = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(getConfig())
 				.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index edf0b6224e8..9116304ebf5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -26,8 +26,8 @@
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,7 +52,7 @@
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index b9fd4534db1..ce3e665fce6 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -26,8 +26,8 @@
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,7 +52,7 @@
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 3ae830d0618..7a287cf85be 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -30,9 +30,9 @@
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -79,7 +79,7 @@
 	private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
 
 	@ClassRule
-	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+	public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(CLUSTER_CONFIGURATION)
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)
@@ -117,7 +117,7 @@ public void tearDown() {
 	@Test
 	public void getFrontPage() {
 		try {
-			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html");
+			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/index.html");
 			String text = "Apache Flink Dashboard";
 			assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
 		} catch (Exception e) {
@@ -126,10 +126,14 @@ public void getFrontPage() {
 		}
 	}
 
+	private int getRestPort() {
+		return CLUSTER.getRestAddres().getPort();
+	}
+
 	@Test
 	public void testResponseHeaders() throws Exception {
 		// check headers for successful json response
-		URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers");
+		URL taskManagersUrl = new URL("http://localhost:" + getRestPort() + "/taskmanagers");
 		HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection();
 		taskManagerConnection.setConnectTimeout(100000);
 		taskManagerConnection.connect();
@@ -145,7 +149,7 @@ public void testResponseHeaders() throws Exception {
 		Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType());
 
 		// check headers in case of an error
-		URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist");
+		URL notFoundJobUrl = new URL("http://localhost:" + getRestPort() + "/jobs/dontexist");
 		HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection();
 		notFoundJobConnection.setConnectTimeout(100000);
 		notFoundJobConnection.connect();
@@ -161,7 +165,7 @@ public void testResponseHeaders() throws Exception {
 	@Test
 	public void getNumberOfTaskManagers() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode response = mapper.readTree(json);
@@ -177,7 +181,7 @@ public void getNumberOfTaskManagers() {
 
 	@Test
 	public void getTaskmanagers() throws Exception {
-		String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
+		String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/");
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode parsed = mapper.readTree(json);
@@ -197,18 +201,18 @@ public void getLogAndStdoutFiles() throws Exception {
 		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
 
 		FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log");
+		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/log");
 		assertTrue(logs.contains("job manager log"));
 
 		FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-		logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout");
+		logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/stdout");
 		assertTrue(logs.contains("job manager out"));
 	}
 
 	@Test
 	public void getTaskManagerLogAndStdoutFiles() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode parsed = mapper.readTree(json);
@@ -220,11 +224,11 @@ public void getTaskManagerLogAndStdoutFiles() {
 
 			//we check for job manager log files, since no separate taskmanager logs exist
 			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log");
+			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + id + "/log");
 			assertTrue(logs.contains("job manager log"));
 
 			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-			logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout");
+			logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + id + "/stdout");
 			assertTrue(logs.contains("job manager out"));
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -235,7 +239,7 @@ public void getTaskManagerLogAndStdoutFiles() {
 	@Test
 	public void getConfiguration() {
 		try {
-			String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config");
+			String config = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/config");
 
 			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
 			assertEquals(
@@ -275,7 +279,7 @@ public void testStop() throws Exception {
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+		try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
 			// stop the job
 			client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
 			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -291,7 +295,7 @@ public void testStop() throws Exception {
 		}
 
 		// ensure we can access job details when its finished (FLINK-4011)
-		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+		try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
 			FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 			client.sendGetRequest("/jobs/" + jid + "/config", timeout);
 			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
@@ -334,7 +338,7 @@ public void testStopYarn() throws Exception {
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+		try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
 			// Request the file from the web server
 			client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index a2138e14383..5fc3ff5c067 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -26,9 +26,10 @@
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -43,7 +44,7 @@
 /**
  * Tests for the {@link JarRunHandler}.
  */
-public class JarRunHandlerTest {
+public class JarRunHandlerTest extends TestLogger {
 
 	@ClassRule
 	public static final TemporaryFolder TMP = new TemporaryFolder();
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 18dd76e05b0..fce2b6dcc31 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -26,9 +26,9 @@
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -63,7 +63,7 @@
 	@ClassRule
 	public static final TemporaryFolder TMP = new TemporaryFolder();
 
-	private MiniClusterResource cluster;
+	private MiniClusterWithClientResource cluster;
 	private File jmDirectory;
 	private File hsDirectory;
 
@@ -75,7 +75,7 @@ public void setUp() throws Exception {
 		Configuration clusterConfig = new Configuration();
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
 
-		cluster = new MiniClusterResource(
+		cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(clusterConfig)
 				.setNumberTaskManagers(1)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f6689fe72d2..f32b8398c02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -31,13 +31,12 @@
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 /**
@@ -52,30 +51,20 @@
 
 	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
-	private static MiniCluster flink;
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.setNumberTaskManagers(NUMBER_OF_TMS)
+			.setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build());
 
-	@BeforeClass
-	public static void setUp() throws Exception {
+	private static Configuration getFlinkConfiguration() {
 		final Configuration config = new Configuration();
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(config)
-			.setNumTaskManagers(NUMBER_OF_TMS)
-			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
-			.build();
-
-		flink = new MiniCluster(miniClusterConfiguration);
-
-		flink.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (flink != null) {
-			flink.close();
-		}
+		return config;
 	}
 
 	/**
@@ -111,7 +100,7 @@ public void testPartialConsumePipelinedResultReceiver() throws Exception {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		flink.executeJobBlocking(jobGraph);
+		MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
 	}
 
 	// ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 7782a8e2feb..9b16e86c422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
@@ -30,14 +29,13 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.BitSet;
@@ -56,30 +54,19 @@
 
 	public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
 
-	private static MiniCluster flink;
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.setNumberTaskManagers(NUMBER_OF_TMS)
+			.setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build());
 
-	@BeforeClass
-	public static void setUp() throws Exception {
+	private static Configuration getFlinkConfiguration() {
 		final Configuration config = new Configuration();
-		config.setInteger(RestOptions.PORT, 0);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(config)
-			.setNumTaskManagers(NUMBER_OF_TMS)
-			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
-			.build();
-
-		flink = new MiniCluster(miniClusterConfiguration);
-
-		flink.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (flink != null) {
-			flink.close();
-		}
+		return config;
 	}
 
 	@Test
@@ -106,7 +93,7 @@ public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
 	// ---------------------------------------------------------------------------------------------
 
 	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException, InterruptedException {
-		flink.executeJobBlocking(jobGraph);
+		MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
 	}
 
 	private JobGraph createTestJobGraph(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index eab4eabd6aa..f382f041006 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -29,16 +28,15 @@
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.List;
@@ -55,30 +53,19 @@
 	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
 	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private static MiniCluster flink;
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.setNumberTaskManagers(NUMBER_OF_TMS)
+			.setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build());
 
-	@BeforeClass
-	public static void setUp() throws Exception {
+	private static Configuration getFlinkConfiguration() {
 		final Configuration config = new Configuration();
-		config.setInteger(RestOptions.PORT, 0);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(config)
-			.setNumTaskManagers(NUMBER_OF_TMS)
-			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
-			.build();
-
-		flink = new MiniCluster(miniClusterConfiguration);
-
-		flink.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (flink != null) {
-			flink.close();
-		}
+		return config;
 	}
 
 	/**
@@ -137,7 +124,7 @@ public void testMixedPipelinedAndBlockingResults() throws Exception {
 				pipelinedReceiver,
 				blockingReceiver);
 
-		flink.executeJobBlocking(jobGraph);
+		MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
 	}
 
 	// ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 561b81beb02..14aecc700ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -35,11 +35,13 @@
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -61,6 +63,19 @@
 	private static volatile Thread ASYNC_PRODUCER_THREAD;
 	private static volatile Thread ASYNC_CONSUMER_THREAD;
 
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.build());
+
+	private static Configuration getFlinkConfiguration() {
+		Configuration config = new Configuration();
+		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+		return config;
+	}
+
 	/**
 	 * Tests that a task waiting on an async producer/consumer that is stuck
 	 * in a blocking buffer request can be properly cancelled.
@@ -73,105 +88,92 @@
 	public void testCancelAsyncProducerAndConsumer() throws Exception {
 		Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
 
-		// Cluster
-		Configuration config = new Configuration();
-		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+		// Job with async producer and consumer
+		JobVertex producer = new JobVertex("AsyncProducer");
+		producer.setParallelism(1);
+		producer.setInvokableClass(AsyncProducer.class);
 
-		MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(config)
-			.setNumTaskManagers(1)
-			.setNumSlotsPerTaskManager(1)
-			.build();
-
-		try (MiniCluster flink = new MiniCluster(miniClusterConfiguration)) {
-			flink.start();
-
-			// Job with async producer and consumer
-			JobVertex producer = new JobVertex("AsyncProducer");
-			producer.setParallelism(1);
-			producer.setInvokableClass(AsyncProducer.class);
-
-			JobVertex consumer = new JobVertex("AsyncConsumer");
-			consumer.setParallelism(1);
-			consumer.setInvokableClass(AsyncConsumer.class);
-			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
-			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
-			producer.setSlotSharingGroup(slot);
-			consumer.setSlotSharingGroup(slot);
-
-			JobGraph jobGraph = new JobGraph(producer, consumer);
-
-			// Submit job and wait until running
-			flink.runDetached(jobGraph);
-
-			FutureUtils.retrySuccesfulWithDelay(
-				() -> flink.getJobStatus(jobGraph.getJobID()),
-				Time.milliseconds(10),
-				deadline,
-				status -> status == JobStatus.RUNNING,
-				TestingUtils.defaultScheduledExecutor()
-			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			boolean producerBlocked = false;
-			for (int i = 0; i < 50; i++) {
-				Thread thread = ASYNC_PRODUCER_THREAD;
-
-				if (thread != null && thread.isAlive()) {
-					StackTraceElement[] stackTrace = thread.getStackTrace();
-					producerBlocked = isInBlockingBufferRequest(stackTrace);
-				}
+		JobVertex consumer = new JobVertex("AsyncConsumer");
+		consumer.setParallelism(1);
+		consumer.setInvokableClass(AsyncConsumer.class);
+		consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
-				if (producerBlocked) {
-					break;
-				} else {
-					// Retry
-					Thread.sleep(500L);
-				}
-			}
+		SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
+		producer.setSlotSharingGroup(slot);
+		consumer.setSlotSharingGroup(slot);
 
-			// Verify that async producer is in blocking request
-			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
+		JobGraph jobGraph = new JobGraph(producer, consumer);
 
-			boolean consumerWaiting = false;
-			for (int i = 0; i < 50; i++) {
-				Thread thread = ASYNC_CONSUMER_THREAD;
+		final MiniCluster flink = MINI_CLUSTER_RESOURCE.getMiniCluster();
 
-				if (thread != null && thread.isAlive()) {
-					consumerWaiting = thread.getState() == Thread.State.WAITING;
-				}
+		// Submit job and wait until running
+		flink.runDetached(jobGraph);
 
-				if (consumerWaiting) {
-					break;
-				} else {
-					// Retry
-					Thread.sleep(500L);
-				}
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> flink.getJobStatus(jobGraph.getJobID()),
+			Time.milliseconds(10),
+			deadline,
+			status -> status == JobStatus.RUNNING,
+			TestingUtils.defaultScheduledExecutor()
+		).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+		boolean producerBlocked = false;
+		for (int i = 0; i < 50; i++) {
+			Thread thread = ASYNC_PRODUCER_THREAD;
+
+			if (thread != null && thread.isAlive()) {
+				StackTraceElement[] stackTrace = thread.getStackTrace();
+				producerBlocked = isInBlockingBufferRequest(stackTrace);
 			}
 
-			// Verify that async consumer is in blocking request
-			assertTrue("Consumer thread is not blocked.", consumerWaiting);
+			if (producerBlocked) {
+				break;
+			} else {
+				// Retry
+				Thread.sleep(500L);
+			}
+		}
 
-			flink.cancelJob(jobGraph.getJobID())
-				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		// Verify that async producer is in blocking request
+		assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
 
-			// wait until the job is canceled
-			FutureUtils.retrySuccesfulWithDelay(
-				() -> flink.getJobStatus(jobGraph.getJobID()),
-				Time.milliseconds(10),
-				deadline,
-				status -> status == JobStatus.CANCELED,
-				TestingUtils.defaultScheduledExecutor()
-			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		boolean consumerWaiting = false;
+		for (int i = 0; i < 50; i++) {
+			Thread thread = ASYNC_CONSUMER_THREAD;
 
-			// Verify the expected Exceptions
-			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
-			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+			if (thread != null && thread.isAlive()) {
+				consumerWaiting = thread.getState() == Thread.State.WAITING;
+			}
 
-			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
-			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
+			if (consumerWaiting) {
+				break;
+			} else {
+				// Retry
+				Thread.sleep(500L);
+			}
 		}
+
+		// Verify that async consumer is in blocking request
+		assertTrue("Consumer thread is not blocked.", consumerWaiting);
+
+		flink.cancelJob(jobGraph.getJobID())
+			.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+		// wait until the job is canceled
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> flink.getJobStatus(jobGraph.getJobID()),
+			Time.milliseconds(10),
+			deadline,
+			status -> status == JobStatus.CANCELED,
+			TestingUtils.defaultScheduledExecutor()
+		).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+		// Verify the expected Exceptions
+		assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+		assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+
+		assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+		assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
new file mode 100644
index 00000000000..5cd1a50176d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Resource which starts a {@link MiniCluster} for testing purposes.
+ */
+public class MiniClusterResource extends ExternalResource {
+
+	private static final String DEFAULT_MANAGED_MEMORY_SIZE = "80m";
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
+
+	private MiniCluster miniCluster = null;
+
+	private int numberSlots = -1;
+
+	private UnmodifiableConfiguration restClusterClientConfig;
+
+	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
+	}
+
+	public int getNumberSlots() {
+		return numberSlots;
+	}
+
+	public MiniCluster getMiniCluster() {
+		return miniCluster;
+	}
+
+	public UnmodifiableConfiguration getClientConfiguration() {
+		return restClusterClientConfig;
+	}
+
+	public URI getRestAddres() {
+		return miniCluster.getRestAddress();
+	}
+
+	@Override
+	public void before() throws Exception {
+		temporaryFolder.create();
+
+		startMiniCluster();
+
+		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
+	}
+
+	@Override
+	public void after() {
+		temporaryFolder.delete();
+
+		Exception exception = null;
+
+		if (miniCluster != null) {
+			final CompletableFuture<?> terminationFuture = miniCluster.closeAsync();
+
+			try {
+				terminationFuture.get(
+					miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
+					TimeUnit.MILLISECONDS);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			miniCluster = null;
+		}
+
+		if (exception != null) {
+			log.warn("Could not properly shut down the MiniClusterResource.", exception);
+		}
+	}
+
+	private void startMiniCluster() throws Exception {
+		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
+
+		// we need to set this since a lot of tests expect this because TestBaseUtils.startCluster()
+		// enabled this by default
+		if (!configuration.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) {
+			configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+		}
+
+		if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, DEFAULT_MANAGED_MEMORY_SIZE);
+		}
+
+		// set rest and rpc port to 0 to avoid clashes with concurrent MiniClusters
+		configuration.setInteger(JobManagerOptions.PORT, 0);
+		configuration.setInteger(RestOptions.PORT, 0);
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
+			.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
+			.build();
+
+		miniCluster = new MiniCluster(miniClusterConfiguration);
+
+		miniCluster.start();
+
+		final URI restAddress = miniCluster.getRestAddress();
+		createClientConfiguration(restAddress);
+	}
+
+	private void createClientConfiguration(URI restAddress) {
+		Configuration restClientConfig = new Configuration();
+		restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
+		restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
+		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
new file mode 100644
index 00000000000..d54996d0fa0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mini cluster resource configuration object.
+ */
+public class MiniClusterResourceConfiguration {
+
+	private final UnmodifiableConfiguration configuration;
+
+	private final int numberTaskManagers;
+
+	private final int numberSlotsPerTaskManager;
+
+	private final Time shutdownTimeout;
+
+	private final RpcServiceSharing rpcServiceSharing;
+
+	protected MiniClusterResourceConfiguration(
+		Configuration configuration,
+		int numberTaskManagers,
+		int numberSlotsPerTaskManager,
+		Time shutdownTimeout,
+		RpcServiceSharing rpcServiceSharing) {
+		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+		this.numberTaskManagers = numberTaskManagers;
+		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
+		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public int getNumberTaskManagers() {
+		return numberTaskManagers;
+	}
+
+	public int getNumberSlotsPerTaskManager() {
+		return numberSlotsPerTaskManager;
+	}
+
+	public Time getShutdownTimeout() {
+		return shutdownTimeout;
+	}
+
+	public RpcServiceSharing getRpcServiceSharing() {
+		return rpcServiceSharing;
+	}
+
+	/**
+	 * Builder for {@link MiniClusterResourceConfiguration}.
+	 */
+	public static final class Builder {
+
+		private Configuration configuration = new Configuration();
+		private int numberTaskManagers = 1;
+		private int numberSlotsPerTaskManager = 1;
+		private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+
+		private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
+
+		public Builder setConfiguration(Configuration configuration) {
+			this.configuration = configuration;
+			return this;
+		}
+
+		public Builder setNumberTaskManagers(int numberTaskManagers) {
+			this.numberTaskManagers = numberTaskManagers;
+			return this;
+		}
+
+		public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) {
+			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+			return this;
+		}
+
+		public Builder setShutdownTimeout(Time shutdownTimeout) {
+			this.shutdownTimeout = shutdownTimeout;
+			return this;
+		}
+
+		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+			this.rpcServiceSharing = rpcServiceSharing;
+			return this;
+		}
+
+		public MiniClusterResourceConfiguration build() {
+			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
+		}
+	}
+}
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 731bbf6b288..337e4fb9be9 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
+import org.apache.flink.runtime.minicluster.MiniCluster
+import org.apache.flink.runtime.testutils.{MiniClusterResource, MiniClusterResourceConfiguration}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -274,16 +276,15 @@ class ScalaShellITCase extends TestLogger {
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
 
-    val args = cluster match {
-      case Some(_) =>
-        Array(
-          "remote",
-          hostname,
-          Integer.toString(port),
-          "--configDir",
-          dir.getAbsolutePath)
-      case None => throw new IllegalStateException("Cluster has not been started.")
-    }
+    val port: Int = clusterResource.getRestAddres.getPort
+    val hostname : String = clusterResource.getRestAddres.getHost
+
+    val args = Array(
+      "remote",
+      hostname,
+      Integer.toString(port),
+      "--configDir",
+      dir.getAbsolutePath)
 
     //start scala shell with initialized
     // buffered reader for testing
@@ -313,35 +314,19 @@ object ScalaShellITCase {
   val configuration = new Configuration()
   var cluster: Option[MiniCluster] = None
 
-  var port: Int = _
-  var hostname : String = _
   val parallelism: Int = 4
 
-  @BeforeClass
-  def beforeAll(): Unit = {
-    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-    configuration.setInteger(RestOptions.PORT, 8082)
-    val miniConfig = new MiniClusterConfiguration.Builder()
-      .setConfiguration(configuration)
-      .setNumSlotsPerTaskManager(parallelism)
-      .build()
-
-    val miniCluster = new MiniCluster(miniConfig)
-    miniCluster.start()
-    port = miniCluster.getRestAddress.getPort
-    hostname = miniCluster.getRestAddress.getHost
-
-    cluster = Some(miniCluster)
-  }
+  val _clusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
+      .setNumberSlotsPerTaskManager(parallelism)
+      .build())
+
+  @ClassRule
+  def clusterResource = _clusterResource
 
   @AfterClass
   def afterAll(): Unit = {
     // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
     Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
-
-    cluster.foreach {
-      miniCluster => miniCluster.close()
-    }
   }
 
   /**
@@ -358,45 +343,44 @@ object ScalaShellITCase {
     val oldOut = System.out
     System.setOut(new PrintStream(baos))
 
-    cluster match {
-      case Some(_) =>
-        val repl = externalJars match {
-          case Some(ej) => new FlinkILoop(
-            hostname,
-            port,
-            configuration,
-            Option(Array(ej)),
-            in, new PrintWriter(out))
-
-          case None => new FlinkILoop(
-            hostname,
-            port,
-            configuration,
-            in, new PrintWriter(out))
-        }
+    val port: Int = clusterResource.getRestAddres.getPort
+    val hostname : String = clusterResource.getRestAddres.getHost
 
-        repl.settings = new Settings()
+      val repl = externalJars match {
+        case Some(ej) => new FlinkILoop(
+          hostname,
+          port,
+          configuration,
+          Option(Array(ej)),
+          in, new PrintWriter(out))
 
-        // enable this line to use scala in intellij
-        repl.settings.usejavacp.value = true
+        case None => new FlinkILoop(
+          hostname,
+          port,
+          configuration,
+          in, new PrintWriter(out))
+      }
 
-        externalJars match {
-          case Some(ej) => repl.settings.classpath.value = ej
-          case None =>
-        }
+      repl.settings = new Settings()
+
+      // enable this line to use scala in intellij
+      repl.settings.usejavacp.value = true
+
+      externalJars match {
+        case Some(ej) => repl.settings.classpath.value = ej
+        case None =>
+      }
 
-        repl.process(repl.settings)
+      repl.process(repl.settings)
 
-        repl.closeInterpreter()
+      repl.closeInterpreter()
 
-        System.setOut(oldOut)
+      System.setOut(oldOut)
 
-        baos.flush()
+      baos.flush()
 
-        val stdout = baos.toString
+      val stdout = baos.toString
 
-        out.toString + stdout
-      case _ => throw new IllegalStateException("The cluster has not been started.")
-    }
+      out.toString + stdout
   }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
index 60ee66f4246..d9a257c9cb5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -21,14 +21,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -38,30 +38,12 @@
  */
 public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
 
-	private static MiniCluster flink;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		final Configuration config = new Configuration();
-		config.setInteger(RestOptions.PORT, 0);
-
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(config)
-			.setNumTaskManagers(1)
-			.setNumSlotsPerTaskManager(1)
-			.build();
-
-		flink = new MiniCluster(miniClusterConfiguration);
-
-		flink.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (flink != null) {
-			flink.close();
-		}
-	}
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
 
 	/**
 	 * Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster.
@@ -71,9 +53,10 @@ public void testPortForwarding() throws Exception {
 		final Configuration clientConfiguration = new Configuration();
 		clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
 
+		final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-			flink.getRestAddress().getHost(),
-			flink.getRestAddress().getPort(),
+			miniCluster.getRestAddress().getHost(),
+			miniCluster.getRestAddress().getPort(),
 			clientConfiguration);
 
 		final DataStream<Integer> resultStream = env.fromElements(1)
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 6a8e01700b2..6c32a436bab 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -50,6 +50,14 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>compile</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients_${scala.binary.version}</artifactId>
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 0b7a3b3f93f..3ac2104b7b2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
@@ -58,7 +59,7 @@
 	private static final int DEFAULT_PARALLELISM = 4;
 
 	@ClassRule
-	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
+	public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9140bb4eff5..cbd36e4edde 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -18,166 +18,16 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 /**
- * Starts a Flink mini cluster as a resource and registers the respective
- * ExecutionEnvironment and StreamExecutionEnvironment.
+ * Mirror of the {@link MiniClusterWithClientResource} to avoid breaking
+ * changes when splitting up the original MiniClusterResource implementation
+ * with FLINK-10637.
+ *
+ * @deprecated This class should be replaced with {@link MiniClusterWithClientResource}.
  */
-public class MiniClusterResource extends ExternalResource {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
-
-	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
-
-	private JobExecutorService jobExecutorService;
-
-	private ClusterClient<?> clusterClient;
-
-	private Configuration restClusterClientConfig;
-
-	private int numberSlots = -1;
-
-	private TestEnvironment executionEnvironment;
-
-	private int webUIPort = -1;
-
-	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
-		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
-	}
-
-	public int getNumberSlots() {
-		return numberSlots;
-	}
-
-	public ClusterClient<?> getClusterClient() {
-		return clusterClient;
-	}
-
-	public Configuration getClientConfiguration() {
-		return restClusterClientConfig;
-	}
-
-	public TestEnvironment getTestEnvironment() {
-		return executionEnvironment;
-	}
-
-	public int getWebUIPort() {
-		return webUIPort;
-	}
-
-	@Override
-	public void before() throws Exception {
-		temporaryFolder.create();
-
-		startMiniCluster();
-
-		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
-
-		executionEnvironment = new TestEnvironment(jobExecutorService, numberSlots, false);
-		executionEnvironment.setAsContext();
-		TestStreamEnvironment.setAsContext(jobExecutorService, numberSlots);
-	}
-
-	@Override
-	public void after() {
-		temporaryFolder.delete();
-
-		TestStreamEnvironment.unsetAsContext();
-		TestEnvironment.unsetAsContext();
-
-		Exception exception = null;
-
-		if (clusterClient != null) {
-			try {
-				clusterClient.shutdown();
-			} catch (Exception e) {
-				exception = e;
-			}
-		}
-
-		clusterClient = null;
-
-		if (jobExecutorService != null) {
-			final CompletableFuture<?> terminationFuture = jobExecutorService.closeAsync();
-
-			try {
-				terminationFuture.get(
-					miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
-					TimeUnit.MILLISECONDS);
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
-
-			jobExecutorService = null;
-		}
-
-		if (exception != null) {
-			LOG.warn("Could not properly shut down the MiniClusterResource.", exception);
-		}
-	}
-
-	private void startMiniCluster() throws Exception {
-		final Configuration configuration = miniClusterResourceConfiguration.getConfiguration();
-		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
-
-		// we need to set this since a lot of test expect this because TestBaseUtils.startCluster()
-		// enabled this by default
-		if (!configuration.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) {
-			configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-		}
-
-		if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, TestBaseUtils.TASK_MANAGER_MEMORY_SIZE);
-		}
-
-		// set rest port to 0 to avoid clashes with concurrent MiniClusters
-		configuration.setInteger(RestOptions.PORT, 0);
-
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(configuration)
-			.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
-			.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
-			.build();
-
-		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
-
-		miniCluster.start();
-
-		// update the port of the rest endpoint
-		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-
-		jobExecutorService = miniCluster;
-		clusterClient = new MiniClusterClient(configuration, miniCluster);
-
-		Configuration restClientConfig = new Configuration();
-		restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
-		restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
-
-		webUIPort = miniCluster.getRestAddress().getPort();
+@Deprecated
+public class MiniClusterResource extends MiniClusterWithClientResource {
+	public MiniClusterResource(MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+		super(miniClusterResourceConfiguration);
 	}
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index bd521a2f202..8b5167a4df7 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -22,58 +22,27 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.util.Preconditions;
 
 /**
- * Mini cluster resource configuration object.
+ * Mirror of {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration} which has been
+ * introduced to avoid breaking changes with FLINK-10637.
+ *
+ * @deprecated This class should be replaced with {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration}.
  */
-public class MiniClusterResourceConfiguration {
-
-	private final Configuration configuration;
-
-	private final int numberTaskManagers;
-
-	private final int numberSlotsPerTaskManager;
-
-	private final Time shutdownTimeout;
-
-	private final RpcServiceSharing rpcServiceSharing;
+@Deprecated
+public class MiniClusterResourceConfiguration extends org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration {
 
 	MiniClusterResourceConfiguration(
-		Configuration configuration,
-		int numberTaskManagers,
-		int numberSlotsPerTaskManager,
-		Time shutdownTimeout,
-		RpcServiceSharing rpcServiceSharing) {
-		this.configuration = Preconditions.checkNotNull(configuration);
-		this.numberTaskManagers = numberTaskManagers;
-		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
-		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
-		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
-	}
-
-	public Configuration getConfiguration() {
-		return configuration;
-	}
-
-	public int getNumberTaskManagers() {
-		return numberTaskManagers;
-	}
-
-	public int getNumberSlotsPerTaskManager() {
-		return numberSlotsPerTaskManager;
-	}
-
-	public Time getShutdownTimeout() {
-		return shutdownTimeout;
-	}
-
-	public RpcServiceSharing getRpcServiceSharing() {
-		return rpcServiceSharing;
+			Configuration configuration,
+			int numberTaskManagers,
+			int numberSlotsPerTaskManager,
+			Time shutdownTimeout,
+			RpcServiceSharing rpcServiceSharing) {
+		super(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
 	}
 
 	/**
-	 * Builder for {@link MiniClusterResourceConfiguration}.
+	 * Builder for {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration}.
 	 */
 	public static final class Builder {
 
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
new file mode 100644
index 00000000000..594959b9156
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+
+/**
+ * Starts a Flink mini cluster as a resource and registers the respective
+ * ExecutionEnvironment and StreamExecutionEnvironment.
+ */
+public class MiniClusterWithClientResource extends MiniClusterResource {
+
+	private ClusterClient<?> clusterClient;
+
+	private TestEnvironment executionEnvironment;
+
+	public MiniClusterWithClientResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+		super(miniClusterResourceConfiguration);
+	}
+
+	public ClusterClient<?> getClusterClient() {
+		return clusterClient;
+	}
+
+	public TestEnvironment getTestEnvironment() {
+		return executionEnvironment;
+	}
+
+	@Override
+	public void before() throws Exception {
+		super.before();
+
+		clusterClient = createMiniClusterClient();
+
+		executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
+		executionEnvironment.setAsContext();
+		TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots());
+	}
+
+	@Override
+	public void after() {
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+
+		Exception exception = null;
+
+		if (clusterClient != null) {
+			try {
+				clusterClient.shutdown();
+			} catch (Exception e) {
+				exception = e;
+			}
+		}
+
+		clusterClient = null;
+
+		super.after();
+
+		if (exception != null) {
+			log.warn("Could not properly shut down the MiniClusterWithClientResource.", exception);
+		}
+	}
+
+	private MiniClusterClient createMiniClusterClient() {
+		return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index bd70cadda0f..e890216e7e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -23,11 +23,11 @@
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -56,7 +56,7 @@
 	private CheckedThread jobExecuteThread;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(1)
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index 6d9a7b03703..b875d97f578 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -22,8 +22,9 @@
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -43,7 +44,7 @@
 /**
  * Integration tests for proper initialization of the system resource metrics.
  */
-public class SystemResourcesMetricsITCase {
+public class SystemResourcesMetricsITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 1ac75311d7f..1b8ac05f388 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,8 +28,8 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +50,7 @@
 	private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(2)
 			.setNumberSlotsPerTaskManager(3)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 2c12e441313..0ead861adba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -39,10 +39,10 @@
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -84,7 +84,7 @@
 	}
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(1)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index a56089186b2..0d29568c2a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -32,8 +32,8 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -56,7 +56,7 @@
 	// --------------------------------------------------------------------------------------------
 
 	@ClassRule
-	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+	public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 5dc2aa0e87f..061d7e0580c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -26,6 +26,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
@@ -34,8 +35,7 @@
 import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +58,7 @@
 	private static final int PARALLELISM = 4;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4fa90206a75..1c04d270b9c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,6 +35,7 @@
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -45,8 +46,7 @@
 import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -91,7 +91,7 @@
 
 	private TestingServer zkServer;
 
-	public MiniClusterResource miniClusterResource;
+	public MiniClusterWithClientResource miniClusterResource;
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -117,8 +117,8 @@ protected StateBackendEnum getStateBackend() {
 		return this.stateBackendEnum;
 	}
 
-	protected final MiniClusterResource getMiniClusterResource() {
-		return new MiniClusterResource(
+	protected final MiniClusterWithClientResource getMiniClusterResource() {
+		return new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(getConfigurationSafe())
 				.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 989024cffb7..957cd2b48d4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -31,13 +31,13 @@
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
@@ -79,7 +79,7 @@
 	// ------------------------------------------------------------------------
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 3a3eadaea5b..8f788cbfbcd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -51,8 +52,7 @@
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -105,7 +105,7 @@
 		NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
 	}
 
-	private static MiniClusterResource cluster;
+	private static MiniClusterWithClientResource cluster;
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -127,7 +127,7 @@ public void setup() throws Exception {
 			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-			cluster = new MiniClusterResource(
+			cluster = new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setConfiguration(config)
 					.setNumberTaskManagers(numTaskManagers)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 24f408de9a7..0635f239c56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -30,14 +30,14 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -263,7 +263,7 @@ private void testExternalizedCheckpoints(
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
 		}
 
-		MiniClusterResource cluster = new MiniClusterResource(
+		MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(NUM_TASK_MANAGERS)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a806f026986..2cd2bbb60e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -51,8 +52,7 @@
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -142,7 +142,7 @@ private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clust
 		final JobID jobId = jobGraph.getJobID();
 		StatefulCounter.resetForTest(parallelism);
 
-		MiniClusterResource cluster = clusterFactory.get();
+		MiniClusterWithClientResource cluster = clusterFactory.get();
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
 
@@ -184,7 +184,7 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF
 		final JobID jobId = jobGraph.getJobID();
 		StatefulCounter.resetForTest(parallelism);
 
-		MiniClusterResource cluster = clusterFactory.get();
+		MiniClusterWithClientResource cluster = clusterFactory.get();
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
 
@@ -230,7 +230,7 @@ public void testTriggerSavepointForNonExistingJob() throws Exception {
 		final Configuration config = new Configuration();
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-		final MiniClusterResource cluster = new MiniClusterResource(
+		final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
@@ -261,7 +261,7 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
 
 		final Configuration config = new Configuration();
 
-		final MiniClusterResource cluster = new MiniClusterResource(
+		final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
@@ -305,7 +305,7 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
 		final Configuration config = new Configuration();
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-		MiniClusterResource cluster = new MiniClusterResource(
+		MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
@@ -373,7 +373,7 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
 		LOG.info("Flink configuration: " + config + ".");
 
 		// Start Flink
-		MiniClusterResource cluster = new MiniClusterResource(
+		MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
@@ -417,7 +417,7 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
 
 		// create a new TestingCluster to make sure we start with completely
 		// new resources
-		cluster = new MiniClusterResource(
+		cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
@@ -653,7 +653,7 @@ public Integer map(Integer value) throws Exception {
 		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-		MiniClusterResource cluster = new MiniClusterResource(
+		MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(1)
@@ -789,8 +789,8 @@ private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManag
 			this.config = config;
 		}
 
-		MiniClusterResource get() {
-			return new MiniClusterResource(
+		MiniClusterWithClientResource get() {
+			return new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setConfiguration(config)
 					.setNumberTaskManagers(numTaskManagers)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 0ed9d6b2195..0ace56f5db6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,9 +19,9 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -41,7 +41,7 @@
 	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)
 			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index b0e2967d4b1..8905ecdd386 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -34,8 +35,7 @@
 import org.apache.flink.test.checkpointing.utils.FailingSource;
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -71,7 +71,7 @@ public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
 	private static final int PARALLELISM = 4;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 642af40aaf2..7c00de7f436 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -44,12 +44,12 @@
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -104,7 +104,7 @@
 
 	private static TestingServer zkServer;
 
-	private static MiniClusterResource miniClusterResource;
+	private static MiniClusterWithClientResource miniClusterResource;
 
 	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
 	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
@@ -131,7 +131,7 @@ public static void setup() throws Exception {
 
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
-		miniClusterResource = new MiniClusterResource(
+		miniClusterResource = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(config)
 				.setNumberTaskManagers(NUM_TMS)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index a5267b59e68..34c20d63760 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,9 +30,9 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OptionalFailure;
 
@@ -72,7 +72,7 @@ public static void before() {
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
 	@Rule
-	public final MiniClusterResource miniClusterResource;
+	public final MiniClusterWithClientResource miniClusterResource;
 
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
 	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
@@ -88,7 +88,7 @@ protected static String getResourceFilename(String filename) {
 	}
 
 	protected SavepointMigrationTestBase() throws Exception {
-		miniClusterResource = new MiniClusterResource(
+		miniClusterResource = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
 				.setConfiguration(getConfiguration())
 				.setNumberTaskManagers(1)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 4851e54c2ee..23503d7e370 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -30,8 +30,8 @@
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
@@ -44,6 +44,7 @@
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -90,18 +91,18 @@
 	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
 
 
-	private static final TemporaryFolder FOLDER = new TemporaryFolder();
+	@ClassRule
+	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
-	private static MiniCluster testCluster;
+	private static MiniClusterResource miniClusterResource = null;
 
 	private static final int parallelism = 4;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-		FOLDER.create();
 
 		Configuration config = new Configuration();
 
@@ -117,26 +118,25 @@ public static void setUp() throws Exception {
 		// required as we otherwise run out of memory
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
 
-		testCluster = new MiniCluster(
-			new MiniClusterConfiguration.Builder()
-				.setNumTaskManagers(2)
-				.setNumSlotsPerTaskManager(2)
+		miniClusterResource = new MiniClusterResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setNumberTaskManagers(2)
+				.setNumberSlotsPerTaskManager(2)
 				.setConfiguration(config)
-			.build()
-		);
-		testCluster.start();
+				.build());
+
+		miniClusterResource.before();
 	}
 
 	@AfterClass
-	public static void tearDownClass() throws Exception {
-		if (testCluster != null) {
-			testCluster.close();
+	public static void tearDownClass() {
+		if (miniClusterResource != null) {
+			miniClusterResource.after();
 		}
-		FOLDER.delete();
 	}
 
 	@After
-	public void tearDown() throws Exception {
+	public void tearDown() {
 		TestStreamEnvironment.unsetAsContext();
 		TestEnvironment.unsetAsContext();
 	}
@@ -147,7 +147,7 @@ public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, Pro
 		PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
 
 		TestEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
@@ -160,7 +160,7 @@ public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOExceptio
 		PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
 
 		TestStreamEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
@@ -174,7 +174,7 @@ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, Pr
 		PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
 
 		TestEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.<Path>emptyList(),
 			Collections.singleton(classpath));
@@ -188,7 +188,7 @@ public void testStreamingClassloaderJobWithCustomClassLoader() throws IOExceptio
 		PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
 
 		TestStreamEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
@@ -203,7 +203,7 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw
 		PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
 		TestStreamEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
@@ -242,7 +242,7 @@ public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvo
 			});
 
 		TestEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(KMEANS_JAR_PATH)),
 			Collections.<URL>emptyList());
@@ -255,7 +255,7 @@ public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, Progr
 		PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH));
 
 		TestEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
 			Collections.<URL>emptyList());
@@ -276,7 +276,7 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx
 			});
 
 		TestStreamEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
 			Collections.<URL>emptyList());
@@ -292,7 +292,7 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx
 	 */
 	@Test
 	public void testDisposeSavepointWithCustomKvState() throws Exception {
-		ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), testCluster);
+		ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
 
 		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
 
@@ -309,7 +309,7 @@ public void testDisposeSavepointWithCustomKvState() throws Exception {
 				});
 
 		TestStreamEnvironment.setAsContext(
-			testCluster,
+			miniClusterResource.getMiniCluster(),
 			parallelism,
 			Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
 			Collections.<URL>emptyList()
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 56df46e49ea..fff77e777d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -29,8 +29,8 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 9a96297933a..3cd7674037c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -55,7 +55,7 @@
 	private static final int NUM_SLOTS = 20;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TM)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 131b6a00eeb..73e4ae946a6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -22,12 +22,12 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import static org.junit.Assert.fail;
 
@@ -45,7 +45,7 @@ public static void main(String[] args) throws Exception {
 		final int slotsPerTaskManager = 80;
 		final int parallelism = taskManagers * slotsPerTaskManager;
 
-		MiniClusterResource cluster = null;
+		MiniClusterWithClientResource cluster = null;
 
 		try {
 			Configuration config = new Configuration();
@@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception {
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);
 
-			cluster = new MiniClusterResource(
+			cluster = new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setConfiguration(config)
 					.setNumberTaskManagers(taskManagers)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index f62ccf7374d..03376ece200 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -26,8 +26,8 @@
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -53,7 +53,7 @@
 	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(NUM_TM)
 			.setNumberSlotsPerTaskManager(SLOTS_PER_TM)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 07d146d54cf..e9b0d753320 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +50,7 @@
 	private static final int PARLLELISM = 5;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(1)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6b3371dd4ff..77f3d4981c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -28,8 +28,8 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -55,7 +55,7 @@
 public class MiscellaneousIssuesITCase extends TestLogger {
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(2)
 			.setNumberSlotsPerTaskManager(3)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index ca6cb14a757..d53c052dc90 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
@@ -52,7 +52,7 @@
 	private static final int PARALLELISM = 16;
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
index a20a0d0c922..1683dfc369f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -29,9 +29,9 @@
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.operators.util.CollectionDataSets;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -49,7 +49,7 @@
 public class CustomDistributionITCase extends TestLogger {
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(8)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 451108b8627..b8b897db3c1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -24,16 +24,13 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -52,44 +49,13 @@
 
 	private static final int USER_DOP = 2;
 
-	private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
-
 	private static final String VALID_STARTUP_TIMEOUT = "100 s";
 
-	private static Configuration configuration;
-
-	private static AutoCloseableAsync resource;
-
-	private static String hostname;
-
-	private static int port;
-
-	@BeforeClass
-	public static void setupCluster() throws Exception {
-		configuration = new Configuration();
-
-		configuration.setInteger(WebOptions.PORT, 0);
-		final MiniCluster miniCluster = new MiniCluster(
-			new MiniClusterConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumSlotsPerTaskManager(TM_SLOTS)
-				.build());
-
-		miniCluster.start();
-
-		final URI uri = miniCluster.getRestAddress();
-		hostname = uri.getHost();
-		port = uri.getPort();
-
-		configuration.setInteger(WebOptions.PORT, port);
-
-		resource = miniCluster;
-	}
-
-	@AfterClass
-	public static void tearDownCluster() throws Exception {
-		resource.close();
-	}
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberSlotsPerTaskManager(TM_SLOTS)
+			.build());
 
 	/**
 	 * Ensure that the program parallelism can be set even if the configuration is supplied.
@@ -99,6 +65,10 @@ public void testUserSpecificParallelism() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
 
+		final URI restAddress = MINI_CLUSTER_RESOURCE.getMiniCluster().getRestAddress();
+		final String hostname = restAddress.getHost();
+		final int port = restAddress.getPort();
+
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				hostname,
 				port,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index d3247f22c78..6783df0a85b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,8 +20,8 @@
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.ClassRule;
 
@@ -31,7 +31,7 @@
 public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(2)
 			.setNumberSlotsPerTaskManager(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 0ccb3fe14f8..c4f3e12d8fb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,8 +20,8 @@
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.ClassRule;
 
@@ -31,7 +31,7 @@
 public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index c1775573765..5f40115b5a1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,9 +26,9 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
@@ -59,7 +59,7 @@
 public class IPv6HostnamesITCase extends TestLogger {
 
 	@Rule
-	public final MiniClusterResource miniClusterResource = new MiniClusterResource(
+	public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index 418e9476e38..5be9363d251 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -20,10 +20,10 @@
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AssumptionViolatedException;
@@ -47,7 +47,7 @@
 
 	@Test
 	public void testNettyEpoll() throws Exception {
-		MiniClusterResource cluster = trySetUpCluster();
+		MiniClusterWithClientResource cluster = trySetUpCluster();
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(NUM_TASK_MANAGERS);
@@ -70,11 +70,11 @@ public Integer getKey(Integer value) throws Exception {
 		}
 	}
 
-	private MiniClusterResource trySetUpCluster() throws Exception {
+	private MiniClusterWithClientResource trySetUpCluster() throws Exception {
 		try {
 			Configuration config = new Configuration();
 			config.setString(TRANSPORT_TYPE, "epoll");
-			MiniClusterResource cluster = new MiniClusterResource(
+			MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setConfiguration(config)
 					.setNumberTaskManagers(NUM_TASK_MANAGERS)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 96cb3973b06..ad1de11992b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -32,8 +32,8 @@
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -234,7 +234,7 @@ public void testThroughput() throws Exception {
 
 			final int numTaskManagers = parallelism / numSlotsPerTaskManager;
 
-			final MiniClusterResource cluster = new MiniClusterResource(
+			final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setNumberTaskManagers(numTaskManagers)
 					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
@@ -258,7 +258,7 @@ public void testThroughput() throws Exception {
 	}
 
 	private void testProgram(
-			final MiniClusterResource cluster,
+			final MiniClusterWithClientResource cluster,
 			final int dataVolumeGb,
 			final boolean useForwarder,
 			final boolean isSlowSender,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7eebde86028..3db0f62f829 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -30,11 +30,11 @@
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -71,7 +71,7 @@
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index bef3ebdb948..cf7233594cb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -20,22 +20,18 @@
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,42 +50,9 @@
 	//  The mini cluster that is shared across tests
 	// ------------------------------------------------------------------------
 
-	private static final MiniCluster CLUSTER;
-	private static final RestClusterClient<StandaloneClusterId> CLIENT;
-
-	static {
-		try {
-			MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder()
-				.setNumTaskManagers(1)
-				.setNumSlotsPerTaskManager(1)
-				.build();
-			CLUSTER = new MiniCluster(clusterConfiguration);
-			CLUSTER.start();
-
-			URI restAddress = CLUSTER.getRestAddress();
-
-			final Configuration clientConfig = new Configuration();
-			clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
-			clientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
-
-			CLIENT = new RestClusterClient<>(
-				clientConfig,
-				StandaloneClusterId.getInstance());
-
-		} catch (Exception e) {
-			throw new AssertionError("Could not setup cluster.", e);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cluster setup & teardown
-	// ------------------------------------------------------------------------
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		CLIENT.shutdown();
-		CLUSTER.close();
-	}
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder().build());
 
 	private final Random rnd = new Random();
 
@@ -122,17 +85,26 @@ public String map(Integer value) throws Exception {
 		}).addSink(resultSink);
 
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-		CLIENT.setDetached(false);
-		CLIENT.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
 
-		List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
+		final RestClusterClient<StandaloneClusterId> restClusterClient = new RestClusterClient<>(
+			MINI_CLUSTER_RESOURCE.getClientConfiguration(),
+			StandaloneClusterId.getInstance());
 
-		List<String> result = CollectingSink.result;
+		try {
+			restClusterClient.setDetached(false);
+			restClusterClient.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
+
+			List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
 
-		Collections.sort(expected);
-		Collections.sort(result);
+			List<String> result = CollectingSink.result;
 
-		assertEquals(expected, result);
+			Collections.sort(expected);
+			Collections.sort(result);
+
+			assertEquals(expected, result);
+		} finally {
+			restClusterClient.shutdown();
+		}
 	}
 
 	private static class CollectingSink implements SinkFunction<String> {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 8793b1ff0d1..07f88c6be1d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -29,6 +29,7 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,8 +46,7 @@
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -78,7 +78,7 @@
 	static MultiShotLatch latch;
 
 	@ClassRule
-	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+	public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Start MiniCluster with random REST port
> ---------------------------------------
>
>                 Key: FLINK-10637
>                 URL: https://issues.apache.org/jira/browse/FLINK-10637
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{MiniCluster}} picks a random port for the {{RpcService}} but not for the REST server endpoint. As a result, the REST server endpoint falls back to port {{8081}}, which can lead to port conflicts if tests are executed concurrently.
> I propose to rename {{MiniClusterResource}} to {{MiniClusterResourceWithRestClient}} and to add a new {{MiniClusterResource}} which only starts a {{MiniCluster}} with the REST port set to 0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message