spark-user mailing list archives

From Daniel de Oliveira Mantovani <daniel.oliveira.mantov...@gmail.com>
Subject Re: Getting Message From Structured Streaming Format Kafka
Date Fri, 01 Dec 2017 21:52:12 GMT
Hello Burak,

Sorry for the delayed answer; you were right.

1) - I changed the sql-kafka connector version, and that fixed it.
2) - The purpose was just a test, and I was also using normal streaming for
other things.
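Item 1 amounts to keeping every Spark artifact on the same version. Below is a minimal `build.sbt` sketch, assuming sbt and Spark 2.2.0 on Scala 2.11; the thread does not say which versions were actually involved, so substitute the version your Spark runtime uses.

```scala
// build.sbt (sketch). Assumption: Spark 2.2.0 on Scala 2.11.
scalaVersion := "2.11.12"

// One value for every Spark artifact, so the Kafka connector
// cannot drift away from the Spark runtime it is loaded into.
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  // Only needed because DStream-based ("normal") streaming is also used
  "org.apache.spark" %% "spark-streaming"      % sparkVersion,
  // Structured Streaming Kafka source/sink; not bundled with Spark itself
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```

When submitting to a cluster with `spark-submit`, the connector can instead be supplied with `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0`, again using the cluster's own Spark version.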

I was wondering how you knew from reading the logs that it was the sql-kafka
connector version. I couldn't find anything useful there.
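The thread does not say how the mismatch was diagnosed. One way to check for it without relying on the logs is to compare the Spark versions embedded in jar file names. The sketch below uses a hypothetical helper, `SparkJarVersions`, that is not part of Spark; it assumes jars follow the usual `artifact_scalaVersion-sparkVersion.jar` naming.

```scala
// Hypothetical helper (not part of Spark): lists Spark artifacts found in
// jar file names and flags a classpath that mixes Spark versions.
object SparkJarVersions {
  // Matches names like "spark-sql-kafka-0-10_2.11-2.2.0.jar" anywhere in a
  // file name, since jars fetched with --packages may carry an
  // "org.apache.spark_" prefix.
  // Group 1 = artifact, group 2 = Scala binary version, group 3 = Spark version.
  private val SparkJar = """(spark-[\w.-]+?)_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar""".r

  /** (artifact, version) pairs for every Spark jar among the given paths. */
  def sparkJars(jarPaths: Seq[String]): Seq[(String, String)] =
    jarPaths.flatMap { path =>
      val fileName = path.split("[/\\\\]").last // drop directories (/ or \)
      SparkJar.findFirstMatchIn(fileName).map(m => m.group(1) -> m.group(3))
    }

  /** True when the Spark jars found do not all share one version. */
  def hasMismatch(jarPaths: Seq[String]): Boolean =
    sparkJars(jarPaths).map(_._2).distinct.size > 1
}
```

On the driver, `System.getProperty("java.class.path").split(java.io.File.pathSeparator).toSeq` is one possible input. Jars added at runtime through `--packages` or `--jars` may not appear there, so treat a clean result as a hint rather than proof.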

Thank you very much!

On Thu, Nov 2, 2017 at 12:04 PM, Burak Yavuz <brkyvz@gmail.com> wrote:

> Hi Daniel,
>
> Several things:
>  1) Your error seems to suggest you're running one version of Spark and a
> different version of the sql-kafka connector. Could you make sure they are
> both on the same Spark version?
>  2) With Structured Streaming, you may remove everything related to a
> StreamingContext.
>
> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDir)
> ssc.remember(Minutes(1))
>
> These lines are not doing anything for Structured Streaming.
>
>
> Best,
> Burak
>
> On Thu, Nov 2, 2017 at 11:36 AM, Daniel de Oliveira Mantovani <
> daniel.oliveira.mantovani@gmail.com> wrote:
>
>> Hello, I'm trying to run the following code,
>>
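>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}
>>
>> var newContextCreated = false // Flag to detect whether new context was created or not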
>> val kafkaBrokers = "localhost:9092" // comma-separated list of host:port
>>
>> private val batchDuration: Duration = Seconds(3)
>> private val master: String = "local[2]"
>> private val appName: String = this.getClass().getSimpleName()
>> private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"
>>
>> // Create a Spark configuration
>>
>> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>>
>> val ssc = new StreamingContext(sparkConf, batchDuration)
>> ssc.checkpoint(checkpointDir)
>> ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively
>>
>> val spark = SparkSession
>>   .builder
>>   .config(sparkConf)
>>   .getOrCreate()
>>
>> val lines = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "evil_queue")
>>   .load()
>>
>> lines.printSchema()
>>
>> import spark.implicits._
>> val noAggDF = lines.select("key")
>>
>> noAggDF
>>   .writeStream
>>   .format("console")
>>   .start()
>>   .awaitTermination() // Block until the query stops so the driver does not exit
>>
>>
>> But I'm having the error:
>>
>> http://paste.scsys.co.uk/565658
>>
>>
>> How do I get my messages using the kafka format in Structured Streaming?
>>
>>
>> Thank you
>>
>>
>> --
>>
>> --
>> Daniel de Oliveira Mantovani
>> Perl Evangelist/Data Hacker
>> +1 786 459 1341
>>
>
>


-- 

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341
