flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
Date Thu, 02 Feb 2017 08:08:51 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849622#comment-15849622 ]

ASF GitHub Bot commented on FLINK-4574:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2925#discussion_r99065929
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -154,42 +166,115 @@ public void run() {
     						}
     					}
     
    -					// set the nextShardItr so we can continue iterating in the next while loop
    -					nextShardItr = getRecordsResult.getNextShardIterator();
    +					// set the startShardItr so we can continue iterating in the next while loop
    +					startShardItr = getRecordsResult.getNextShardIterator();
     				} else {
     					// the last record was non-aggregated, so we can simply start from the next record
    -					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +					startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
     				}
     			}
     
    -			while(isRunning()) {
    -				if (nextShardItr == null) {
    -					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
    -
    -					// we can close this consumer thread once we've reached the end of the subscribed shard
    -					break;
    -				} else {
    -					if (fetchIntervalMillis != 0) {
    -						Thread.sleep(fetchIntervalMillis);
    -					}
    +			ArrayBlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(maxNumberOfRecordsPerFetch);
    +			ShardConsumerFetcher shardConsumerFetcher;
     
    -					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
    +			if (fetchIntervalMillis > 0L) {
    +				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, false);
    +				timer.scheduleAtFixedRate(shardConsumerFetcher, 0L, fetchIntervalMillis);
    +			} else {
    +				// if fetchIntervalMillis is 0, make the task run forever and schedule it once only.
    +				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, true);
    +				timer.schedule(shardConsumerFetcher, 0L);
    +			}
     
    -					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
    -					List<UserRecord> fetchedRecords = deaggregateRecords(
    -						getRecordsResult.getRecords(),
    -						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
    -						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
    +			while(isRunning()) {
    +				UserRecord record = queue.poll();
    +				if (record != null) {
    +					deserializeRecordForCollectionAndUpdateState(record);
    +				} else {
    +					if (shardConsumerFetcher.nextShardItr == null) {
    +						fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
     
    -					for (UserRecord record : fetchedRecords) {
    -						deserializeRecordForCollectionAndUpdateState(record);
    +						// we can close this consumer thread once we've reached the end of the subscribed shard
    +						break;
     					}
    +				}
     
    -					nextShardItr = getRecordsResult.getNextShardIterator();
    +				Throwable throwable = this.error.get();
    +				if (throwable != null) {
    +					throw throwable;
     				}
     			}
     		} catch (Throwable t) {
     			fetcherRef.stopWithError(t);
    +		} finally {
    +			timer.cancel();
    +		}
    +	}
    +
    +	private class ShardConsumerFetcher extends TimerTask {
    --- End diff --
    
    Would be nice to have Javadoc explaining what this task does.


> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-4574
>                 URL: https://issues.apache.org/jira/browse/FLINK-4574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> As pointed out by [~rmetzger], the current fetch interval implementation in the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer intervals than the user specified. For example, if the specified fetch interval is {{f}}, a {{getRecords()}} call takes {{x}} to complete, and processing the fetched records for emitting takes {{y}}, then the actual interval between fetches is {{f+x+y}}.
> The main problem is that we can never guarantee how much time has passed since the last {{getRecords()}} call, and thus cannot guarantee that returned shard iterators will not have expired by the next time we use them, even if we limit the user-given value for {{f}} to be no longer than the iterator expiry time.
> I propose to improve this by using, per {{ShardConsumer}}, a {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, and a separate blocking queue that collects the fetched records for emitting.
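
For illustration only, the proposal above could be sketched roughly as follows. This is not the code from the pull request: the class FixedRateFetchSketch, the RecordSource interface (a hypothetical stand-in for a getRecords() call that returns null at the end of the shard), and the 10 ms poll timeout are all assumptions made for the example. A ScheduledExecutorService fetches at a fixed rate into a bounded queue, and the calling thread drains the queue and emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class FixedRateFetchSketch {

    /** Hypothetical stand-in for getRecords(): one batch per call, null at shard end. */
    interface RecordSource {
        List<String> fetchBatch() throws Exception;
    }

    /** Fetches at a fixed rate on a scheduler thread; emits on the calling thread. */
    static List<String> consume(RecordSource source, long fetchIntervalMillis, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        AtomicBoolean endOfShard = new AtomicBoolean(false);
        AtomicReference<Throwable> error = new AtomicReference<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> emitted = new ArrayList<>();

        Runnable fetchTask = () -> {
            if (endOfShard.get()) {
                return; // nothing left to fetch
            }
            try {
                List<String> batch = source.fetchBatch();
                if (batch == null) {
                    endOfShard.set(true);
                    return;
                }
                for (String record : batch) {
                    queue.put(record); // blocks while the emitting side is behind
                }
            } catch (Throwable t) {
                error.compareAndSet(null, t); // hand the failure to the emitting thread
            }
        };

        try {
            // scheduleAtFixedRate requires a period > 0; an interval of 0 needs separate handling.
            scheduler.scheduleAtFixedRate(fetchTask, 0L, fetchIntervalMillis, TimeUnit.MILLISECONDS);
            while (true) {
                Throwable failure = error.get();
                if (failure != null) {
                    throw new IllegalStateException("fetch failed", failure);
                }
                String record = queue.poll(10L, TimeUnit.MILLISECONDS);
                if (record != null) {
                    emitted.add(record); // stands in for deserializing and collecting the record
                } else if (endOfShard.get() && queue.isEmpty()) {
                    break; // shard fully consumed
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for records", e);
        } finally {
            scheduler.shutdownNow();
        }
        return emitted;
    }
}
```

In this sketch the fetch cadence no longer depends on the processing time y, as long as the queue has room. If the emitting side falls behind, queue.put() blocks and the next fetch is delayed, so the queue capacity would still need to be chosen with the iterator expiry time in mind.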



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
