spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: Strongly Connected Components
Date Sat, 12 Nov 2016 19:48:12 GMT
not sure i see the faster algo in the paper you mention.

i see this in section 6.1.2:
"In what follows we give a simple labeling algorithm that computes
connectivity on sparse graphs in O(log N) rounds."
N here is the size of the graph, not the largest component diameter.

that is the exact same algo as is implemented in graphx i think. or is it
not?

On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos <
daniel.darabos@lynxanalytics.com> wrote:

> Hi Shreya,
> GraphFrames just calls the GraphX strongly connected components code.
> (https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
>
> For choosing the number of iterations: If the number of iterations is less
> than the diameter of the graph, you may get an incorrect result. But
> running for more iterations than that buys you nothing. The algorithm is
> basically to broadcast your ID to all your neighbors in the first round,
> and then broadcast the smallest ID that you have seen so far in the next
> rounds. So with only 1 round you will get a wrong result unless each vertex
> is connected to the vertex with the lowest ID in that component. (Unlikely
> in a real graph.)
>
> See
> https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
> for the actual implementation.
>
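The min-ID broadcast described above can be sketched on a single machine in plain Python. This is only an illustration of the idea, not the GraphX Pregel code: the function name `min_label_propagation` is hypothetical, edges are treated as undirected (as for connected components), and one loop pass stands for one round.

```python
from typing import Dict, Hashable, Iterable, Tuple


def min_label_propagation(
    vertices: Iterable[Hashable],
    edges: Iterable[Tuple[Hashable, Hashable]],
    max_iterations: int,
) -> Dict[Hashable, Hashable]:
    """Hypothetical helper: label each vertex with the smallest ID it has seen."""
    vertex_list = list(vertices)

    # Build an undirected adjacency map.
    neighbors = {v: set() for v in vertex_list}
    for a, b in edges:
        neighbors[a].add(b)
        neighbors[b].add(a)

    # Every vertex starts out labeled with its own ID.
    labels = {v: v for v in vertex_list}

    for _ in range(max_iterations):
        # One round: each vertex takes the minimum of its own label and
        # the labels its neighbors held at the start of the round.
        new_labels = {
            v: min([labels[v]] + [labels[n] for n in neighbors[v]])
            for v in vertex_list
        }
        if new_labels == labels:
            break  # converged; further rounds change nothing
        labels = new_labels

    return labels


if __name__ == "__main__":
    # A path 0-1-2-3-4 has diameter 4, so 4 rounds are needed here.
    path_edges = [(0, 1), (1, 2), (2, 3), (3, 4)]
    print(min_label_propagation(range(5), path_edges, 1))
    print(min_label_propagation(range(5), path_edges, 4))
```

On the path graph in the example, stopping after one round leaves vertices 2, 3 and 4 with labels other than 0, which is the "wrong result" case; only after as many rounds as the diameter does every vertex carry the component's lowest ID.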
> A better algorithm exists for this problem that only requires O(log(N))
> iterations, where N is the largest component diameter. (It is described in
> "A Model of Computation for MapReduce",
> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's
> implementation immensely. (See the last slide of
> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
> The large advantage is due to the lower number of necessary iterations.
>
> For why this is failing even with one iteration: I would first check your
> partitioning. Too many or too few partitions could equally cause the issue.
> If you are lucky, there is no overlap between the "too many" and "too few"
> domains :).
>
> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shreyagr@microsoft.com>
> wrote:
>
>> Tried GraphFrames. Still faced the same problem: the job died after a few
>> hours. The errors I see (and I see tons of them) are below.
>>
>> (I also ran with 3 times the partitions, which was 12 times the number of
>> executors, but still got the same result.)
>>
>>
>>
>> -------------------------------------
>>
>> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: null Error Code : RequestBodyTooLarge
>>
>>
>>
>> -------------------------------------
>>
>>
>>
>> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests outstanding when connection from /10.0.0.95:43301 is closed
>> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
>> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty blocks out of 1500 blocks
>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>> java.io.IOException: Connection from /10.0.0.95:43301 closed
>>
>>
>>
>> -------------------------------------
>>
>>
>>
>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>> java.lang.RuntimeException: java.io.FileNotFoundException: /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>>
>>
>>
>> -------------------------------------
>>
>>
>>
>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>         at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.ConcurrentModificationException
>>         at java.util.ArrayList.writeObject(ArrayList.java:766)
>>         at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>
>>
>>
>> -------------------------------------
>>
>>
>>
>> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
>> org.apache.spark.SparkException: Error sending message [message = Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>>         at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
>>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>         ... 13 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>>
>>
>>
>> *From:* Shreya Agarwal
>> *Sent:* Thursday, November 10, 2016 8:16 PM
>> *To:* 'Felix Cheung' <felixcheung_m@hotmail.com>; user@spark.apache.org
>> *Subject:* RE: Strongly Connected Components
>>
>>
>>
>> Yesterday’s run died sometime during the night, without any errors.
>> Today, I am running it using GraphFrames instead. It is still spawning new
>> tasks, so there is progress.
>>
>>
>>
>> *From:* Felix Cheung <felixcheung_m@hotmail.com>
>> *Sent:* Thursday, November 10, 2016 7:50 PM
>> *To:* user@spark.apache.org; Shreya Agarwal <shreyagr@microsoft.com>
>> *Subject:* Re: Strongly Connected Components
>>
>>
>>
>> It is possible it is dead. Could you check the Spark UI to see if there
>> is any progress?
>>
>>
>>
>> _____________________________
>> From: Shreya Agarwal <shreyagr@microsoft.com>
>> Sent: Thursday, November 10, 2016 12:45 AM
>> Subject: RE: Strongly Connected Components
>> To: <user@spark.apache.org>
>>
>>
>> Bump. Anyone? It's been running for 10 hours now. No results.
>>
>>
>>
>> *From:* Shreya Agarwal
>> *Sent:* Tuesday, November 8, 2016 9:05 PM
>> *To:* user@spark.apache.org
>> *Subject:* Strongly Connected Components
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am running this on a graph with >5B edges and >3B vertices and have 2
>> questions:
>>
>>
>>
>>    1. What is the optimal number of iterations?
>>    2. I am running it for 1 iteration right now on a beefy 100 node
>>    cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>>    persisted the graph to MEMORY_AND_DISK. And it has been running for 3 hours
>>    already. Any ideas on how to speed this up?
>>
>>
>>
>> Regards,
>>
>> Shreya
>>
>>
>>
>
>
