spark-issues mailing list archives

From "Igor Berman (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections
Date Mon, 26 Sep 2016 14:44:20 GMT
Igor Berman created SPARK-17666:
-----------------------------------

             Summary: take() or isEmpty() on dataset leaks s3a connections
                 Key: SPARK-17666
                 URL: https://issues.apache.org/jira/browse/SPARK-17666
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.0.0
         Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
            Reporter: Igor Berman


I'm experiencing problems with s3a when reading parquet through the Dataset API.
The symptom is tasks failing with:
{code}
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
	at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
	at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
{code}

Running lsof against the CoarseGrainedExecutorBackend process showed many sockets in the CLOSE_WAIT state.

Reproduction of the problem:
{code}
package com.dy.sparkbi.experiment.compaction;

import java.text.ParseException;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConnectionLeakTest {
	public static void main(String[] args) throws ParseException {
		SparkConf sparkConf = new SparkConf();

		sparkConf.setMaster("local[*]");
		sparkConf.setAppName("Test");
		sparkConf.set("spark.local.dir", "/tmp/spark");
		sparkConf.set("spark.sql.shuffle.partitions", "2");

		SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
		// set the s3a credentials for your bucket here

		for (int i = 0; i < 100; i++) {
			Dataset<Row> df = session
					.sqlContext()
					.read()
					.parquet("s3a://test/*");//contains multiple snappy compressed parquet files
			if (df.rdd().isEmpty()) {//same problem with takeAsList().isEmpty()
				System.out.println("Yes");
			} else {
				System.out.println("No");
			}
		}
		System.out.println("Done");
	}
}
{code}
While the program runs, you can find its pid with jps and run lsof -p <pid> | grep https;
you'll see the number of sockets in CLOSE_WAIT grow steadily.
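
To watch the growth without re-running the command by hand, something like the following could be used. It is only a sketch: the class name CloseWaitCounter, the 5 second interval and the 12 samples are arbitrary choices of mine, and it assumes lsof is on the PATH and prints the TCP state as "(CLOSE_WAIT)" with the port shown as "https", as it does by default on Linux.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: samples "lsof -p <pid>" and
// reports how many https sockets are stuck in CLOSE_WAIT.
class CloseWaitCounter {

    // Counts lsof output lines that mention both "https" and "(CLOSE_WAIT)".
    static long countCloseWait(List<String> lsofLines) {
        long count = 0;
        for (String line : lsofLines) {
            if (line.contains("https") && line.contains("(CLOSE_WAIT)")) {
                count++;
            }
        }
        return count;
    }

    // Runs "lsof -p <pid>" and returns its output as a list of lines.
    static List<String> runLsof(String pid) throws IOException, InterruptedException {
        Process process = new ProcessBuilder("lsof", "-p", pid)
                .redirectErrorStream(true)
                .start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        process.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java CloseWaitCounter <pid>");
            return;
        }
        // Take 12 samples, 5 seconds apart (both values are arbitrary).
        for (int i = 0; i < 12; i++) {
            long n = countCloseWait(runLsof(args[0]));
            System.out.println("https sockets in CLOSE_WAIT: " + n);
            Thread.sleep(5000);
        }
    }
}
```

Run it with the pid reported by jps while the reproduction is looping; if the connections are leaking, the printed number should keep increasing between samples.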

Our workaround is to use count() == 0 instead.
In addition, we've seen that df.dropDuplicates("a","b").rdd().isEmpty() doesn't produce the problem either.







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
