spark-user mailing list archives

From Helena Edelson <helena.edel...@datastax.com>
Subject Re: Exception writing on two cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Date Mon, 01 Jun 2015 11:26:11 GMT
Hi Antonio,
 
First, what version of the Spark Cassandra Connector are you using? You are using Spark 1.3.1,
which the connector today supports only in builds from the master branch - the release
with public artifacts supporting Spark 1.3.1 is coming soon ;)
Please see https://github.com/datastax/spark-cassandra-connector#version-compatibility
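
Once that artifact is published (until then, you would build the connector from master and
publish it locally), the dependency would look roughly like this - the version below is a
placeholder, not a released artifact:

    // build.sbt sketch - hypothetical version; check the compatibility table
    // above for the artifact that actually pairs with Spark 1.3.1.
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-SNAPSHOT"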

Try the version change and LMK.

What does your Cassandra log say?

Note that you can parse your events as you read from the stream, e.g. in your flumeStreamNavig.map(..)
code, and write inline to Cassandra - in Scala at least this takes a lot less code (I have not used
the Java API). Here is an example that reads from a stream (Kafka in this case):
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39
and writes inline to Cassandra:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64
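
Translated to your case, a minimal Scala sketch of that inline-write pattern might look like the
following (the Navigation fields and the parsing are hypothetical placeholders for whatever
cassandrasink.navigation actually defines):

    import com.datastax.spark.connector.streaming._
    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical schema - substitute the real columns of cassandrasink.navigation
    case class Navigation(source: String, body: String)

    def writeNavigation(rows: DStream[String]): Unit =
      rows
        .filter(_.takeWhile(_ != '#').contains("controller_log"))  // route on the header prefix
        .map { row =>
          val Array(header, body) = row.split("#", 2)              // events arrive as "header#body"
          Navigation(header, body)
        }
        .saveToCassandra("cassandrasink", "navigation")            // write inline to Cassandra

The same shape works for the transaction table; because the write happens inside the stream, you
never pull rows back to the driver with collect() or re-parallelize them.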


Helena
tw: @helenaedelson


> On May 29, 2015, at 6:11 AM, Antonio Giambanco <antogiamba@gmail.com> wrote:
> 
> Hi all,
> I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server.
> I'm coding a simple Java class for Spark Streaming as follows:
> it reads header events from a Flume sink, and
> based on the header it writes the event body to the navigation or transaction table (Cassandra).
> Unfortunately I get NoHostAvailableException; if I comment out the code saving to one of
> the two tables, everything works.
> 
> 
> Here is the code:
> 
>      public static void main(String[] args) {
> 
>          // Create a local StreamingContext with two working threads and a batch interval of 1 second
>          SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
> 
>          conf.set("spark.cassandra.connection.host", "127.0.0.1");
>          conf.set("spark.cassandra.connection.native.port", "9042");
>          conf.set("spark.cassandra.output.batch.size.rows", "1");
>          conf.set("spark.cassandra.output.concurrent.writes", "1");
> 
>          final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
> 
>          JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
>                  FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);
> 
>          // Turn each Flume event into a "header#body" string
>          JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
>                  new Function<SparkFlumeEvent, String>() {
> 
>                      @Override
>                      public String call(SparkFlumeEvent arg0) throws Exception {
>                          Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
>                          ByteBuffer bytePayload = arg0.event().getBody();
>                          String s = headers.get("source_log").toString() + "#" + new String(bytePayload.array());
>                          System.out.println("RIGA: " + s);
>                          return s;
>                      }
>                  });
> 
>          logRowsNavig.foreachRDD(
>                  new Function<JavaRDD<String>, Void>() {
>                      @Override
>                      public Void call(JavaRDD<String> rows) throws Exception {
> 
>                          if (!rows.isEmpty()) {
> 
>                              //String header = getHeaderFronRow(rows.collect());
> 
>                              List<Navigation> listNavigation = new ArrayList<Navigation>();
>                              List<Transaction> listTransaction = new ArrayList<Transaction>();
> 
>                              // Route each row on its header prefix
>                              for (String row : rows.collect()) {
> 
>                                  String header = row.substring(0, row.indexOf("#"));
> 
>                                  if (header.contains("controller_log")) {
>                                      listNavigation.add(createNavigation(row));
>                                      System.out.println("Added Element in Navigation List");
> 
>                                  } else if (header.contains("business_log")) {
>                                      listTransaction.add(createTransaction(row));
>                                      System.out.println("Added Element in Transaction List");
>                                  }
>                              }
> 
>                              if (!listNavigation.isEmpty()) {
>                                  JavaRDD<Navigation> navigationRows = jssc.sparkContext().parallelize(listNavigation);
>                                  javaFunctions(navigationRows)
>                                          .writerBuilder("cassandrasink", "navigation", mapToRow(Navigation.class))
>                                          .saveToCassandra();
>                              }
> 
>                              if (!listTransaction.isEmpty()) {
>                                  JavaRDD<Transaction> transactionRows = jssc.sparkContext().parallelize(listTransaction);
>                                  javaFunctions(transactionRows)
>                                          .writerBuilder("cassandrasink", "transaction", mapToRow(Transaction.class))
>                                          .saveToCassandra();
>                              }
>                          }
>                          return null;
>                      }
>                  });
> 
>          jssc.start();              // Start the computation
>          jssc.awaitTermination();   // Wait for the computation to terminate
>      }
> 
> 
> Here is the exception:
> 
> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:
> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
>          at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>          at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>          at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
>          at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:601)
>          at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>          at $Proxy17.executeAsync(Unknown Source)
>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:601)
>          at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>          at $Proxy17.executeAsync(Unknown Source)
>          at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>          at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>          at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>          at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>          at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>          at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>          at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>          at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>          at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>          at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>          at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>          at java.lang.Thread.run(Thread.java:722)
> 
> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 20)
> java.io.IOException: Failed to write statements to cassandrasink.navigation.
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>          at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>          at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>          at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>          at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>          at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>          at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>          at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>          at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>          at java.lang.Thread.run(Thread.java:722)
> 
> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20, localhost): java.io.IOException: Failed to write statements to cassandrasink.navigation.
>          (same stack trace as above)
> 
> 
> 
> 
> A G
> 

