spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: NoSuchMethodError: writing spark-streaming data to cassandra
Date Tue, 09 Dec 2014 11:09:40 GMT
You're using two conflicting versions of the connector: the Scala module
at 1.1.0 and the Java module at 1.0.4.

I don't use Java myself, but I'd guess you only need the java dependency
for your job - with its version fixed to match the Scala one:

<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>   <!-- Scala module -->
</dependency>
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>   <!-- Java module: conflicts with 1.1.0 above -->
</dependency>
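For clarity, a corrected snippet with both artifacts pinned to the same release (this assumes the 1.1.0 build of the java module is published alongside connector 1.1.0):

```xml
<!-- Both connector modules on the same release line -->
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
</dependency>
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0</version>
</dependency>
```

If the java artifact pulls in the Scala core transitively, declaring only the -java_2.10 dependency at 1.1.0 may be enough.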

On Tue, Dec 9, 2014 at 11:16 AM, <m.sarosh@accenture.com> wrote:

>
> Hi,
>
> I am trying to save streaming data from Kafka into Cassandra using
> spark-streaming, but there seems to be a problem with the line
> javaFunctions(data).writerBuilder("testkeyspace", "test_table",
> mapToRow(TestTable.class)).saveToCassandra();
> I am getting a NoSuchMethodError.
> The code, the error log, and the POM.xml dependencies are listed below.
> Please help me find why this is happening.
>
>
> public class SparkStream {
>     static int key = 0;
>
>     public static void main(String[] args) throws Exception {
>         if (args.length != 3) {
>             System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
>             System.exit(1);
>         }
>
>         Logger.getLogger("org").setLevel(Level.OFF);
>         Logger.getLogger("akka").setLevel(Level.OFF);
>
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         String[] topic = args[2].split(",");
>         for (String t : topic) {
>             topicMap.put(t, new Integer(3));
>         }
>
>         /* Connection to Spark */
>         SparkConf conf = new SparkConf();
>         JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
>         JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
>
>         /* Receive Kafka streaming inputs */
>         JavaPairReceiverInputDStream<String, String> messages =
>                 KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>         /* Create DStream */
>         JavaDStream<TestTable> data = messages.map(
>             new Function<Tuple2<String, String>, TestTable>() {
>                 public TestTable call(Tuple2<String, String> message) {
>                     return new TestTable(new Integer(++key), message._2());
>                 }
>             });
>
>         /* Write to cassandra */
>         javaFunctions(data, TestTable.class).saveToCassandra("testkeyspace", "test_table");
>         // data.print(); // creates console output stream.
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
>
> class TestTable implements Serializable {
>     Integer key;
>     String value;
>
>     public TestTable() {}
>
>     public TestTable(Integer k, String v) {
>         key = k;
>         value = v;
>     }
>
>     public Integer getKey() {
>         return key;
>     }
>
>     public void setKey(Integer k) {
>         key = k;
>     }
>
>     public String getValue() {
>         return value;
>     }
>
>     public void setValue(String v) {
>         value = v;
>     }
>
>     public String toString() {
>         return MessageFormat.format("TestTable'{'key={0},value={1}'}'", key, value);
>     }
> }
>
> The output log is:
> Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.streaming.DStreamFunctions.<init>(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;)V
>         at com.datastax.spark.connector.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:17)
>         at com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(CassandraJavaUtil.java:89)
>         at com.spark.SparkStream.main(SparkStream.java:83)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> And the POM dependencies are:
>
>         <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-streaming-kafka_2.10</artifactId>
>                 <version>1.1.0</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-streaming_2.10</artifactId>
>                 <version>1.1.0</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>com.datastax.spark</groupId>
>                 <artifactId>spark-cassandra-connector_2.10</artifactId>
>                 <version>1.1.0</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>com.datastax.spark</groupId>
>                 <artifactId>spark-cassandra-connector-java_2.10</artifactId>
>                 <version>1.0.4</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-core_2.10</artifactId>
>                 <version>1.1.1</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>com.msiops.footing</groupId>
>                 <artifactId>footing-tuple</artifactId>
>                 <version>0.2</version>
>         </dependency>
>
>         <dependency>
>                 <groupId>com.datastax.cassandra</groupId>
>                 <artifactId>cassandra-driver-core</artifactId>
>                 <version>1.0.8</version>
>         </dependency>
>
>
> Thanks,
> Aiman
>
>
