spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-11711) Finalizer memory leak in pyspark
Date Tue, 29 Dec 2015 11:35:49 GMT

     [ https://issues.apache.org/jira/browse/SPARK-11711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Sean Owen resolved SPARK-11711.
-------------------------------
    Resolution: Duplicate

Merging this into the other, as it has slightly more discussion of the same issue.

> Finalizer memory leak in pyspark
> --------------------------------
>
>                 Key: SPARK-11711
>                 URL: https://issues.apache.org/jira/browse/SPARK-11711
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.1
>            Reporter: David Watson
>
> I've been seeing very consistent memory leaks in the Java process of Python Spark Streaming
> scripts on my driver.  A heap profile analysis showed millions of Finalizer objects.
> The Spark web interface, under Executor Thread Dump, shows:
> Thread 3: Finalizer (WAITING):
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.read(SocketInputStream.java:152)
> java.net.SocketInputStream.read(SocketInputStream.java:122)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:154)
> java.io.BufferedReader.readLine(BufferedReader.java:317)
> java.io.BufferedReader.readLine(BufferedReader.java:382)
> py4j.CallbackConnection.sendCommand(CallbackConnection.java:82)
> py4j.CallbackClient.sendCommand(CallbackClient.java:236)
> py4j.reflection.PythonProxyHandler.finalize(PythonProxyHandler.java:81)
> java.lang.System$2.invokeFinalize(System.java:1213)
> java.lang.ref.Finalizer.runFinalizer(Finalizer.java:98)
> java.lang.ref.Finalizer.access$100(Finalizer.java:34)
> java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:210)
> It appears the problem is in py4j.  I don't have a proper patch because the bug is inside
> the bundled python/lib/py4j-0.8.2.1-src.zip file.  I've monkey-patched it and that appears
> to fix the problem.
> In py4j.java_gateway.CallbackConnection.run(), around line 1186:
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> """
> NOTE: it doesn't write a response to the socket!
> And on the Java side, in sendCommand() at CallbackConnection.java:82:
> """
> 			returnCommand = reader.readLine();
> """
> I don't know what the protocol is supposed to be, but the Java side expects a response and
> the Python side isn't sending one.  As you can see from the stack trace, this jams up the
> Java FinalizerThread, which keeps anything from getting finalized, Spark-related or not.
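> For illustration only, here is a tiny standalone sketch (toy names, not py4j code) of why a
> blocking line read with no reply parks the reading thread:
> """
> import socket
> import threading
>
> java_side, python_side = socket.socketpair()
>
> def handler(sock):
>     f = sock.makefile("rw")
>     f.readline()                  # receive the "garbage collect proxy" command
>     # ... delete the proxy from the pool ...
>     # BUG: return without writing a reply line
>
> t = threading.Thread(target=handler, args=(python_side,))
> t.daemon = True
> t.start()
>
> out = java_side.makefile("rw")
> out.write("gc_proxy\n")
> out.flush()
> java_side.settimeout(2)
> try:
>     java_side.recv(1)             # stands in for the Java side's readLine()
> except socket.timeout:
>     print("no reply: a real readLine() would block here forever")
> """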
> My monkey patch to py4j.java_gateway.CallbackConnection.run(), around line 1186:
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> +                    ## PATCH: send an empty response!
> +                    self.socket.sendall("\n")
> +                    ##
> """
> This bug appears to still exist in the current py4j, but I can't find the repository for
> the 0.8.2.1 version embedded in Spark.
> I'm not entirely sure, but I suspect that (at least on the driver) this doesn't normally
> get triggered until the program ends, because the Java object references held by Python are
> long-lived; only at that point does it jam up the FinalizerThread.
> My code peeks at the checkpoint file (code below) before starting the script, which looks
> like it jams things up right at the beginning, and any other finalizable objects (Scala?
> Java?) pile up behind it.
> """
> def loadCheckpoint(checkpointPath):
>     StreamingContext._ensure_initialized()
>     gw = SparkContext._gateway
>     # Check whether valid checkpoint information exists in the given path
>     cp = gw.jvm.CheckpointReader.read(checkpointPath)
>     assert cp is not None, "Couldn't load %s" % checkpointPath
>     return cp.get()
> """
> At any rate, I can confirm that the same situation exists on the worker nodes as well as
> on the driver, and this fixes both.





