spark-user mailing list archives

From Guillermo Ortiz <konstt2...@gmail.com>
Subject Testing spark-testing-base. Error multiple SparkContext
Date Tue, 03 Apr 2018 22:03:13 GMT
I'm writing a Spark test that involves Spark Streaming, Cassandra and Kafka.
I have an action that takes a DStream as input, saves it to Cassandra and
sometimes sends some elements to Kafka.
I'm using https://github.com/holdenk/spark-testing-base, with Kafka and
Cassandra running locally.


My method looks like:

    def execute(dstream: DStream[MyObject]): Unit = {
      // Some processing --> this works
      // Save some RDDs to Cassandra --> this works
      // Send some records to Kafka --> this doesn't work in the test; it works outside of the test.
    }


When I send data to Kafka:

    // There is an error in this method
    def sendToKafka(rec: DStream[CrmTipoCliente]) = {
      rec.foreachRDD( r => {
        r.foreachPartition {
          // --> Exception here. Config.kafkapr.... returns a Properties object
          // with the values needed to connect to Kafka
          val kafka = SparkKafkaSink[String,String](Config.kafkapropsProd)
          partition =>
            partition.foreach {
              message => {
                // Some logic..
                kafka.send("test", null, "message")
              }
            }
        }
      })
    }

My test looks like:

    @RunWith(classOf[JUnitRunner])
    class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter
        with BeforeAndAfterAll with StreamingActionBase {

      var cluster: Cluster = _
      implicit var session: Session = _
      val keyspace: String = "iris"
      val table: String = keyspace + ".tipo_cliente_ref"
      var service: MyClass = _

      override def beforeAll(): Unit = {
        super.beforeAll()
        // This line doesn't work!
        sc.getConf.set("spark.driver.allowMultipleContexts", "true")
        ...

      test("Insert record ") {
        val inputInsert = MyObject("...")
        val input = List(List(inputInsert))
        runAction[MyObject](input, service.execute)
        val result = session.execute("select * from myTable WHERE...")
        // Some assert to Cassandra and Kafka
      }

This test partially works: it saves data to Cassandra, but it fails when it
has to send data to Kafka.

The error I can see:
23:58:45.329 [pool-22-thread-1] INFO  o.a.spark.streaming.CheckpointWriter - Saving checkpoint for time 1000 ms to file 'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84-400f-b92a-5ff4086b81c3/checkpoint-1000'
Exception in thread "streaming-job-executor-0" java.lang.ExceptionInInitializerError
        at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49)
        at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:47)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)
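
A note on the shape of this trace: a java.lang.ExceptionInInitializerError that wraps another exception is what the JVM reports when a static initializer throws, and the body of a top-level Scala singleton object runs as one. The sketch below reproduces that shape in plain Scala, with no Spark on the classpath. FailingHolder and InitializerErrorDemo are hypothetical names, and the IllegalStateException only stands in for the SparkException above. Whether an object in the code under test really creates a second SparkContext this way is an assumption; the message does not show it.

```scala
// Hypothetical stand-in for a singleton that builds a heavyweight resource
// (for example a second context) inside its initializer.
object FailingHolder {
  // Evaluated once, when FailingHolder is first accessed.
  val resource: String =
    throw new IllegalStateException("Only one context may be running in this JVM")
}

object InitializerErrorDemo {
  // Touches FailingHolder and returns the class names of the error that
  // surfaces and of its cause.
  def probe(): (String, String) =
    try {
      FailingHolder.resource
      ("none", "none")
    } catch {
      case e: ExceptionInInitializerError =>
        (e.getClass.getName, e.getCause.getClass.getName)
    }

  def main(args: Array[String]): Unit = {
    val (outer, cause) = probe()
    println(s"$outer caused by $cause")
  }
}
```

If the assumption holds, the first object touched at MyClass.scala:49 would be the place to look for code that constructs a SparkContext.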
