flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
Date Fri, 24 Jun 2016 14:50:16 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348383#comment-15348383 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68409331
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -159,42 +217,65 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
     		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
     	}
     
    +	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
    +		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
    +
    +		DescribeStreamResult describeStreamResult;
    +		do {
    +			describeStreamResult = describeStream(streamName, lastSeenShardId);
    +
    +			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +			for (Shard shard : shards) {
    +				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
    +			}
    +
    +			if (shards.size() != 0) {
    +				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
    +			}
    +		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
    +
    +		return shardsOfStream;
    +	}
    +
     	/**
     	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
     	 *
    +	 * This method uses the "full jitter" approach described in
    +	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">https://www.awsarchitectureblog.com/2015/03/backoff.html</a>. This is necessary
    +	 * because concurrent calls will be made by all parallel subtasks' {@link ShardDiscoverer}s. This jitter backoff
    +	 * approach will help distribute calls across the discoverers over time.
    +	 *
     	 * @param streamName the stream to describe
     	 * @param startShardId which shard to start with for this describe operation (earlier shards' info will not appear in the result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, String startShardId) {
    +	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
     		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
     		describeStreamRequest.setStreamName(streamName);
     		describeStreamRequest.setExclusiveStartShardId(startShardId);
     
     		DescribeStreamResult describeStreamResult = null;
     		String streamStatus = null;
    -		int remainingRetryTimes = Integer.valueOf(
    -			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
    -		long describeStreamBackoffTimeInMillis = Long.valueOf(
    -			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
     
    -		// Call DescribeStream, with backoff and retries (if we get LimitExceededException).
    -		while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) {
    +		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		Random seed = null;
    +		int attemptCount = 0;
    +		while (describeStreamResult == null) { // retry until we get a result
     			try {
     				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
     				streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
     			} catch (LimitExceededException le) {
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ describeStreamBackoffTimeInMillis + " millis.");
    -				try {
    -					Thread.sleep(describeStreamBackoffTimeInMillis);
    -				} catch (InterruptedException ie) {
    -					LOG.debug("Stream " + streamName + " : Sleep  was interrupted ", ie);
    +				if (seed == null) {
    +					seed = new Random();
     				}
    +				long backoffMillis = fullJitterBackoff(
    +					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++, seed);
    +				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    +					+ backoffMillis + " millis.");
    +				Thread.sleep(backoffMillis);
     			} catch (ResourceNotFoundException re) {
     				throw new RuntimeException("Error while getting stream details", re);
     			}
    -			remainingRetryTimes--;
     		}
     
     		if (streamStatus == null) {
    --- End diff --
    
    Actually, since we'll be retrying until we get a describeStreamResult now, this message and RuntimeException can be removed.
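
The fullJitterBackoff helper that the new code calls is not shown in this hunk. As a rough illustration only (parameter meanings inferred from the call site; the actual helper in the pull request may differ), a full-jitter backoff draws a uniformly random sleep between zero and an exponentially growing, capped bound:

    import java.util.Random;

    public final class BackoffSketch {

    	// Sketch only, not the pull request's code: "full jitter" as described in the
    	// AWS Architecture Blog post linked from the Javadoc above. The sleep is drawn
    	// uniformly from [0, min(maxMillis, baseMillis * expConstant^attempt)).
    	public static long fullJitterBackoff(long baseMillis, long maxMillis, double expConstant, int attempt, Random seed) {
    		double cappedExponential = Math.min((double) maxMillis, baseMillis * Math.pow(expConstant, attempt));
    		return (long) (seed.nextDouble() * cappedExponential);
    	}
    }

For example, with baseMillis = 1000, maxMillis = 5000, expConstant = 1.5 and attempt = 3, the sleep is drawn uniformly from [0, 3375) millis, so parallel ShardDiscoverer instances that hit the DescribeStream limit together are unlikely to retry in lockstep.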


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and "split" shards at any time for adjustable stream throughput capacity. This article explains this quite clearly: https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping uses a simple round-robin-like distribution that can be determined locally at each Flink consumer task (the Flink Kafka consumer does this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards they are currently handling, and allow a task to ask the coordinator for a shard reassignment when it encounters a closed shard at runtime (shards are closed by Kinesis when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink consumer tasks. Tasks can use this state store to locally determine which shards they can be reassigned. Amazon KCL uses a DynamoDB table for this coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our own implementation, ZooKeeper can be used for this state store, but that would also require the user to set up ZooKeeper.
> Since this feature requires extensive work, it is opened as a separate sub-task from the basic implementation (https://issues.apache.org/jira/browse/FLINK-3229).
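
As an aside on the "round-robin-like distribution" mentioned in the issue description above, here is a minimal sketch of how such a static, locally computable assignment could look (the helper below is hypothetical and not the connector's actual API):

    import java.util.ArrayList;
    import java.util.List;

    public final class ShardAssignmentSketch {

    	// Sketch only: each subtask keeps the shards whose index in a globally agreed,
    	// ordered shard list equals its own subtask index modulo the parallelism. Every
    	// subtask can compute this locally, without any coordination, as long as all
    	// subtasks see the same ordered shard list.
    	public static <T> List<T> assignToSubtask(List<T> orderedShards, int subtaskIndex, int parallelism) {
    		List<T> owned = new ArrayList<>();
    		for (int i = 0; i < orderedShards.size(); i++) {
    			if (i % parallelism == subtaskIndex) {
    				owned.add(orderedShards.get(i));
    			}
    		}
    		return owned;
    	}
    }

Such a purely static scheme cannot react to shards that are merged, split, or closed after the job starts, which is exactly why this issue asks for coordination (or, as in the pull request above, continuous shard discovery via the ShardDiscoverer).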

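Relatedly, the runtime signal that a shard has been closed is available directly from the Kinesis Shard model: a closed shard carries an ending sequence number. A minimal check against the AWS SDK model classes (illustrative only, not the connector's code):

    import com.amazonaws.services.kinesis.model.Shard;

    public final class ClosedShardCheckSketch {

    	// Sketch only: an open Kinesis shard has no ending sequence number; Kinesis sets
    	// one once the shard is closed, e.g. as the parent of a merge or split.
    	public static boolean isClosed(Shard shard) {
    		return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
    	}
    }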


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
