spark-issues mailing list archives

From "Stephen (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint
Date Tue, 31 Dec 2019 02:02:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephen updated SPARK-30393:
----------------------------
    Description: 
I have a Spark application that consumes from a Kinesis stream with 6 shards. Data is produced to
Kinesis at a peak of at most 2000 records/second; at off-peak times it arrives at only about 200 records/second.
Each record is about 0.5 KB, so 6 shards should be enough to handle the load.
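
As a rough check on the sizing claim above, the sketch below compares the reported load with per-shard write limits. The limits of 1 MB/s and 1,000 records/s per shard are assumptions taken from the commonly published Kinesis Data Streams quotas, not from this report, and the helper name is hypothetical.

```python
import math

# Assumed per-shard write quotas (not stated in the report).
SHARD_WRITE_BYTES_PER_SEC = 1_000_000
SHARD_WRITE_RECORDS_PER_SEC = 1_000


def shards_needed_for_writes(records_per_sec: int, record_bytes: int) -> int:
    """Minimum shard count that satisfies both assumed write quotas."""
    by_bytes = math.ceil(records_per_sec * record_bytes / SHARD_WRITE_BYTES_PER_SEC)
    by_records = math.ceil(records_per_sec / SHARD_WRITE_RECORDS_PER_SEC)
    return max(1, by_bytes, by_records)


# Figures from the report: at most 2000 records/s, about 0.5 KB per record.
peak = shards_needed_for_writes(2000, 512)
off_peak = shards_needed_for_writes(200, 512)
print(peak, off_peak)
```

Under these assumptions the ingest side fits comfortably in 6 shards, which suggests the throttling comes from read-side API calls during recovery rather than from data volume.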

I use reduceByKeyAndWindow and mapWithState in the program and the sliding window is one hour
long.

Recently I have been trying to checkpoint the application to S3. I am testing this at off-peak time,
so the incoming data rate is very low, around 200 records/sec. I start the Spark application by
creating a new context, and the checkpoint is created in S3. But when I kill the app and restart it, it
fails to recover from the checkpoint promptly: the log shows the errors below, the Spark UI shows
all batches stuck, and checkpoint recovery takes a long time to complete,
from 15 minutes to over an hour.

I found many error messages in the log related to Kinesis read limits being exceeded:

{quote}19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal,
executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard
iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last
exception:
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
bq.         at scala.Option.getOrElse(Option.scala:121)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
bq.         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq.         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq.         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq.         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
bq.         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
bq.         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
bq.         at org.apache.spark.scheduler.Task.run(Task.scala:121)
bq.         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
bq.         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
bq.         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
bq.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
bq.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
bq.         at java.lang.Thread.run(Thread.java:748)
bq. Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException:
Rate exceeded for shard shardId-000000000004 in stream my-stream-name under account my-account-number.
(Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException;
Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
bq.         at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
bq.         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
bq.         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
bq.         at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
bq.         at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
bq.         at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
bq.         at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
bq.         at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
bq.         at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
bq.         ... 20 more{quote}


I see someone reported a similar problem in https://issues.apache.org/jira/browse/SPARK-24970,
but I am not sure whether there is any fix for it.

Since my batch interval is 150 seconds, I tried increasing the block interval to 1000 ms (1 second)
so that I have fewer partitions, but the problem persists.
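
The effect of the block interval on partition count can be estimated as below. This is a sketch that assumes each receiver produces one block per block interval and that each block becomes one partition of the batch. The receiver count is a hypothetical parameter because the report does not state it; 200 ms is Spark's default for spark.streaming.blockInterval.

```python
def blocks_per_batch(batch_interval_ms: int, block_interval_ms: int, receivers: int = 1) -> int:
    """Estimated blocks (and so partitions) in one batch,
    assuming one block per receiver per block interval."""
    return (batch_interval_ms // block_interval_ms) * receivers


BATCH_INTERVAL_MS = 150_000  # 150 s batch interval, from the report

default_blocks = blocks_per_batch(BATCH_INTERVAL_MS, 200)   # Spark's default block interval
tuned_blocks = blocks_per_batch(BATCH_INTERVAL_MS, 1_000)   # value tried in the report
print(default_blocks, tuned_blocks)
```

Even at 1000 ms, a batch still holds on the order of 150 blocks per receiver. The stack trace shows the failure inside a getShardIterator call made by a recovering task, so many partitions reading the same shard at once could plausibly exceed a per-shard request limit.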

I also tried enabling the WAL with spark.streaming.receiver.writeAheadLog.enable=true, but the
problem persists. I have also read that enabling the WAL is no longer necessary from Spark version
2 onward.

I understand checkpoint recovery might be a lengthy process, but how do I eliminate the
"ProvisionedThroughputExceededException" error? I think it may be what is causing the slow checkpoint
recovery.
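
One knob that may be relevant to the "Gave up after 3 retries" message: the Spark Kinesis integration guide documents spark.streaming.kinesis.retry.waitTime and spark.streaming.kinesis.retry.maxAttempts for Spark 2.3 and later. The sketch below only assembles `--conf` arguments for spark-submit. The values are illustrative guesses rather than tested recommendations, the helper name is hypothetical, and whether this removes the throttling in this setup is not confirmed.

```python
from typing import Dict, List

# Illustrative values only (assumptions); tune against your own stream.
kinesis_retry_conf: Dict[str, str] = {
    "spark.streaming.kinesis.retry.waitTime": "1000ms",
    "spark.streaming.kinesis.retry.maxAttempts": "10",
}


def to_spark_submit_args(conf: Dict[str, str]) -> List[str]:
    """Flatten a config mapping into spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


print(" ".join(to_spark_submit_args(kinesis_retry_conf)))
```

A longer wait between attempts trades recovery latency for fewer simultaneous calls against each shard, so it is worth measuring both before settling on values.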

Thanks, can someone please help? 



> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-30393
>                 URL: https://issues.apache.org/jira/browse/SPARK-30393
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 2.4.3
>         Environment: EMR 5.23.0, Spark 2.4.3, spark-streaming-kinesis-asl 2.4.3. The cluster has
6 r5.4xlarge nodes with plenty of memory. The stream has 6 Kinesis shards; I even increased it
to 12 shards but still see the Kinesis error.
>            Reporter: Stephen
>            Priority: Major
>         Attachments: kinesisusagewhilecheckpointrecoveryerror.png, sparkuiwhilecheckpointrecoveryerror.png



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

