spark-user mailing list archives

From Sean Owen <sro...@gmail.com>
Subject Re: Extending GraphFrames without running into serialization issues
Date Tue, 05 Jan 2021 21:25:41 GMT
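This happens because "extends GraphFrame" with no arguments calls the
no-arg superclass constructor, which sets _vertices and _edges in the
actual GraphFrame class to null. GraphFrame.vertices then finds null and
throws; your stack trace shows the REPL reaching it through toString when
it prints the new object.
Normally you would pass the arguments to the two-arg superclass
constructor with "extends GraphFrame(_vertices, _edges)", but that
constructor is private.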
Instead try overriding the accessors in your class body:

  override def vertices: DataFrame = _vertices
  override def edges: DataFrame = _edges
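
To make the mechanism concrete, here is a minimal sketch that needs neither Spark nor GraphFrames. BaseGraph is a simplified, hypothetical stand-in for GraphFrame (not the real class), plain Strings stand in for DataFrames, and BrokenGraph and FixedGraph are made-up names. It only assumes the structure described above: a private two-arg constructor, a no-arg constructor that passes nulls, and accessors that throw on null.

```scala
// Hypothetical stand-in for org.graphframes.GraphFrame, reduced to the parts
// relevant here. Strings replace DataFrames so no Spark dependency is needed.
class BaseGraph private (private val _vertices: String,
                         private val _edges: String) {

  // The only constructor a subclass can reach; it leaves both fields null.
  protected def this() = this(null, null)

  def vertices: String =
    if (_vertices == null)
      throw new Exception("You cannot use GraphFrame objects within a Spark closure")
    else _vertices

  def edges: String =
    if (_edges == null)
      throw new Exception("You cannot use GraphFrame objects within a Spark closure")
    else _edges

  // Goes through the accessors, so it fails whenever they do.
  override def toString: String = s"BaseGraph(v:$vertices, e:$edges)"
}

// Same shape as the class in the question: the subclass keeps its own private
// fields, but the inherited accessors still read the superclass's null fields.
class BrokenGraph(private val _vertices: String,
                  private val _edges: String) extends BaseGraph

// The suggested fix: override the accessors so they return the subclass fields.
class FixedGraph(private val _vertices: String,
                 private val _edges: String) extends BaseGraph {
  override def vertices: String = _vertices
  override def edges: String = _edges
}
```

With these definitions, new BrokenGraph("v", "e").vertices throws the exception, while new FixedGraph("v", "e").vertices returns "v" and toString works because it dispatches to the overridden accessors.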

On Tue, Jan 5, 2021 at 3:16 PM Michal Monselise <michal.monselise@gmail.com>
wrote:

> Hi,
>
> I am trying to extend GraphFrames and create my own class that has some
> additional graph functionality.
>
> To simplify for this example, I have created a class that doesn't contain
> any functions. All it does is just extend GraphFrames:
>
> import org.apache.spark.sql.DataFrame
> import org.graphframes._
> class NewGraphFrame(@transient private val _vertices: DataFrame,
>                     @transient private val _edges: DataFrame) extends GraphFrame {
>
> }
> val vertices = Seq(
>   (1, "John"),
>   (2, "Jane"),
>   (3, "Karen")
> ).toDF("id", "name")
> val edges = Seq(
>   (1, 3),
>   (2, 3),
>   (2, 1)
> ).toDF("src", "dst")
> val g = new NewGraphFrame(vertices, edges)
>
> When I run this code in the REPL, I get the following error:
>
> java.lang.Exception: You cannot use GraphFrame objects within a Spark closure
>   at org.graphframes.GraphFrame.vertices(GraphFrame.scala:125)
>   at org.graphframes.GraphFrame.toString(GraphFrame.scala:55)
>   at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
>   at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
>   at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
>   at .$print$lzycompute(<console>:10)
>   at .$print(<console>:6)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
>   at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
>   at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
>   at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
>   at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
>   at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
>   at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
>   at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
>   at org.apache.spark.repl.Main$.doMain(Main.scala:76)
>   at org.apache.spark.repl.Main$.main(Main.scala:56)
>   at org.apache.spark.repl.Main.main(Main.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> I know that this means that I'm serializing twice. However, I am obviously
> not interested in doing that. I simply want to extend this class so that I
> can use the graph functionality in my class. How do I extend this class
> without the spark repl throwing this error?
>
