flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc
Date Mon, 20 Aug 2018 21:54:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586567#comment-16586567 ]

ASF GitHub Bot commented on FLINK-9878:
---------------------------------------

NicoK closed pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355
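
For orientation, here is a minimal sketch (not code from the pull request) of how the two session-related options added by this change, `security.ssl.session-cache-size` and `security.ssl.session-timeout`, could be applied to a plain JSSE context. The names `SessionOptionsSketch` and `applySessionOptions` are hypothetical. The sketch assumes, as the option descriptions in the diff below state, that -1 means "use system default" and that the timeout is configured in milliseconds, whereas `SSLSessionContext.setSessionTimeout` takes seconds.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

public class SessionOptionsSketch {

    /**
     * Hypothetical helper: applies cache size and timeout to the client
     * session context, leaving the JSSE defaults in place for negative values.
     */
    static SSLSessionContext applySessionOptions(SSLContext ctx, int cacheSize, int timeoutMs) {
        SSLSessionContext sessions = ctx.getClientSessionContext();
        if (cacheSize >= 0) {
            sessions.setSessionCacheSize(cacheSize);
        }
        if (timeoutMs >= 0) {
            // JSSE expects seconds; the option is given in milliseconds.
            sessions.setSessionTimeout(timeoutMs / 1000);
        }
        return sessions;
    }

    public static void main(String[] args) throws Exception {
        SSLContext ctx = SSLContext.getInstance("TLSv1.2");
        ctx.init(null, null, null); // default key and trust managers
        SSLSessionContext sessions = applySessionOptions(ctx, 1024, 60_000);
        System.out.println("cache size: " + sessions.getSessionCacheSize());
        System.out.println("timeout (s): " + sessions.getSessionTimeout());
    }
}
```

Because of the integer division, a configured timeout below 1000 ms becomes 0 seconds, which the `SSLSessionContext` documentation defines as "no limit"; a caller who wants short-lived sessions would need to configure at least 1000.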

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index cd682ecaf0f..357629473cd 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -12,11 +12,21 @@
             <td style="word-wrap: break-word;">"TLS_RSA_WITH_AES_128_CBC_SHA"</td>
             <td>The comma separated list of standard SSL algorithms to be supported. Read more &#60;a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites"&#62;here&#60;/a&#62;.</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.close-notify-flush-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.handshake-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) during SSL handshake. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.key-password</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -37,6 +47,16 @@
             <td style="word-wrap: break-word;">"TLSv1.2"</td>
             <td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.session-cache-size</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.session-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for the cached SSL session objects. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.truststore</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index c2ba7df8849..a805238ae08 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -33,6 +33,10 @@ SSL can be enabled for all network communication between Flink components. SSL k
 * **akka.ssl.enabled**: SSL flag for akka based control connection between the Flink client, jobmanager and taskmanager 
 * **jobmanager.web.ssl.enabled**: Flag to enable https access to the jobmanager's web frontend
 
+### Complete List of SSL Options
+
+{% include generated/security_configuration.html %}
+
 ## Deploying Keystores and Truststores
 
 You need to have a Java Keystore generated and copied to each node in the Flink cluster. The common name or subject alternative names in the certificate should match the node's hostname and IP address. Keystores and truststores can be generated using the [keytool utility](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All Flink components should have read access to the keystore and truststore files.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 0f25c6caf95..60a97643a4e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -160,4 +160,41 @@
 		key("security.ssl.verify-hostname")
 			.defaultValue(true)
 			.withDescription("Flag to enable peer’s hostname verification during ssl handshake.");
+
+	/**
+	 * SSL session cache size.
+	 */
+	public static final ConfigOption<Integer> SSL_SESSION_CACHE_SIZE =
+		key("security.ssl.session-cache-size")
+			.defaultValue(-1)
+			.withDescription("The size of the cache used for storing SSL session objects. "
+				+ "According to https://github.com/netty/netty/issues/832, you should always set "
+				+ "this to an appropriate number to not run into a bug with stalling IO threads "
+				+ "during garbage collection. (-1 = use system default).");
+
+	/**
+	 * SSL session timeout.
+	 */
+	public static final ConfigOption<Integer> SSL_SESSION_TIMEOUT =
+		key("security.ssl.session-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for the cached SSL session objects. (-1 = use system default)");
+
+	/**
+	 * SSL session timeout during handshakes.
+	 */
+	public static final ConfigOption<Integer> SSL_HANDSHAKE_TIMEOUT =
+		key("security.ssl.handshake-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) during SSL handshake. (-1 = use system default)");
+
+	/**
+	 * SSL session timeout after flushing the <tt>close_notify</tt> message.
+	 */
+	public static final ConfigOption<Integer> SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+		key("security.ssl.close-notify-flush-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for flushing the `close_notify` that was triggered by closing a " +
+				"channel. If the `close_notify` was not flushed in the given timeout the channel will be closed " +
+				"forcibly. (-1 = use system default)");
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718816c..30c4edf0155 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -58,7 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -104,7 +104,8 @@
 
 	private final Map<Path, URL> paths = new HashMap<>();
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
 		throws Exception {
@@ -139,7 +140,7 @@ protected void initChannel(SocketChannel ch) {
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae0067d7..4323ad01168 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -92,7 +92,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -130,7 +130,8 @@
 	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
 	private final LeaderGatewayRetriever<JobManagerGateway> retriever;
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 108f5c4595a..0484afdcc34 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -43,7 +43,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -90,7 +90,8 @@
 
 	private final HistoryServerArchiveFetcher archiveFetcher;
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 	private WebFrontendBootstrap netty;
 
 	private final Object startupShutdownLock = new Object();
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beaee1cb..c3148b73878 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -40,7 +40,7 @@
 
 import org.slf4j.Logger;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -55,7 +55,8 @@
 	private final Router router;
 	private final Logger log;
 	private final File uploadDir;
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 	private final ServerBootstrap bootstrap;
 	private final Channel serverChannel;
 	private final String restAddress;
@@ -64,7 +65,7 @@ public WebFrontendBootstrap(
 			Router router,
 			Logger log,
 			File directory,
-			SSLContext sslContext,
+			@Nullable SSLUtils.SSLContext sslContext,
 			String configuredAddress,
 			int configuredPort,
 			final Configuration config) throws InterruptedException, UnknownHostException {
@@ -81,7 +82,7 @@ protected void initChannel(SocketChannel ch) {
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b32811c2..4bf71779058 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
 
@@ -91,7 +90,7 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
 
 		try {
 			// Check if ssl is enabled
-			SSLContext clientSSLContext = null;
+			SSLUtils.SSLContext clientSSLContext = null;
 			if (clientConfig != null &&
 				clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 
@@ -102,7 +101,7 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
 
 				LOG.info("Using ssl connection to the blob server");
 
-				SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSocketFactory().createSocket(
+				SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSslContext().getSocketFactory().createSocket(
 					serverAddress.getAddress(),
 					serverAddress.getPort());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index dd0155cbbdd..1a1b0da63dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -79,7 +78,7 @@
 	private final ServerSocket serverSocket;
 
 	/** The SSL server context if ssl is enabled for the connections. */
-	private final SSLContext serverSSLContext;
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	/** Blob Server configuration. */
 	private final Configuration blobServiceConfiguration;
@@ -196,7 +195,7 @@ public ServerSocket createSocket(int port) throws IOException {
 					return new ServerSocket(port, finalBacklog);
 				} else {
 					LOG.info("Enabling ssl for the blob server");
-					return serverSSLContext.getServerSocketFactory().createServerSocket(port, finalBacklog);
+					return serverSSLContext.getSslContext().getServerSocketFactory().createServerSocket(port, finalBacklog);
 				}
 			}
 		});
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 5fb083d33ad..44561b0638f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -52,7 +55,8 @@
 
 	private Bootstrap bootstrap;
 
-	private SSLContext clientSSLContext = null;
+	@Nullable
+	private SSLUtils.SSLContext clientSSLContext = null;
 
 	NettyClient(NettyConfig config) {
 		this.config = config;
@@ -178,7 +182,7 @@ public void initChannel(SocketChannel channel) throws Exception {
 
 				// SSL handler should be added first in the pipeline
 				if (clientSSLContext != null) {
-					SSLEngine sslEngine = clientSSLContext.createSSLEngine(
+					SSLEngine sslEngine = clientSSLContext.getSslContext().createSSLEngine(
 						serverSocketAddress.getAddress().getCanonicalHostName(),
 						serverSocketAddress.getPort());
 					sslEngine.setUseClientMode(true);
@@ -190,7 +194,14 @@ public void initChannel(SocketChannel channel) throws Exception {
 						sslEngine.setSSLParameters(newSSLParameters);
 					}
 
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					SslHandler sslHandler = new SslHandler(sslEngine);
+					if (clientSSLContext.getHandshakeTimeoutMs() >= 0) {
+						sslHandler.setHandshakeTimeoutMillis(clientSSLContext.getHandshakeTimeoutMs());
+					}
+					if (clientSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+						sslHandler.setCloseNotifyTimeoutMillis(clientSSLContext.getCloseNotifyFlushTimeoutMs());
+					}
+					channel.pipeline().addLast("ssl", sslHandler);
 				}
 				channel.pipeline().addLast(protocol.getClientChannelHandlers());
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c46a53..9b32ebbcd95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -23,12 +23,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -189,26 +191,13 @@ public TransportType getTransportType() {
 		}
 	}
 
-	public SSLContext createClientSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext clientSSLContext = null;
-		if (getSSLEnabled()) {
-			clientSSLContext = SSLUtils.createSSLClientContext(config);
-		}
-
-		return clientSSLContext;
+	@Nullable
+	public SSLUtils.SSLContext createClientSSLContext() throws Exception {
+		return SSLUtils.createSSLClientContext(config);
 	}
 
-	public SSLContext createServerSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext serverSSLContext = null;
-		if (getSSLEnabled()) {
-			serverSSLContext = SSLUtils.createSSLServerContext(config);
-		}
-
-		return serverSSLContext;
+	public SSLUtils.SSLContext createServerSSLContext() throws Exception {
+		return SSLUtils.createSSLServerContext(config);
 	}
 
 	public boolean getSSLEnabled() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index c6d09d05499..f919ded9e3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -61,7 +62,8 @@
 
 	private ChannelFuture bindFuture;
 
-	private SSLContext serverSSLContext = null;
+	@Nullable
+	private SSLUtils.SSLContext serverSSLContext = null;
 
 	private InetSocketAddress localAddress;
 
@@ -152,10 +154,17 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					config.setSSLVerAndCipherSuites(sslEngine);
 					sslEngine.setUseClientMode(false);
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					SslHandler sslHandler = new SslHandler(sslEngine);
+					if (serverSSLContext.getHandshakeTimeoutMs() >= 0) {
+						sslHandler.setHandshakeTimeoutMillis(serverSSLContext.getHandshakeTimeoutMs());
+					}
+					if (serverSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+						sslHandler.setCloseNotifyTimeoutMillis(serverSSLContext.getCloseNotifyFlushTimeoutMs());
+					}
+					channel.pipeline().addLast("ssl", sslHandler);
 				}
 
 				channel.pipeline().addLast(protocol.getServerChannelHandlers());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index b574d30484f..69da666ab1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -113,7 +113,7 @@ private static SSLEngineFactory createSSLEngineFactory(
 		checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key());
 
 		return new SSLEngineFactory(
-			sslContext,
+			sslContext.getSslContext(),
 			getEnabledProtocols(config),
 			getEnabledCipherSuites(config),
 			clientMode);
@@ -176,39 +176,43 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
 	public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
-		SSLContext clientSSLContext = null;
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating client SSL context from configuration");
-
-			String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
-			String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		if (!getSSLEnabled(sslConfig)) {
+			return null;
+		}
 
-			Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+		LOG.debug("Creating client SSL context from configuration");
 
-			KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
+		String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+		String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		int sessionCacheSize = sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+		int sessionTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+		int handshakeTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+		int closeNotifyFlushTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-			FileInputStream trustStoreFile = null;
-			try {
-				trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
-				trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
-			} finally {
-				if (trustStoreFile != null) {
-					trustStoreFile.close();
-				}
-			}
+		Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
+		Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
-			TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
-				TrustManagerFactory.getDefaultAlgorithm());
-			trustManagerFactory.init(trustStore);
+		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
 
-			clientSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
+		try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) {
+			trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
 		}
 
-		return clientSSLContext;
+		TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+			TrustManagerFactory.getDefaultAlgorithm());
+		trustManagerFactory.init(trustStore);
+
+		javax.net.ssl.SSLContext clientSSLContext = javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+		clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
+		if (sessionCacheSize >= 0) {
+			clientSSLContext.getClientSessionContext().setSessionCacheSize(sessionCacheSize);
+		}
+		if (sessionTimeoutMs >= 0) {
+			clientSSLContext.getClientSessionContext().setSessionTimeout(sessionTimeoutMs / 1000);
+		}
+		return new SSLContext(clientSSLContext, handshakeTimeoutMs, closeNotifyFlushTimeoutMs);
 	}
 
 	/**
@@ -225,38 +229,77 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws
 	public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
-		SSLContext serverSSLContext = null;
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating server SSL context from configuration");
+		if (!getSSLEnabled(sslConfig)) {
+			return null;
+		}
 
-			String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+		LOG.debug("Creating server SSL context from configuration");
 
-			String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+		String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+		String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+		String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+		String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		int sessionCacheSize = sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+		int sessionTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+		int handshakeTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+		int closeNotifyFlushTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-			String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+		Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
+		Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
+		Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
 
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
+			ks.load(keyStoreFile, keystorePassword.toCharArray());
+		}
 
-			Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
-			Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+		// Set up key manager factory to use the server key store
+		KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+			KeyManagerFactory.getDefaultAlgorithm());
+		kmf.init(ks, certPassword.toCharArray());
 
-			KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-			try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
-				ks.load(keyStoreFile, keystorePassword.toCharArray());
-			}
+		// Initialize the SSLContext
+		javax.net.ssl.SSLContext serverSSLContext = javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+		serverSSLContext.init(kmf.getKeyManagers(), null, null);
+		if (sessionCacheSize >= 0) {
+			serverSSLContext.getServerSessionContext().setSessionCacheSize(sessionCacheSize);
+		}
+		if (sessionTimeoutMs >= 0) {
+			serverSSLContext.getServerSessionContext().setSessionTimeout(sessionTimeoutMs / 1000);
+		}
 
-			// Set up key manager factory to use the server key store
-			KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-					KeyManagerFactory.getDefaultAlgorithm());
-			kmf.init(ks, certPassword.toCharArray());
+		return new SSLContext(serverSSLContext, handshakeTimeoutMs, closeNotifyFlushTimeoutMs);
+	}
 
-			// Initialize the SSLContext
-			serverSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			serverSSLContext.init(kmf.getKeyManagers(), null, null);
+	/**
+	 * Wrapper around javax.net.ssl.SSLContext, adding SSL handshake and close notify timeouts
+	 * which cannot be set on the SSL context directly.
+	 */
+	public static class SSLContext {
+		private final javax.net.ssl.SSLContext sslContext;
+		private final int handshakeTimeoutMs;
+		private final int closeNotifyFlushTimeoutMs;
+
+		public SSLContext(
+				javax.net.ssl.SSLContext sslContext,
+				int handshakeTimeoutMs,
+				int closeNotifyFlushTimeoutMs) {
+			this.sslContext = sslContext;
+			this.handshakeTimeoutMs = handshakeTimeoutMs;
+			this.closeNotifyFlushTimeoutMs = closeNotifyFlushTimeoutMs;
 		}
 
-		return serverSSLContext;
+		public javax.net.ssl.SSLContext getSslContext() {
+			return sslContext;
+		}
+
+		public int getHandshakeTimeoutMs() {
+			return handshakeTimeoutMs;
+		}
+
+		public int getCloseNotifyFlushTimeoutMs() {
+			return closeNotifyFlushTimeoutMs;
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004ebdf6..e7113ec6695 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.NetUtils;
@@ -26,15 +27,27 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.net.ssl.SSLSessionContext;
+
 import java.net.InetAddress;
 
+import static org.apache.flink.configuration.SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_HANDSHAKE_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_SESSION_CACHE_SIZE;
+import static org.apache.flink.configuration.SecurityOptions.SSL_SESSION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for communication between {@link NettyServer} and {@link NettyClient} via SSL.
+ */
 public class NettyClientServerSslTest {
 
 	/**
@@ -42,52 +55,76 @@
 	 */
 	@Test
 	public void testValidSslConnection() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
+		testValidSslConnection(createSslConfig());
+	}
 
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+	/**
+	 * Verify valid (advanced) ssl configuration and connection.
+	 */
+	@Test
+	public void testValidSslConnectionAdvanced() throws Exception {
+		Configuration sslConfig = createSslConfig();
+		sslConfig.setInteger(SSL_SESSION_CACHE_SIZE, 1);
+		sslConfig.setInteger(SSL_SESSION_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_HANDSHAKE_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1_000);
+
+		testValidSslConnection(sslConfig);
+	}
+
+	private void testValidSslConnection(Configuration sslConfig) throws Exception {
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		NettyConfig nettyConfig = new NettyConfig(
 			InetAddress.getLoopbackAddress(),
 			NetUtils.getAvailablePort(),
 			NettyTestUtil.DEFAULT_SEGMENT_SIZE,
 			1,
-			createSslConfig());
+			sslConfig);
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
 		Channel ch = NettyTestUtil.connect(serverAndClient);
 
+		SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+		assertEqualsOrDefault(sslConfig, SSL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
+		assertEqualsOrDefault(sslConfig, SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyTimeoutMillis());
+
 		// should be able to send text data
 		ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
 		assertTrue(ch.writeAndFlush("test").await().isSuccess());
 
+		// session context is only available after a session was set up -> this should be true after data was sent
+		SSLSessionContext sessionContext = sslHandler.engine().getSession().getSessionContext();
+		assertNotNull("bug in unit test setup: session context not available", sessionContext);
+		assertEqualsOrDefault(sslConfig, SSL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
+		int sessionTimeout = sslConfig.getInteger(SSL_SESSION_TIMEOUT);
+		if (sessionTimeout != -1) {
+			// session timeout config is in milliseconds but the context returns it in seconds
+			assertEquals(sessionTimeout / 1000, sessionContext.getSessionTimeout());
+		} else {
+			assertTrue("default value (-1) should not be propagated", sessionContext.getSessionTimeout() >= 0);
+		}
+
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
+	private static void assertEqualsOrDefault(Configuration sslConfig, ConfigOption<Integer> option, long actual) {
+		long expected = sslConfig.getInteger(option);
+		if (expected != option.defaultValue()) {
+			assertEquals(expected, actual);
+		} else {
+			assertTrue("default value (" + option.defaultValue() + ") should not be propagated",
+				actual >= 0);
+		}
+	}
+
 	/**
 	 * Verify failure on invalid ssl configuration.
 	 */
 	@Test
-	public void testInvalidSslConfiguration() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+	public void testInvalidSslConfiguration() {
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		Configuration config = createSslConfig();
 		// Modify the keystore password to an incorrect one
@@ -116,17 +153,7 @@ public void testInvalidSslConfiguration() throws Exception {
 	 */
 	@Test
 	public void testSslHandshakeError() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		Configuration config = createSslConfig();
 
@@ -151,8 +178,7 @@ public void testSslHandshakeError() throws Exception {
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
-	private Configuration createSslConfig() throws Exception {
-
+	private Configuration createSslConfig() {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
 		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
@@ -162,4 +188,18 @@ private Configuration createSslConfig() throws Exception {
 		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 		return flinkConfig;
 	}
+
+	private static NettyProtocol getEmptyNettyProtocol() {
+		return new NettyProtocol(null, null, true) {
+			@Override
+			public ChannelHandler[] getServerChannelHandlers() {
+				return new ChannelHandler[0];
+			}
+
+			@Override
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
+			}
+		};
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 38c8ceeedf9..a5db40fd5af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -24,7 +24,6 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
 
@@ -33,6 +32,7 @@
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -58,7 +58,7 @@ public void testCreateSSLClientContext() throws Exception {
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 
-		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+		SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNotNull(clientContext);
 	}
 
@@ -71,7 +71,7 @@ public void testCreateSSLClientContextWithSSLDisabled() throws Exception {
 		Configuration clientConfig = new Configuration();
 		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+		SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNull(clientContext);
 	}
 
@@ -87,7 +87,7 @@ public void testCreateSSLClientContextMisconfiguration() {
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "badpassword");
 
 		try {
-			SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+			SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 			Assert.fail("SSL client context created even with bad SSL configuration ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -106,7 +106,7 @@ public void testCreateSSLServerContext() throws Exception {
 		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNotNull(serverContext);
 	}
 
@@ -119,7 +119,7 @@ public void testCreateSSLServerContextWithSSLDisabled() throws Exception {
 		Configuration serverConfig = new Configuration();
 		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNull(serverContext);
 	}
 
@@ -136,7 +136,7 @@ public void testCreateSSLServerContextMisconfiguration() {
 		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "badpassword");
 
 		try {
-			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+			SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 			Assert.fail("SSL server context created even with bad SSL configuration ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -157,7 +157,7 @@ public void testCreateSSLServerContextWithMultiProtocols() {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1,TLSv1.2");
 
 		try {
-			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+			SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 			Assert.fail("SSL server context created even with multiple protocols set ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -178,10 +178,9 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1");
 		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
-		ServerSocket socket = null;
-		try {
-			socket = serverContext.getServerSocketFactory().createServerSocket(0);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		assertNotNull(serverContext);
+		try (ServerSocket socket = serverContext.getSslContext().getServerSocketFactory().createServerSocket(0)) {
 
 			String[] protocols = ((SSLServerSocket) socket).getEnabledProtocols();
 			String[] algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites();
@@ -198,10 +197,6 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio
 			Assert.assertEquals(2, algorithms.length);
 			Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
 			Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
-		} finally {
-			if (socket != null) {
-				socket.close();
-			}
 		}
 	}
 
@@ -219,8 +214,9 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
 		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
-		SSLEngine engine = serverContext.createSSLEngine();
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		assertNotNull(serverContext);
+		SSLEngine engine = serverContext.getSslContext().createSSLEngine();
 
 		String[] protocols = engine.getEnabledProtocols();
 		String[] algorithms = engine.getEnabledCipherSuites();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 93dbb5dc145..59db1630cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -164,9 +164,9 @@ public void setup() throws Exception {
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
 
 		defaultSSLContext = SSLContext.getDefault();
-		final SSLContext sslClientContext = SSLUtils.createSSLClientContext(config);
+		final SSLUtils.SSLContext sslClientContext = SSLUtils.createSSLClientContext(config);
 		if (sslClientContext != null) {
-			SSLContext.setDefault(sslClientContext);
+			SSLContext.setDefault(sslClientContext.getSslContext());
 		}
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
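
The SSLUtilsTest hunk above replaces a manual try/finally close of the server socket with try-with-resources. The following is a minimal, JDK-only sketch of that pattern, not the PR's code: the class and method names (ServerSocketSketch, enabledProtocolsAfterRestricting) are hypothetical, and a default-initialized "TLS" context stands in for the context that SSLUtils.createSSLServerContext would build from the Flink configuration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;

public class ServerSocketSketch {

    /**
     * Opens an SSL server socket on an ephemeral loopback port, restricts it to a single
     * protocol, and returns the protocols that are enabled afterwards.
     * Hypothetical helper for illustration only.
     */
    static String[] enabledProtocolsAfterRestricting(String protocol) {
        try {
            // Assumption: a default-initialized context replaces the Flink-configured one.
            SSLContext context = SSLContext.getInstance("TLS");
            context.init(null, null, null);

            // try-with-resources closes the socket even if an assertion or exception
            // interrupts the block, which is what the removed finally block did by hand.
            try (ServerSocket socket = context.getServerSocketFactory()
                    .createServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
                SSLServerSocket sslSocket = (SSLServerSocket) socket;
                sslSocket.setEnabledProtocols(new String[] {protocol});
                return sslSocket.getEnabledProtocols();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", enabledProtocolsAfterRestricting("TLSv1.2")));
    }
}
```

In the PR's test the protocol and cipher suites come from SecurityOptions.SSL_PROTOCOL and SecurityOptions.SSL_ALGORITHMS; here the protocol is set directly on the socket only to keep the sketch free of Flink dependencies.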


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> ----------------------------------------------------------------
>
>                 Key: FLINK-9878
>                 URL: https://issues.apache.org/jira/browse/FLINK-9878
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.6.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue during garbage
> collection when the SSL session cache is not limited. We should allow the user to configure
> this and further (advanced) SSL parameters for fine-tuning to fix this and similar issues.
> In particular, the following parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout
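
For the first two parameters in the list above, the JDK exposes setters on javax.net.ssl.SSLSessionContext. The sketch below shows one way such limits could be applied with plain JDK calls; it is an illustration under stated assumptions, not the code this PR adds to SSLUtils. The class name SessionCacheSketch, the helper names, and the USE_DEFAULT constant are hypothetical. The -1 sentinel and the millisecond-to-second conversion mirror what NettyClientServerSslTest asserts in the diff.

```java
import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

public class SessionCacheSketch {

    /** Hypothetical sentinel meaning "leave the JDK default untouched". */
    static final int USE_DEFAULT = -1;

    /**
     * Applies a session cache size and a session timeout to the given context.
     * The timeout is given in milliseconds, but SSLSessionContext expects seconds.
     */
    static void configure(SSLSessionContext sessionContext, int cacheSize, int timeoutMs) {
        if (cacheSize != USE_DEFAULT) {
            sessionContext.setSessionCacheSize(cacheSize);
        }
        if (timeoutMs != USE_DEFAULT) {
            sessionContext.setSessionTimeout(timeoutMs / 1000);
        }
    }

    /** Creates a default-initialized TLS context and returns its server-side session context. */
    static SSLSessionContext newServerSessionContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, null, null);
            return sslContext.getServerSessionContext();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SSLSessionContext sessionContext = newServerSessionContext();
        // Same values as testValidSslConnectionAdvanced: cache size 1, timeout 1000 ms.
        configure(sessionContext, 1, 1_000);
        System.out.println("cache size: " + sessionContext.getSessionCacheSize());
        System.out.println("timeout (s): " + sessionContext.getSessionTimeout());
    }
}
```

The handshake timeout and the close notify flush timeout are not JDK session settings; in the diff they are read back from Netty's SslHandler, so they are left out of this JDK-only sketch.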



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

