spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Restart streaming query spark 2.1 structured streaming
Date Tue, 15 Aug 2017 22:35:28 GMT
Both work. The asynchronous method with the listener will have less downtime; it's just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer, as it has to reload the data.


On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2pradeep@gmail.com>
wrote:

> Thanks Tathagata Das. Actually I'm planning to do something like this:
>
> activeQuery.stop()
>
> // unpersist and re-persist the cached data frame
> df.unpersist()
>
> // re-read the updated data; the data size of df is around 100 GB
> df.persist()
>
> activeQuery = startQuery()
>
>
> The cached data frame is around 100 GB, so the question is: is this the
> right place to refresh this huge cached data frame?
>
> I'm also trying to refresh the cached data frame in the onQueryProgress()
> method of a class which extends StreamingQueryListener.
>
> I would like to know which is the best place to refresh the cached data
> frame, and why.
>
> Thanks again for the response below.
>
> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1565@gmail.com>
> wrote:
>
>> You can do something like this.
>>
>>
>> def startQuery(): StreamingQuery = {
>>   // create your streaming dataframes
>>   // start the query with the same checkpoint directory
>> }
>>
>> // handle to the active query
>> var activeQuery: StreamingQuery = null
>>
>> while (!stopped) {
>>
>>   if (activeQuery == null) {            // if query not active, start query
>>     activeQuery = startQuery()
>>
>>   } else if (shouldRestartQuery()) {    // check your condition and restart query
>>     activeQuery.stop()
>>     activeQuery = startQuery()
>>   }
>>
>>   activeQuery.awaitTermination(100)     // wait for 100 ms
>>   // if there is any error it will throw an exception and quit the loop;
>>   // otherwise it will keep checking the condition every 100 ms
>> }
>>
>>
>>
>>
>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2pradeep@gmail.com>
>> wrote:
>>
>>> Thanks Michael
>>>
>>> I guess my question was a little confusing; let me try again.
>>>
>>>
>>> I would like to restart the streaming query programmatically, based on a
>>> condition, while my streaming application is running. Here is why I want
>>> to do this:
>>>
>>> I want to refresh a cached data frame based on a condition, and the best
>>> way to do this is to restart the streaming query, as suggested by TDas
>>> below for a similar problem:
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+fwmn4jTm1X1nDAPA@mail.gmail.com%3e
>>>
>>> I do understand that checkpointing helps with recovery and failures, but
>>> I would like to know how to restart the streaming query programmatically
>>> without stopping my streaming application.
>>>
>>> In place of query.awaitTermination(), should I have logic to restart the
>>> query? Please suggest.
>>>
>>>
>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <michael@databricks.com>
>>> wrote:
>>>
>>>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>
>>>> Though I think that this currently doesn't work with the console sink.
>>>>
>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2pradeep@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>>
>>>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>>>
>>>>>> Where and how should I restart the streaming query?
>>>>>>
>>>>>
>>>>>
>>>>> val sparkSes = SparkSession
>>>>>   .builder
>>>>>   .config("spark.master", "local")
>>>>>   .appName("StreamingCachePoc")
>>>>>   .getOrCreate()
>>>>>
>>>>> import sparkSes.implicits._
>>>>>
>>>>> val dataDF = sparkSes.readStream
>>>>>   .schema(streamSchema)
>>>>>   .csv("testData")
>>>>>
>>>>> // counts is an aggregation derived from dataDF (definition elided)
>>>>> val query = counts.writeStream
>>>>>   .outputMode("complete")
>>>>>   .format("console")
>>>>>   .start()
>>>>>
>>>>> query.awaitTermination()
