flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
Date Mon, 27 Jun 2016 14:11:52 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351076#comment-15351076 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68582645
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.internals;
    +
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
    +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
    + * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
    + * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
    + * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
    + * consuming from.
    + */
    +public class ShardDiscoverer<T> implements Runnable {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(ShardDiscoverer.class);
    +
    +	/** This fetcher reference is used to add discovered shards to the pending shards queue */
    +	private final KinesisDataFetcher fetcherRef;
    +
    +	/** Kinesis proxy to retrieve shard lists from Kinesis */
    +	private final KinesisProxyInterface kinesis;
    +
    +	/**
    +	 * The last seen shard of each stream. Since new Kinesis shards are always created in ascending ids (regardless of
    +	 * whether the new shard was a result of a shard split or merge), this state can be used when calling
    +	 * {@link KinesisProxyInterface#getShardList(Map)} to ignore shards we have already discovered before.
    +	 */
    +	private final Map<String,String> streamToLastSeenShard;
    +
    +	private final int totalNumberOfConsumerSubtasks;
    +	private final int indexOfThisConsumerSubtask;
    +
    +	/**
    +	 * Create a new shard discoverer.
    +	 *
    +	 * @param fetcherRef reference to the owning fetcher
    +	 */
    +	public ShardDiscoverer(KinesisDataFetcher<T> fetcherRef) {
    +		this(fetcherRef, KinesisProxy.create(fetcherRef.getConsumerConfiguration()), new HashMap<String, String>());
    +	}
    +
    +	/** This constructor is exposed for testing purposes */
    +	protected ShardDiscoverer(KinesisDataFetcher<T> fetcherRef,
    +							KinesisProxyInterface kinesis,
    +							Map<String,String> streamToLastSeenShard) {
    +		this.fetcherRef = checkNotNull(fetcherRef);
    +		this.kinesis = checkNotNull(kinesis);
    +		this.streamToLastSeenShard = checkNotNull(streamToLastSeenShard);
    +
    +		this.totalNumberOfConsumerSubtasks = fetcherRef.getSubtaskRuntimeContext().getNumberOfParallelSubtasks();
    +		this.indexOfThisConsumerSubtask = fetcherRef.getSubtaskRuntimeContext().getIndexOfThisSubtask();
    +
    +		// we initially map the last seen shard of each subscribed stream to null;
    +		// the correct values will be set later on in the constructor
    +		for (String stream : fetcherRef.getSubscribedStreams()) {
    +			this.streamToLastSeenShard.put(stream, null);
    +		}
    +
    +		// if we are restoring from a checkpoint, the restored state should already be in the pending shards queue;
    +		// we iterate over the pending shards queue, and accordingly set the stream-to-last-seen-shard map
    +		if (fetcherRef.isRestoredFromCheckpoint()) {
    +			if (fetcherRef.getCurrentCountOfPendingShards() == 0) {
    +				throw new RuntimeException("Told to restore from checkpoint, but no shards found in discovered shards queue");
    +			}
    +
    +			for (KinesisStreamShardState shardState : fetcherRef.cloneCurrentPendingShards()) {
    +				String stream = shardState.getKinesisStreamShard().getStreamName();
    +				String shardId = shardState.getKinesisStreamShard().getShard().getShardId();
    +				if (!this.streamToLastSeenShard.containsKey(stream)) {
    +					throw new RuntimeException(
    +						"pendingShards queue contains a shard belonging to a stream that we are not subscribing to");
    +				} else {
    +					String lastSeenShardIdOfStream = this.streamToLastSeenShard.get(stream);
    +					// the existing shards in the queue may not be in ascending id order,
    +					// so we must exhaustively find the largest shard id of each stream
    +					if (lastSeenShardIdOfStream == null) {
    +						// if not previously set, simply put as the last seen shard id
    +						this.streamToLastSeenShard.put(stream, shardId);
    +					} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
    +						// override if we have found a shard with a greater shard id for the stream
    +						this.streamToLastSeenShard.put(stream, shardId);
    +					}
    +				}
    +			}
    +		}
    +
    +		// we always query for any new shards that may have been created while the Kinesis consumer was not running -
    +		// when we are starting fresh (not restoring from a checkpoint), this simply means all existing shards of streams
    +		// we are subscribing to are new shards; when we are restoring from checkpoint, any new shards due to Kinesis
    +		// resharding from the time of the checkpoint will be considered new shards.
    +
    +		SentinelSequenceNumber sentinelSequenceNumber;
    +		if (!fetcherRef.isRestoredFromCheckpoint()) {
    +			// if starting fresh, each new shard should start from the user-configured position
    +			sentinelSequenceNumber =
    +				KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(fetcherRef.getConsumerConfiguration());
    +		} else {
    +			// if restoring from checkpoint, the new shards due to Kinesis resharding should be read from earliest record
    +			sentinelSequenceNumber = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
    +		}
    +
    +		try {
    +			// query for new shards that we haven't seen yet up to this point
    +			discoverNewShardsAndSetInitialStateTo(sentinelSequenceNumber.toString());
    +		} catch (InterruptedException iex) {
    +			fetcherRef.stopWithError(iex);
    +			return;
    +		}
    +
    +		boolean hasShards = false;
    +		StringBuilder streamsWithNoShardsFound = new StringBuilder();
    +		for (Map.Entry<String, String> streamToLastSeenShardEntry : streamToLastSeenShard.entrySet()) {
    +			if (streamToLastSeenShardEntry.getValue() != null) {
    +				hasShards = true;
    +			} else {
    +				streamsWithNoShardsFound.append(streamToLastSeenShardEntry.getKey()).append(", ");
    +			}
    +		}
    +
    +		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
    +			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
    +				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
    +		}
    +
    +		if (!hasShards) {
    +			fetcherRef.stopWithError(new RuntimeException(
    +				"No shards can be found for all subscribed streams: " + fetcherRef.getSubscribedStreams()));
    +		}
    +	}
    +
    +	@Override
    +	public void run() {
    +		try {
    +			while (isRunning()) {
    +				discoverNewShardsAndSetInitialStateTo(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString());
    +			}
    +		} catch (Throwable throwable) {
    +			fetcherRef.stopWithError(throwable);
    +		}
    +	}
    +
    +	private boolean isRunning() {
    +		return !Thread.interrupted();
    +	}
    +
    +	/**
    +	 * A utility function that does the following:
    +	 *
    +	 * 1. Find new shards for each stream that we haven't seen before
    +	 * 2. For each new shard, determine whether this consumer subtask should subscribe to them;
    +	 * 	  if yes, add the new shard to the discovered shards queue with a specified initial state (starting sequence num)
    +	 * 3. Update the stream-to-last-seen-shard map so that we won't get shards that we have already seen before
    +	 *    the next time this function is called
    +	 *
    +	 * @param initialState the initial state to assign to each new shard that this subtask should subscribe to
    +	 */
    +	private void discoverNewShardsAndSetInitialStateTo(String initialState) throws InterruptedException {
    +		GetShardListResult shardListResult = kinesis.getShardList(new HashMap<>(streamToLastSeenShard));
    --- End diff --
    
    Mistake, absolutely unnecessary. Thanks for noticing!
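    For readers of the archive: the review thread is anchored on the final diff line,
    which wraps streamToLastSeenShard in a defensive HashMap copy before handing it to
    the proxy. Assuming that copy is the mistake being acknowledged here (the follow-up
    commit is not shown in this message), the presumed simplification would be:

        // presumed fix (an assumption, not taken from this thread): pass the map
        // directly instead of allocating a fresh HashMap on every discovery pass
        GetShardListResult shardListResult = kinesis.getShardList(streamToLastSeenShard);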


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and "split" shards at any time for adjustable stream throughput capacity. This article explains it quite clearly: https://brandur.org/kinesis-by-example.
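Resharding is driven entirely by the user through the Kinesis API. A minimal sketch of triggering it with the AWS Java SDK (the stream name, shard ids, and hash key below are hypothetical placeholders):

    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.MergeShardsRequest;
    import com.amazonaws.services.kinesis.model.SplitShardRequest;

    public class ReshardingSketch {
        public static void main(String[] args) {
            AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

            // split one shard in two at a chosen hash key boundary
            // (2^127 is the midpoint of the full hash key range)
            kinesis.splitShard(new SplitShardRequest()
                .withStreamName("my-stream")
                .withShardToSplit("shardId-000000000000")
                .withNewStartingHashKey("170141183460469231731687303715884105728"));

            // merge two adjacent shards back into one
            kinesis.mergeShards(new MergeShardsRequest()
                .withStreamName("my-stream")
                .withShardToMerge("shardId-000000000001")
                .withAdjacentShardToMerge("shardId-000000000002"));
        }
    }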
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping is done with a simple round-robin-like distribution that can be determined locally at each Flink consumer task (the Flink Kafka consumer does this too).
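A minimal sketch of such a locally determined round-robin assignment (the method and parameter names are illustrative, not the connector's actual API):

    // Each subtask evaluates the same deterministic predicate on its own, so no
    // coordination is needed; this is also why the mapping cannot adapt once
    // Kinesis adds or closes shards at runtime.
    public static boolean isAssignedToThisSubtask(
            int shardIndex, int indexOfThisSubtask, int totalNumberOfSubtasks) {
        return shardIndex % totalNumberOfSubtasks == indexOfThisSubtask;
    }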
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards they are currently handling, and to let a task ask the coordinator for a shard reassignment when it finds a closed shard at runtime (shards are closed by Kinesis when they are merged or split).
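For context on how a closed shard surfaces at runtime: with the AWS Java SDK, a shard that was sealed by a merge or split is fully consumed once GetRecords returns a null next shard iterator. A sketch (not the connector's code; names are illustrative):

    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    import com.amazonaws.services.kinesis.model.GetRecordsResult;

    // returns true once the given shard has been fully read, i.e. it was
    // closed by a Kinesis merge or split and a reassignment should be requested
    static boolean shardIsClosed(AmazonKinesis kinesis, String shardIterator) {
        GetRecordsResult result = kinesis.getRecords(
            new GetRecordsRequest().withShardIterator(shardIterator));
        // ... process result.getRecords() here ...
        return result.getNextShardIterator() == null;
    }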
> We need a centralized coordinator state store that is visible to all Flink consumer tasks. Tasks can use this state store to locally determine which shards they can be reassigned. Amazon KCL uses a DynamoDB table for the coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our own implementation, ZooKeeper can be used for this state store, but that means users would additionally have to set up ZooKeeper for the consumer to work.
> Since this feature requires extensive work, it is opened as a separate sub-task from the basic implementation https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
