spark-user mailing list archives

From Michal Čizmazia <mici...@gmail.com>
Subject Re: Graceful shutdown drops processing in Spark Streaming
Date Thu, 08 Oct 2015 02:34:05 GMT
Thanks! Done.

https://issues.apache.org/jira/browse/SPARK-10995

On 7 October 2015 at 21:24, Tathagata Das <tdas@databricks.com> wrote:

> Aaah, interesting, you are using a 15-minute slide duration. Yeah,
> internally the streaming scheduler waits for the last batch interval
> that has data to be processed, but if there is a slide interval (i.e. 15
> mins) that is longer than the batch interval, then the windowed operation
> for that interval might not be run. This is indeed a bug and should be
> fixed. Mind setting up a JIRA and assigning it to me?
>
> On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <micizma@gmail.com> wrote:
>
>> After triggering the graceful shutdown on the following application, the
>> application stops before the windowed stream reaches its slide duration. As
>> a result, the data is not completely processed (i.e. saveToMyStorage is not
>> called) before shutdown.
>>
>> According to the documentation, graceful shutdown should ensure that all
>> data that has been received is completely processed before shutdown.
>>
>> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>>
>> Spark version: 1.4.1
>>
>> Code snippet:
>>
>> Function0<JavaStreamingContext> factory = () -> {
>>     // Batch interval: 1 minute
>>     JavaStreamingContext context =
>>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>>     context.checkpoint("/test");
>>     JavaDStream<String> records =
>>         context.receiverStream(myReliableReceiver).flatMap(...);
>>     records.persist(StorageLevel.MEMORY_AND_DISK());
>>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>>     // Window length and slide duration: 15 minutes each
>>     records
>>         .window(Durations.minutes(15), Durations.minutes(15))
>>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>>     return context;
>> };
>>
>> try (JavaStreamingContext context =
>>         JavaStreamingContext.getOrCreate("/test", factory)) {
>>     context.start();
>>     waitForShutdownSignal();
>>     boolean stopSparkContext = true;
>>     boolean stopGracefully = true;
>>     context.stop(stopSparkContext, stopGracefully);
>> }
>>
>>
>
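
The timing described in the thread can be sketched with plain arithmetic. This is only an illustration, not Spark code: it assumes that windowed output fires when the time elapsed since the stream started is a whole multiple of the slide duration, and the class and method names (WindowSlideSketch, lastWindowOutputMinute, and so on) are hypothetical. With a 1-minute batch interval and a 15-minute slide, a graceful stop at minute 22 falls between the window outputs at minutes 15 and 30, so the seven batches after minute 15 would be counted but never passed to saveToMyStorage.

```java
// Illustrative sketch only; not part of Spark. All names here are hypothetical.
// Assumption: a windowed stream emits output only when the elapsed time since
// stream start is a whole multiple of the slide duration.
final class WindowSlideSketch {

    // Most recent minute (<= stopMinute) at which the windowed output fired.
    static long lastWindowOutputMinute(long stopMinute, long slideMinutes) {
        return stopMinute - (stopMinute % slideMinutes);
    }

    // Earliest minute (>= stopMinute) at which the windowed output would fire.
    static long nextWindowOutputMinute(long stopMinute, long slideMinutes) {
        long remainder = stopMinute % slideMinutes;
        return remainder == 0 ? stopMinute : stopMinute + (slideMinutes - remainder);
    }

    // Number of completed batches that no windowed output has covered yet.
    static long unsavedBatches(long stopMinute, long batchMinutes, long slideMinutes) {
        long sinceLastOutput = stopMinute - lastWindowOutputMinute(stopMinute, slideMinutes);
        return sinceLastOutput / batchMinutes;
    }

    public static void main(String[] args) {
        long batchMinutes = 1;   // batch interval from the thread
        long slideMinutes = 15;  // slide duration from the thread
        long stopMinute = 22;    // hypothetical moment of the shutdown signal

        System.out.println("last window output at minute "
                + lastWindowOutputMinute(stopMinute, slideMinutes));
        System.out.println("next window output at minute "
                + nextWindowOutputMinute(stopMinute, slideMinutes));
        System.out.println("batches not yet covered by a window output: "
                + unsavedBatches(stopMinute, batchMinutes, slideMinutes));
    }
}
```

If the stop lands exactly on a slide boundary (for example minute 30), the sketch reports zero uncovered batches, which matches the case where the last batch interval and the window output coincide.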
