spark-dev mailing list archives

From kant kodali <kanth...@gmail.com>
Subject This Exception has been really hard to trace
Date Mon, 10 Oct 2016 03:13:23 GMT
I tried spanBy, but there is a strange error that happens no matter which way
I try it. It looks like the one described here for the Java solution:

http://qaoverflow.com/question/how-to-use-spanby-in-java/

java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

JavaPairRDD<ByteBuffer, Iterable<CassandraRow>> cassandraRowsRDD =
    javaFunctions(sc).cassandraTable("test", "hello")
        .select("col1", "col2", "col3")
        .spanBy(new Function<CassandraRow, ByteBuffer>() {
            @Override
            public ByteBuffer call(CassandraRow v1) {
                return v1.getBytes("rowkey");
            }
        }, ByteBuffer.class);

Then I do the following, and this is where the problem occurs:

List<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
    cassandraRowsRDD.collect(); // ERROR OCCURS HERE
Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple =
    listOftuples.iterator().next();
ByteBuffer partitionKey = tuple._1();
for (CassandraRow cassandraRow : tuple._2()) {
    System.out.println(cassandraRow.getLong("col1"));
}

So I tried this instead, and got the same error:

Iterable<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
    cassandraRowsRDD.collect(); // ERROR OCCURS HERE
Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple =
    listOftuples.iterator().next();
ByteBuffer partitionKey = tuple._1();
for (CassandraRow cassandraRow : tuple._2()) {
    System.out.println(cassandraRow.getLong("col1"));
}

Although I understand that ByteBuffers aren't serializable, I didn't get any
"not serializable" exception. Still, I went ahead and changed everything to
byte[], so there are no more ByteBuffers in the code.
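
A ByteBuffer-to-byte[] conversion of this kind could be sketched roughly as
below. This is only an illustration: the class name ByteBufferKeys and the
helper toBytes are hypothetical and not part of Spark or the Cassandra
connector. It copies the remaining bytes without moving the position of the
original buffer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferKeys {
    // Hypothetical helper (not a connector API): copies the remaining bytes
    // of a ByteBuffer into a fresh, serializable byte[].
    public static byte[] toBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position and limit,
        // so reading from the copy leaves the caller's buffer untouched.
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer rowKey = ByteBuffer.wrap(new byte[] {1, 2, 3});
        byte[] key = toBytes(rowKey);
        System.out.println(Arrays.toString(key));
    }
}
```

With a helper like this, the spanBy key function would return
toBytes(v1.getBytes("rowkey")) and the key class would be byte[].class
instead of ByteBuffer.class.
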
I have also tried cassandraRowsRDD.collect().forEach() and
cassandraRowsRDD.stream().forEachPartition(), and the exact same error occurs.
I am running everything locally in standalone mode, so my Spark cluster is
just running on localhost.
Scala code runner version 2.11.8  // when I run scala -version or even
./spark-shell

compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'

So I don't see anything wrong with these versions.
2) I am bundling everything into one jar, and so far that has worked out well
except for this error.
I am using Java 8 and Gradle.

Any ideas on how I can fix this?