spark-issues mailing list archives

From "Thomas Powell (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
Date Fri, 23 Sep 2016 16:59:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516975#comment-15516975 ]

Thomas Powell commented on SPARK-17634:
---------------------------------------

Several hours. I modified {{worker.R}} so that it logs how long it takes to read each batch of 10000 rows, and saw a severe slowdown:
{code}
Starting to read the data
10000: 2.663s
20000: 4.223s
30000: 6.005s
40000: 7.799s
50000: 9.998s
...
540000: 116.293s
550000: 122.248s
560000: 122.798s
570000: 126.57s
580000: 135.371s
{code}
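Reading the log as per-batch times, each batch of 10000 rows takes longer than the one before, roughly in proportion to how many rows have already been read. A rough back-of-the-envelope model shows why the total grows quadratically. It assumes, for illustration only, that batch k takes about c·k seconds, with c fitted from the last logged batch alone; it is an estimate, not a measurement:

$$t_k \approx c\,k, \qquad c \approx \frac{135.371\,\mathrm{s}}{58} \approx 2.33\,\mathrm{s}$$

$$T(n) = \sum_{k=1}^{n/B} t_k \approx c\,\frac{(n/B)\,(n/B + 1)}{2} \in O(n^2), \qquad B = 10{,}000$$

$$T(580{,}000) \approx 2.33\,\mathrm{s} \times \frac{58 \times 59}{2} \approx 3{,}990\,\mathrm{s} \approx 66\,\mathrm{min}$$

Under this model, doubling the number of rows in a partition roughly quadruples the read time, which fits tasks that run for hours without ever failing.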

These are not cumulative totals: each figure is the time to read one batch of 10000 rows. To get them I modified the {{readMultipleObjects}} function as follows:
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
    # readMultipleObjects will read multiple continuous objects from
    # a DataOutputStream. There is no preceding field telling the count
    # of the objects, so the number of objects varies, we try to read
    # all objects in a loop until the end of the stream.
    #
    # Debug version: logs how long each batch of 10000 objects takes to read.
    # Assumes elapsedSecs() and debugFile (a log file path) are defined
    # elsewhere in the modified worker.R.
    cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
    data <- list()
    secs <- elapsedSecs()
    while (TRUE) {
        # If reaching the end of the stream, type returned should be "".
        type <- SparkR:::readType(inputCon)
        if (type == "") {
            break
        }
        # Appends one element at a time, extending the list on every read.
        data[[length(data) + 1L]] <- SparkR:::readTypedObject(inputCon, type)
        if (length(data) %% 10000 == 0) {
            # Time for this batch only; the timer is reset after logging.
            duration <- elapsedSecs() - secs
            cat(paste(length(data), ": ", duration, "s", sep=""),
                file=debugFile, append=TRUE, sep="\n")
            secs <- elapsedSecs()
        }
    }
    data # this is a list of named lists now
}
{code}
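One plausible contributor to the slowdown, assuming each one-element extension via {{data[[length(data) + 1L]] <- ...}} reallocates and copies the list, is that reading n objects costs O(n^2) in total. A common remedy is to preallocate the list and double its capacity whenever it fills, which makes appends amortized O(1). The sketch below illustrates that technique. {{readAllWithDoubling}}, {{readNext}} and {{makeFakeReader}} are hypothetical names: {{readNext}} stands in for the {{SparkR:::readType}} / {{SparkR:::readTypedObject}} pair so the code runs without a Spark connection. It is not SparkR's actual code.

```r
# Sketch: read objects until end of stream, growing a preallocated list
# by doubling instead of one element at a time.
# `readNext` is a hypothetical callback standing in for
# SparkR:::readType() + SparkR:::readTypedObject(); it returns NULL
# at end of stream.
readAllWithDoubling <- function(readNext, initialCapacity = 1024L) {
  capacity <- initialCapacity
  data <- vector("list", capacity)
  len <- 0L
  repeat {
    obj <- readNext()
    if (is.null(obj)) {
      break
    }
    if (len == capacity) {
      # Double the capacity: one O(len) copy each time the list fills,
      # so total copying stays O(n) over n appends.
      data <- c(data, vector("list", capacity))
      capacity <- capacity * 2L
    }
    len <- len + 1L
    data[[len]] <- obj
  }
  # Trim unused slots; seq_len() also handles an empty stream.
  data[seq_len(len)]
}

# Hypothetical fake stream of n rows, each a named list like the
# "list of named lists" that readMultipleObjects returns.
makeFakeReader <- function(n) {
  i <- 0L
  function() {
    if (i >= n) return(NULL)
    i <<- i + 1L
    list(id = i, value = i * 0.5)
  }
}

rows <- readAllWithDoubling(makeFakeReader(25000L))
length(rows)
```

Doubling, rather than growing by a fixed chunk, is what keeps total copying linear; the final trim with {{seq_len(len)}} costs one extra copy.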

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using {{dapply}} on YARN. I have a data frame backed by around 200 Parquet files totalling around 2GB. When I load it with the new partition coalescing, it ends up with around 20 partitions of roughly 100MB each. The data frame itself has 4 columns of integers and doubles. If I run a {{count}} over it, things work fine.
> However, if I add a {{dapply}} that just applies an identity function between the read and the {{count}}, the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and listening on {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process: the task thread is blocked reading from the socket and never seems to make any progress. It is harder to tell what the R process is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
>  - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
>  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> Any recommendations on how best to debug this? Nothing appears in the logs because the processes don't actually fail. The executors themselves have 4GB of memory, which should be more than enough.
> My hunch is that this could be something to do with serialization.



