spark-user mailing list archives

From Burak Yavuz <brk...@gmail.com>
Subject Re: Getting Message From Structured Streaming Format Kafka
Date Thu, 02 Nov 2017 16:04:08 GMT
Hi Daniel,

Several things:
 1) Your error suggests that your Spark version and the version of the
sql-kafka connector do not match. Could you make sure the connector is
built for the same Spark version you are running?
 2) With Structured Streaming, you can remove everything related to a
StreamingContext:

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1))

These lines have no effect on Structured Streaming.
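
As a rough sketch of what is left once the StreamingContext is gone,
something like the following should be enough. It assumes the
spark-sql-kafka-0-10 connector is on the classpath at the same version as
Spark. The object name KafkaConsoleSketch and the kafkaOptions map are only
illustrative, and the casts to STRING are my addition, since the Kafka
source returns key and value as binary.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative name, not from the original code
object KafkaConsoleSketch {
  // Source options copied from the original code
  val kafkaOptions: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "evil_queue"
  )

  def main(args: Array[String]): Unit = {
    // A SparkSession is the only entry point Structured Streaming needs
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("KafkaConsoleSketch")
      .getOrCreate()

    val lines = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()

    // key and value arrive as binary; cast them so the console output is readable
    val messages = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    val query = messages.writeStream
      .format("console")
      .start()

    // Block the main thread so the streaming query keeps running
    query.awaitTermination()
  }
}
```

Without the awaitTermination() call the application may exit before any
batch is printed.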


Best,
Burak

On Thu, Nov 2, 2017 at 11:36 AM, Daniel de Oliveira Mantovani <
daniel.oliveira.mantovani@gmail.com> wrote:

> Hello, I'm trying to run the following code,
>
> var newContextCreated = false // Flag to detect whether new context was created or not
> val kafkaBrokers = "localhost:9092" // comma separated list of host:port
>
> private val batchDuration: Duration = Seconds(3)
> private val master: String = "local[2]"
> private val appName: String = this.getClass().getSimpleName()
> private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"
>
> // Create a Spark configuration
>
> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDir)
> ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively
>
> val spark = SparkSession
>   .builder
>   .config(sparkConf)
>   .getOrCreate()
>
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "evil_queue")
>   .load()
>
> lines.printSchema()
>
> import spark.implicits._
> val noAggDF = lines.select("key")
>
> noAggDF
>   .writeStream
>   .format("console")
>   .start()
>
>
> But I'm having the error:
>
> http://paste.scsys.co.uk/565658
>
>
> How do I get my messages from Structured Streaming using kafka as the format?
>
>
> Thank you
>
>
> --
>
> --
> Daniel de Oliveira Mantovani
> Perl Evangelist/Data Hacker
> +1 786 459 1341
>
