spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trung kien <kient...@gmail.com>
Subject Re: Spark Streaming data checkpoint performance
Date Sat, 07 Nov 2015 08:26:26 GMT
Hmm,

Seems it just do a trick.
Using this method, it's very hard to recovery from failure, since we don't
know which batch have been done.

I really want to maintain the whole running stats in memory to archive full
failure-tolerant.

I just wonder if the performance of data checkpoint is that bad? or I
misses something in my setup?

30 seconds for data checkpoint of 1M keys is too much for me.


On Sat, Nov 7, 2015 at 1:25 PM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com
> wrote:

> It depends on the stats you are collecting. For example, if you just
> collecting counts, you can do away with updateStateByKey completely by
> doing insert or update operation on the data store after reduce. I.e.
>
> For each (key, batchCount)
>   if (key exists in dataStore)
>     update count = count + batchCount for the key
>  else
>     insert (key, batchCount)
>
> Thanks,
> Aniket
>
> On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang612@gmail.com>
> wrote:
>
>> Thanks Aniket,
>>
>> I want to store the state to an external storage but it should be in
>> later step I think.
>> Basically, I have to use updateStateByKey function to maintain the
>> running state (which requires checkpoint), and my bottleneck is now in data
>> checkpoint.
>>
>> My pseudo code is like below:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(
>> sparkConf,Durations.seconds(2));
>>     jssc.checkpoint("spark-data/checkpoint");
>>     JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createDirectStream(...);
>>     JavaPairDStream<String, List<Double>> stats =
>> messages.mapToPair(parseJson)
>>                             .reduceByKey(REDUCE_STATS)
>>                             .updateStateByKey(RUNNING_STATS);
>>
>>    JavaPairDStream<String, List<Double>> newData =
>> stages.filter(NEW_STATS);
>>
>>    newData.foreachRDD{
>>          rdd.forEachPartition{
>>                //Store to external storage.
>>          }
>>   }
>>
>>   Without using updateStageByKey, I'm only have the stats of the last
>> micro-batch.
>>
>> Any advise on this?
>>
>>
>> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatnagar@gmail.com>:
>>
>>> Can you try storing the state (word count) in an external key value
>>> store?
>>>
>>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang612@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Anyone could help me on this. It's a bit urgent for me on this.
>>>> I'm very confused and curious about Spark data checkpoint performance?
>>>> Is there any detail implementation of checkpoint I can look into?
>>>> Spark Streaming only take sub-second to process 20K messages/sec,
>>>> however it take 25 seconds for checkpoint. Now my application have average
>>>> 30 seconds latency and keep increasingly.
>>>>
>>>>
>>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang612@gmail.com>:
>>>>
>>>>> Thankd all, it would be great to have this feature soon.
>>>>> Do you know what's the release plan for 1.6?
>>>>>
>>>>> In addition to this, I still have checkpoint performance problem
>>>>>
>>>>> My code is just simple like this:
>>>>>     JavaStreamingContext jssc = new
>>>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>>>>     jssc.checkpoint("spark-data/checkpoint");
>>>>>     JavaPairInputDStream<String, String> messages =
>>>>> KafkaUtils.createDirectStream(...);
>>>>>     JavaPairDStream<String, List<Double>> stats =
>>>>> messages.mapToPair(parseJson)
>>>>>                             .reduceByKey(REDUCE_STATS)
>>>>>                             .updateStateByKey(RUNNING_STATS);
>>>>>
>>>>>     stats.print()
>>>>>
>>>>>   Now I need to maintain about 800k keys, the stats here is only count
>>>>> number of occurence for key.
>>>>>   While running the cache dir is very small (about 50M), my question
>>>>> is:
>>>>>
>>>>>   1/ For regular micro-batch it takes about 800ms to finish, but every
>>>>> 10 seconds when data checkpoint is running
>>>>>   It took me 5 seconds to finish the same size micro-batch, why it's
>>>>> too high? what's kind of job in checkpoint?
>>>>>   why it's keep increasing?
>>>>>
>>>>>   2/ When I changes the data checkpoint interval like using:
>>>>>       stats.checkpoint(Durations.seconds(100)); //change to 100,
>>>>> defaults is 10
>>>>>
>>>>>   The checkpoint is keep increasing significantly first checkpoint is
>>>>> 10s, second is 30s, third is 70s ... and keep increasing :)
>>>>>   Why it's too high when increasing checkpoint interval?
>>>>>
>>>>> It seems that default interval works more stable.
>>>>>
>>>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atanase@adobe.com> wrote:
>>>>>
>>>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>>>
>>>>>> Left some comments on the JIRA and design doc.
>>>>>>
>>>>>> -adrian
>>>>>>
>>>>>> From: Shixiong Zhu
>>>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>>>> To: Thúy Hằng Lê
>>>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>>>
>>>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>>>> performance issue of "updateStateByKey". You can take a look at
>>>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>>>> https://github.com/apache/spark/pull/9256
>>>>>>
>>>>>
>>>>
>>


-- 
Thanks
Kien

Mime
View raw message