spark-user mailing list archives

From purna pradeep <purna2prad...@gmail.com>
Subject Re: Restart streaming query spark 2.1 structured streaming
Date Tue, 15 Aug 2017 21:29:02 GMT
Thanks Tathagata Das, actually I'm planning to do something like this:

activeQuery.stop()

// unpersist and re-persist the cached data frame
df.unpersist()

// re-read the updated data; the data size of df is around 100 GB
df.persist()

activeQuery = startQuery()


The cached data frame is around 100 GB, so the question is: is this the right
place to refresh such a huge cached data frame?
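The steps above can be sketched as a single helper. This is only a sketch assuming Spark 2.1 structured streaming; `df` and `startQuery` are the names used in this thread, `refreshAndRestart` is a hypothetical name, and whether re-persisting the same data frame actually picks up updated source data depends on the source (for file sources, the plan is re-evaluated and files are re-scanned on the next action):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: stop the active query, drop the stale cache,
// re-cache the data frame, and start a new query against the
// same checkpoint directory. `startQuery` is assumed to build
// and start the streaming query as in TDas's reply below.
def refreshAndRestart(activeQuery: StreamingQuery,
                      df: DataFrame,
                      startQuery: () => StreamingQuery): StreamingQuery = {
  activeQuery.stop()            // blocks until the query has stopped
  df.unpersist(blocking = true) // free the ~100 GB cache before re-reading
  df.persist()                  // lazy; re-materialized on the next action
  startQuery()                  // hand back the new active query
}
```

Stopping the query first matters here: unpersisting a cache out from under a running query can force recomputation mid-batch.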

I'm also trying to refresh the cached data frame in the onQueryProgress() method of
a class which extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame, and
why.
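For reference, the listener-based variant might look like the following. This is a hedged sketch: `shouldRefresh` and `refresh` are placeholders for the condition and refresh logic discussed in this thread, and note that onQueryProgress is invoked on the listener bus thread, so heavy work such as re-caching ~100 GB there can delay delivery of subsequent events:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch only: a listener that checks a refresh condition on every
// progress event. The actual condition and cache-refresh logic are
// placeholders (`shouldRefresh`, `refresh`).
class CacheRefreshListener(shouldRefresh: () => Boolean,
                           refresh: () => Unit) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    if (shouldRefresh()) refresh()  // runs on the listener bus thread
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration (sketch):
// spark.streams.addListener(new CacheRefreshListener(cond, doRefresh))
```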

Thanks again for the below response

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> You can do something like this.
>
>
> def startQuery(): StreamingQuery = {
>   // create your streaming dataframes
>   // start the query with the same checkpoint directory
> }
>
> // handle to the active query
> var activeQuery: StreamingQuery = null
>
> while (!stopped) {
>
>   if (activeQuery == null) {            // if query not active, start query
>     activeQuery = startQuery()
>
>   } else if (shouldRestartQuery()) {    // check your condition and restart query
>     activeQuery.stop()
>     activeQuery = startQuery()
>   }
>
>   activeQuery.awaitTermination(100)     // wait for 100 ms
>   // if there is any error it will throw an exception and quit the loop;
>   // otherwise it will keep checking the condition every 100 ms
> }
>
>
>
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2pradeep@gmail.com>
> wrote:
>
>> Thanks Michael
>>
>> I guess my question is a little confusing... let me try again.
>>
>>
>> I would like to restart the streaming query programmatically, based on a
>> condition, while my streaming application is running. The reason I want to do
>> this:
>>
>> I want to refresh a cached data frame based on a condition, and the best
>> way to do this is to restart the streaming query, as suggested by TDas below
>> for a similar problem:
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+fwmn4jTm1X1nDAPA@mail.gmail.com%3e
>>
>> I do understand that checkpointing helps with recovery from failures, but I
>> would like to know "how to restart the streaming query programmatically without
>> stopping my streaming application".
>>
>> In place of query.awaitTermination(), should I have logic to restart the
>> query? Please suggest.
>>
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <michael@databricks.com>
>> wrote:
>>
>>> See
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>>
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2pradeep@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>>
>>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>>
>>>>> Where and how should I restart the streaming query?
>>>>>
>>>>
>>>>
>>>> val sparkSes = SparkSession
>>>>   .builder
>>>>   .config("spark.master", "local")
>>>>   .appName("StreamingCahcePoc")
>>>>   .getOrCreate()
>>>>
>>>> import sparkSes.implicits._
>>>>
>>>> val dataDF = sparkSes.readStream
>>>>   .schema(streamSchema)
>>>>   .csv("testData")
>>>>
>>>> // counts: an aggregated streaming DataFrame derived from dataDF
>>>> val query = counts.writeStream
>>>>   .outputMode("complete")
>>>>   .format("console")
>>>>   .start()
>>>>
>>>> query.awaitTermination()
>>>>
>>>>
>>>>
