spark-issues mailing list archives

From "Igor Berman (JIRA)" <>
Subject [jira] [Created] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections
Date Mon, 26 Sep 2016 14:44:20 GMT
Igor Berman created SPARK-17666:

             Summary: take() or isEmpty() on dataset leaks s3a connections
                 Key: SPARK-17666
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.0.0
         Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
            Reporter: Igor Berman

I'm experiencing problems with s3a when working with parquet through the Dataset API.
The symptom of the problem is tasks failing with:
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
	at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(
	at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(
	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)

Checking the CoarseGrainedExecutorBackend process with lsof showed many sockets stuck in CLOSE_WAIT state.

Reproduction of the problem:

package com.dy.sparkbi.experiment.compaction;

import java.text.ParseException;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.dy.sparkbi.common.S3CredentialsLoader;

public class ConnectionLeakTest {
	public static void main(String[] args) throws ParseException {
		SparkConf sparkConf = new SparkConf();

		sparkConf.set("spark.local.dir", "/tmp/spark");
		sparkConf.set("spark.sql.shuffle.partitions", "2");

		SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
		// set your credentials to your bucket

		for (int i = 0; i < 100; i++) {
			// the bucket contains multiple snappy-compressed parquet files
			Dataset<Row> df = session.read().parquet("s3a://test/*");
			if (df.rdd().isEmpty()) { // same problem with takeAsList().isEmpty()
				System.out.println("empty");
			} else {
				System.out.println("not empty");
			}
		}
	}
}
While the program runs, you can find its pid with jps and run lsof -p <pid> | grep https,
and you'll see a constant growth of sockets in CLOSE_WAIT.

Our way to bypass the problem is to use count() == 0 instead.
In addition, we've seen that df.dropDuplicates("a","b").rdd().isEmpty() doesn't produce the problem.
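A minimal sketch of the count()-based workaround, assuming the same Spark 2.0 Java API as the reproduction above; the bucket path s3a://test/* is the same placeholder and would need real credentials to run:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CountWorkaround {
	public static void main(String[] args) {
		SparkSession session = SparkSession.builder().getOrCreate();

		// read the same parquet data as in the reproduction
		// (s3a://test/* is a placeholder bucket path)
		Dataset<Row> df = session.read().parquet("s3a://test/*");

		// count() == 0 instead of rdd().isEmpty(): presumably count()
		// consumes every input split fully, so the s3a streams get
		// closed and no connections are left in CLOSE_WAIT
		boolean empty = df.count() == 0;
		System.out.println("empty: " + empty);
	}
}
```

The trade-off is that count() scans the whole dataset, whereas isEmpty() stops after the first row, so this workaround is more expensive on large inputs.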

This message was sent by Atlassian JIRA
