spark-issues mailing list archives

From "Josh Rosen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19529) TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
Date Tue, 14 Feb 2017 19:26:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-19529:
-------------------------------
    Fix Version/s: 2.2.0
                   2.1.1
                   2.0.3

> TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-19529
>                 URL: https://issues.apache.org/jira/browse/SPARK-19529
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>             Fix For: 2.0.3, 2.1.1, 2.2.0
>
>
> In Spark's Netty RPC layer, TransportClientFactory.createClient() calls awaitUninterruptibly()
on a Netty future while waiting for a connection to be established. This creates a problem when
a Spark task is interrupted while blocked in this call (which can happen when a slow connection
is still pending and will eventually time out): the interrupt does not end the wait, so task
cancellation with interruptOnCancel = true does not stop the task promptly.
> As an example of the impact of this problem, I experienced significant numbers of uncancellable
"zombie tasks" on a production cluster where several tasks were blocked trying to connect
to a dead shuffle server and then continued running as zombies after I cancelled the associated
Spark stage. The zombie tasks ran for several minutes with the following stack:
> {code}
> java.lang.Object.wait(Native Method)
> java.lang.Object.wait(Object.java:460)
> io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
> io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object@1849476028})
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
> org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
> org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> [...]
> {code}
> I believe that we can easily fix this by using the InterruptedException-throwing await()
instead.
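
To illustrate the difference described above, here is a minimal sketch that uses only the Java standard library. It is not Spark or Netty code: a CountDownLatch that never counts down stands in for the connection future of a dead shuffle server, the 2000 ms timeout and 100 ms cancellation delay are arbitrary, and the class and method names (AwaitDemo, awaitUninterruptibly, blockedMillis) are hypothetical. The local awaitUninterruptibly() helper only imitates the general "swallow the interrupt and keep waiting" behavior and is an assumption about how such a wait is typically written.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitDemo {

    // Imitates an uninterruptible timed wait: an interrupt is remembered but
    // does not end the wait. The interrupt flag is restored before returning.
    static boolean awaitUninterruptibly(CountDownLatch latch, long timeoutMs) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return latch.getCount() == 0;
                }
                try {
                    return latch.await(remaining, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    interrupted = true; // swallow the interrupt and keep waiting
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts a worker that waits up to 2000 ms for a "connection" that never
    // completes, interrupts it after about 100 ms (simulated task cancellation),
    // and returns how long the worker actually stayed blocked.
    static long blockedMillis(boolean interruptible) throws InterruptedException {
        CountDownLatch neverConnects = new CountDownLatch(1);
        long[] elapsed = new long[1];
        Thread worker = new Thread(() -> {
            long start = System.nanoTime();
            try {
                if (interruptible) {
                    // Interruptible wait: throws InterruptedException on cancel.
                    neverConnects.await(2000, TimeUnit.MILLISECONDS);
                } else {
                    awaitUninterruptibly(neverConnects, 2000);
                }
            } catch (InterruptedException e) {
                // Cancellation observed: the task can stop right away.
            }
            elapsed[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        });
        worker.start();
        Thread.sleep(100);
        worker.interrupt();
        worker.join(); // join() makes the worker's write to elapsed[0] visible
        return elapsed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interruptible wait blocked for ms: " + blockedMillis(true));
        System.out.println("uninterruptible wait blocked for ms: " + blockedMillis(false));
    }
}
```

In this sketch the interruptible worker returns shortly after the interrupt, while the uninterruptible worker stays blocked until the full timeout expires, which mirrors the zombie-task behavior reported above.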



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


