spark-user mailing list archives

From Daniel de Oliveira Mantovani
Subject Getting Message From Structured Streaming Format Kafka
Date Thu, 02 Nov 2017 15:36:34 GMT
Hello, I'm trying to run the following code,

var newContextCreated = false // Flag to detect whether a new context was created
val kafkaBrokers = "localhost:9092" // comma-separated list of host:port brokers

private val batchDuration: Duration = Seconds(3)
private val master: String = "local[2]"
private val appName: String = this.getClass().getSimpleName()
private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"

// Create a Spark configuration

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
// To make sure data is not deleted by the time we query it interactively
ssc.remember(Minutes(1))

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "evil_queue")
  .load()


import spark.implicits._
val noAggDF = lines.select("key")


But I'm having the error:

How do I get my messages from Structured Streaming using Kafka as the format?

Thank you


Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341
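
For reference, here is a minimal sketch of reading Kafka messages with Structured Streaming. It is not a diagnosis of the error above. It reuses the broker address and topic from the message and assumes the `spark-sql-kafka-0-10` connector matching your Spark version is on the classpath. The object name `KafkaStructuredStreamingSketch` and the helper `kafkaSourceOptions` are hypothetical, and the console sink is chosen only for illustration. Structured Streaming works from a `SparkSession` alone, so the sketch does not create a `StreamingContext`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaStructuredStreamingSketch {

  // Hypothetical helper: builds the two source options used in the message above.
  def kafkaSourceOptions(brokers: String, topic: String): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> brokers, // comma-separated host:port list
      "subscribe" -> topic                  // topic to read from
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("KafkaStructuredStreamingSketch")
      .getOrCreate()

    // Streaming DataFrame backed by the Kafka source.
    val lines: DataFrame = spark.readStream
      .format("kafka")
      .options(kafkaSourceOptions("localhost:9092", "evil_queue"))
      .load()

    // The Kafka source exposes key and value as binary columns, so cast them to strings.
    val messages = lines.selectExpr(
      "CAST(key AS STRING) AS key",
      "CAST(value AS STRING) AS value"
    )

    // Print each micro-batch to stdout; a streaming query only runs once start() is called.
    val query = messages.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

If the connector is not bundled with the application, one way to supply it is the `--packages` option of `spark-submit`, using the `spark-sql-kafka-0-10` coordinates for your Scala and Spark versions.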
