spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: Strongly Connected Components
Date Sat, 12 Nov 2016 20:01:28 GMT
oh ok, i see now it's not the same

On Sat, Nov 12, 2016 at 2:48 PM, Koert Kuipers <koert@tresata.com> wrote:

> not sure i see the faster algo in the paper you mention.
>
> i see this in section 6.1.2:
> "In what follows we give a simple labeling algorithm that computes
> connectivity on sparse graphs in O(log N) rounds."
> N here is the size of the graph, not the largest component diameter.
>
> that is the exact same algo as is implemented in graphx i think. or is it
> not?
>
> On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos <
> daniel.darabos@lynxanalytics.com> wrote:
>
>> Hi Shreya,
>> GraphFrames just calls the GraphX strongly connected components code.
>> (https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
>>
>> For choosing the number of iterations: If the number of iterations is
>> less than the diameter of the graph, you may get an incorrect result. But
>> running for more iterations than that buys you nothing. The algorithm is
>> basically to broadcast your ID to all your neighbors in the first round,
>> and then broadcast the smallest ID that you have seen so far in the next
>> rounds. So with only 1 round you will get a wrong result unless each vertex
>> is directly connected to the vertex with the lowest ID in its component.
>> (Unlikely in a real graph.)
>>
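The round-by-round behaviour described above can be illustrated with a small single-machine sketch. It assumes an undirected edge list with Long vertex IDs and simulates synchronous min-label propagation with plain Scala collections. The object name `MinLabelPropagation` is hypothetical, and this is not GraphX's Pregel-based code.

```scala
// Single-machine sketch of synchronous min-label propagation (connected components).
// Assumptions: undirected graph, edge list of Long IDs, only vertices that appear in an edge.
// Hypothetical names; this mimics the idea described in the email, not GraphX's implementation.
object MinLabelPropagation {

  /** Runs at most `maxRounds` rounds; returns the labels and how many rounds changed them. */
  def run(edges: Seq[(Long, Long)], maxRounds: Int): (Map[Long, Long], Int) = {
    // Store each edge in both directions so labels can flow either way.
    val neighbors: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    // Before the first round, every vertex is labelled with its own ID.
    var labels: Map[Long, Long] = neighbors.keys.map(v => v -> v).toMap
    var rounds = 0
    var stable = false
    while (!stable && rounds < maxRounds) {
      // Each vertex keeps the smallest label among itself and its neighbours.
      val next = labels.map { case (v, l) => v -> (l +: neighbors(v).map(labels)).min }
      if (next == labels) stable = true
      else { labels = next; rounds += 1 }
    }
    (labels, rounds)
  }

  def main(args: Array[String]): Unit = {
    val path = Seq(0L -> 1L, 1L -> 2L, 2L -> 3L, 3L -> 4L) // path graph, diameter 4
    val (afterOne, _) = run(path, maxRounds = 1)
    println(afterOne(4L)) // 3: vertex 4 has not yet heard of vertex 0
    val (converged, rounds) = run(path, maxRounds = 100)
    println(s"$rounds rounds, labels ${converged.values.toSet}")
  }
}
```

On the path 0-1-2-3-4, a single round leaves vertices 2 to 4 with non-minimal labels, and the labels stop changing only after 4 rounds, one per hop between vertex 0 and vertex 4, which matches the "iterations vs. diameter" point above.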
>> See https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
>> for the actual implementation.
>>
>> A better algorithm exists for this problem that requires only O(log(N))
>> iterations, where N is the largest component diameter. (It is described in
>> "A Model of Computation for MapReduce",
>> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
>> GraphX's implementation immensely. (See the last slide of
>> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
>> The large advantage is due to the smaller number of iterations required.
>>
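To give a feel for how far fewer iterations can be needed, here is a sketch that adds a shortcutting (pointer-jumping) step to min-label propagation: after each propagation round, every vertex repeatedly replaces its label with its label's label. This is a generic hook-and-shortcut style variant, not the algorithm from the MRC paper, and it makes no claim to that paper's O(log N) bound. It assumes an undirected edge list, and the names are hypothetical. In a distributed setting each jump pass would be its own join, so the sketch counts every pass that changes a label.

```scala
// Sketch: min-label propagation plus shortcutting (pointer jumping).
// Assumptions: undirected graph as a Long edge list; hypothetical names; a generic
// hook-and-shortcut style variant, NOT the algorithm from the MRC paper.
object MinLabelWithShortcut {

  /** Returns the final labels and the number of passes (propagation or jump) that changed a label. */
  def run(edges: Seq[(Long, Long)]): (Map[Long, Long], Int) = {
    val neighbors: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    // Labels are always vertex IDs from the same component and never increase.
    var labels: Map[Long, Long] = neighbors.keys.map(v => v -> v).toMap
    var changingPasses = 0
    var done = false
    while (!done) {
      // Propagation: take the smallest label among yourself and your neighbours.
      val propagated = labels.map { case (v, l) => v -> (l +: neighbors(v).map(labels)).min }
      val propagationChanged = propagated != labels
      if (propagationChanged) { changingPasses += 1; labels = propagated }
      // Shortcutting: replace your label with your label's label until nothing changes.
      var jumping = true
      while (jumping) {
        val jumped = labels.map { case (v, l) => v -> labels(l) }
        jumping = jumped != labels
        if (jumping) { changingPasses += 1; labels = jumped }
      }
      // A propagation pass that changes nothing means every edge joins equal labels.
      done = !propagationChanged
    }
    (labels, changingPasses)
  }

  def main(args: Array[String]): Unit = {
    // A 64-vertex path 0-1-...-63 (diameter 63).
    val path = (0L until 63L).map(v => v -> (v + 1))
    val (labels, passes) = run(path)
    println(s"passes = $passes, all labels zero = ${labels.values.forall(_ == 0L)}")
  }
}
```

On the 64-vertex path, the labels settle after 7 changing passes (1 propagation pass, then 6 jump passes, because the distance each label jumps doubles every pass), whereas plain min-label propagation needs 63 rounds, one per hop.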
>> For why this is failing even with one iteration: I would first check your
>> partitioning. Too many or too few partitions could equally cause the issue.
>> If you are lucky, there is no overlap between the "too many" and "too few"
>> domains :).
>>
>> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shreyagr@microsoft.com>
>> wrote:
>>
>>> Tried GraphFrames. Still faced the same issue: the job died after a few hours.
>>> The errors I see (and I see tons of them) are:
>>>
>>> (I also ran with 3 times the partitions, which was 12 times the number of
>>> executors, but got the same result.)
>>>
>>> -------------------------------------
>>>
>>> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: null Error Code : RequestBodyTooLarge
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests outstanding when connection from /10.0.0.95:43301 is closed
>>> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
>>> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty blocks out of 1500 blocks
>>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>>> java.io.IOException: Connection from /10.0.0.95:43301 closed
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>>> java.lang.RuntimeException: java.io.FileNotFoundException: /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>>>
>>> -------------------------------------
>>>
>>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>>         at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.ConcurrentModificationException
>>>         at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>         at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
>>> org.apache.spark.SparkException: Error sending message [message = Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
>>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>>>         at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
>>>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>>         ... 13 more
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>>>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>>>
>>> *From:* Shreya Agarwal
>>> *Sent:* Thursday, November 10, 2016 8:16 PM
>>> *To:* 'Felix Cheung' <felixcheung_m@hotmail.com>; user@spark.apache.org
>>> *Subject:* RE: Strongly Connected Components
>>>
>>> Yesterday’s run died sometime during the night, without any errors.
>>> Today, I am running it using GraphFrames instead. It is still spawning new
>>> tasks, so there is progress.
>>>
>>> *From:* Felix Cheung [mailto:felixcheung_m@hotmail.com
>>> <felixcheung_m@hotmail.com>]
>>> *Sent:* Thursday, November 10, 2016 7:50 PM
>>> *To:* user@spark.apache.org; Shreya Agarwal <shreyagr@microsoft.com>
>>> *Subject:* Re: Strongly Connected Components
>>>
>>> It is possible it is dead. Could you check the Spark UI to see if there
>>> is any progress?
>>>
>>> _____________________________
>>> From: Shreya Agarwal <shreyagr@microsoft.com>
>>> Sent: Thursday, November 10, 2016 12:45 AM
>>> Subject: RE: Strongly Connected Components
>>> To: <user@spark.apache.org>
>>>
>>> Bump. Anyone? It's been running for 10 hours now. No results.
>>>
>>> *From:* Shreya Agarwal
>>> *Sent:* Tuesday, November 8, 2016 9:05 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* Strongly Connected Components
>>>
>>> Hi,
>>>
>>> I am running this on a graph with >5B edges and >3B edges and have 2
>>> questions:
>>>
>>>    1. What is the optimal number of iterations?
>>>    2. I am running it for 1 iteration right now on a beefy 100-node
>>>    cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>>>    persisted the graph to MEMORY_AND_DISK. And it has been running for 3 hours
>>>    already. Any ideas on how to speed this up?
>>>
>>>
>>>
>>> Regards,
>>>
>>> Shreya
>>>
>>>
>>>
>>
>>
>
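For reference, the cluster resources quoted in this thread work out as follows. This is plain arithmetic on Shreya's figures (300 executors with 5 cores and 30GB each, plus a run whose partition count was 12 times the number of executors). It assumes one task per core, which is Spark's default (`spark.task.cpus` = 1), and treats the 30GB per executor as usable executor memory, ignoring overhead. The object name `ClusterMath` is hypothetical.

```scala
// Back-of-the-envelope arithmetic on the figures quoted in this thread.
// Assumption: one task per core (spark.task.cpus = 1, Spark's default).
object ClusterMath {
  val executors = 300
  val coresPerExecutor = 5
  val memPerExecutorGb = 30

  // Concurrent task slots across the cluster.
  val taskSlots: Int = executors * coresPerExecutor
  // Aggregate executor memory, ignoring any overhead outside the executor.
  val totalExecutorMemGb: Int = executors * memPerExecutorGb
  // The retry described above used 12 partitions per executor.
  val partitions: Int = 12 * executors
  // Waves of tasks per stage if every partition becomes exactly one task.
  val waves: Double = partitions.toDouble / taskSlots

  def main(args: Array[String]): Unit =
    println(f"$taskSlots slots, $totalExecutorMemGb GB, $partitions partitions, $waves%.1f waves")
}
```

With 3600 partitions and 1500 task slots, each stage of that run would execute in roughly 2.4 waves of tasks.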
