spark-issues mailing list archives

From "Deming Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
Date Thu, 29 Oct 2015 14:18:28 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14980492#comment-14980492
] 

Deming Zhu commented on SPARK-5569:
-----------------------------------

For the OffsetRange ClassNotFound issue, I'm confident this patch solves it, because the sample code in the Spark documentation cannot actually run from a checkpoint without this patch.

Here's the sample code from the Spark home page:
```
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}
```
We can see that offsetRanges is an array, and it needs to be deserialized by the executors.
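As a side note, my assumption (not confirmed in this ticket) about the mechanism: `ObjectInputStreamWithLoader.resolveClass`, which appears in the stack trace below, resolves classes through `ClassLoader.loadClass`, and on modern JVMs `loadClass` cannot resolve array class names such as `[Lorg.apache.spark.streaming.kafka.OffsetRange;`, whereas `Class.forName` with the same loader can. A minimal standalone sketch of the difference (using `String[]` as a stand-in for `OffsetRange[]`):

```scala
// Sketch only: demonstrates array-class-name resolution, which I assume is
// the root cause of the ClassNotFoundException for array types seen here.
object ArrayClassResolution {
  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // JVM-internal binary name for String[]; stands in for OffsetRange[].
    val arrayName = "[Ljava.lang.String;"

    // Class.forName understands array syntax and resolves the element class.
    val resolved = Class.forName(arrayName, false, loader)
    println(s"Class.forName resolved: ${resolved.getName}, isArray=${resolved.isArray}")

    // ClassLoader.loadClass does not accept array class names and throws
    // ClassNotFoundException, matching the stack traces in this ticket.
    try {
      loader.loadClass(arrayName)
      println("loadClass unexpectedly succeeded")
    } catch {
      case _: ClassNotFoundException =>
        println("loadClass threw ClassNotFoundException")
    }
  }
}
```

If this assumption holds, a resolveClass override that uses `Class.forName(desc.getName, false, loader)` instead of `loader.loadClass(desc.getName)` would explain why the patch fixes the array cases.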

For the KafkaRDDPartition ClassNotFound issue, since I cannot see the reporter's source code, I cannot be 100% sure that this patch solves it.

In my opinion, though, both kinds of issue are caused by Spark trying to load an array object during checkpoint deserialization.


Could anyone affected try this patch and tell us whether it solves your issue?

> Checkpoints cannot reference classes defined outside of Spark's assembly
> ------------------------------------------------------------------------
>
>                 Key: SPARK-5569
>                 URL: https://issues.apache.org/jira/browse/SPARK-5569
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to create a JIRA
to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
>         at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
>         at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
>         at example.CheckpointedExample.main(CheckpointedExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>         at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         ... 52 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

