spark-issues mailing list archives

From "Sean R. Owen (Jira)" <>
Subject [jira] [Resolved] (SPARK-24970) Spark Kinesis streaming application fails to recover from streaming checkpoint due to ProvisionedThroughputExceededException
Date Tue, 31 Dec 2019 02:09:00 GMT


Sean R. Owen resolved SPARK-24970.
    Resolution: Not A Problem

> Spark Kinesis streaming application fails to recover from streaming checkpoint due to ProvisionedThroughputExceededException
> ----------------------------------------------------------------------------------------------------------------------------
>                 Key: SPARK-24970
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: bruce_zhao
>            Priority: Major
>              Labels: kinesis
> We're using Spark Streaming to consume Kinesis data, and found that it reads more data
from Kinesis and easily hits ProvisionedThroughputExceededException *when it recovers
from a streaming checkpoint*.
> Normally, this is only a WARN in the Spark log. But when multiple streaming applications
(e.g., 5 applications) consume the same Kinesis stream, the situation becomes serious. *The
application fails to recover due to the following exception in the driver.* One application's
failure also affects the other running applications.
> {panel:title=Exception}
> org.apache.spark.SparkException: Job aborted due to stage failure: {color:#ff0000}*Task
5 in stage 7.0 failed 4 times, most recent failure*:{color} Lost task 5.3 in stage 7.0 (TID
128, ip-172-31-14-36.ap-northeast-1.compute.internal, executor 1): org.apache.spark.SparkException:
Gave up after 3 retries while getting records using shard iterator, last exception:
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:223)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:207)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.api.python.PythonRunner$
> Caused by: *{color:#ff0000}Rate exceeded for shard shardId-000000000000 in stream rellfsstream-an under account 1111111111111{color}.*
(Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException;
Request ID: d3520677-060e-14c4-8014-2886b6b75f03)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(
at com.amazonaws.http.AmazonHttpClient.execute( at
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
> {panel}
> After checking the source code, we found that it calls getBlockFromKinesis() to recover data,
and that this function reads from Kinesis directly. Because every partition in the
BlockRDD accesses Kinesis, and AWS Kinesis supports only 5 read transactions per second
per shard, it hits ProvisionedThroughputExceededException easily. Even though the code
retries, it still fails easily when contention is heavy.
> {code:java}
> // KinesisBackedBlockRDD.scala
> // Re-reads every sequence-number range of this partition directly from Kinesis.
> def getBlockFromKinesis(): Iterator[T] = {
>   val credentials = kinesisCreds.provider.getCredentials
>   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
>     new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
>       range, kinesisReadConfigs).map(messageHandler)
>   }
> }
> // Use the stored block if its ID is valid and the BlockManager still has it;
> // otherwise fall back to re-reading from Kinesis.
> if (partition.isBlockIdValid) {
>   getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
> } else {
>   getBlockFromKinesis()
> }
> {code}
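> As an illustration of the retry behavior discussed above, here is a minimal sketch of
retrying a throttled read with capped exponential backoff. It is not the code in
KinesisBackedBlockRDD; ThrottledException, BackoffSketch, and all parameter names are
hypothetical stand-ins, and a real client would catch the AWS SDK's
ProvisionedThroughputExceededException instead.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the throttling error raised by the AWS SDK.
final class ThrottledException(msg: String) extends RuntimeException(msg)

object BackoffSketch {
  /** Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs. */
  def backoffMs(attempt: Int, baseMs: Long, maxMs: Long): Long =
    math.min(maxMs, baseMs << math.min(attempt, 20)) // shift is capped to limit overflow

  /** Runs `op`, retrying only on ThrottledException, at most `maxRetries` extra times. */
  def retryThrottled[A](maxRetries: Int, baseMs: Long, maxMs: Long,
                        sleep: Long => Unit = ms => Thread.sleep(ms))(op: () => A): A = {
    @tailrec
    def loop(attempt: Int): A =
      Try(op()) match {
        case Success(value) => value
        case Failure(_: ThrottledException) if attempt < maxRetries =>
          // Back off before the next attempt so the shard's read quota can refill.
          sleep(backoffMs(attempt, baseMs, maxMs))
          loop(attempt + 1)
        case Failure(e) => throw e // retries exhausted, or a non-throttling error
      }
    loop(0)
  }
}
```

> With maxRetries = 3, baseMs = 100, and maxMs = 1000, the waits before the three retries are
100, 200, and 400 ms. Backoff only spaces out one reader's requests; when many partitions
from several applications read the same shard at once, the per-shard limit can still be exceeded.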
> Why do we need to re-read data from Kinesis directly? Is there any way to avoid it under
the current design?
> In most cases, Spark Streaming applications enable the write-ahead log (WAL). The data
could then be recovered from the WAL instead of being re-read from Kinesis directly.
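> For reference, the receiver WAL is switched on through the
spark.streaming.receiver.writeAheadLog.enable setting, and it also needs a checkpoint
directory set on the StreamingContext. A minimal sketch, assuming Spark is on the classpath;
the object name and application name are placeholders. It only shows how the WAL is enabled
and does not change how KinesisBackedBlockRDD recovers blocks.

```scala
import org.apache.spark.SparkConf

object WalConfigSketch {
  // Placeholder application name; only the writeAheadLog key matters here.
  val conf: SparkConf = new SparkConf()
    .setAppName("kinesis-wal-sketch")
    // Persist received blocks to the write-ahead log under the checkpoint directory.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
}
```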
> I'd like to make a change for this. It may not be a perfect fix, but it would at least
provide an option to avoid this problem under the current design.
