flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
Date Wed, 31 Jan 2018 15:21:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347001#comment-16347001 ]

ASF GitHub Bot commented on FLINK-8516:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5393#discussion_r165083343
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
     	//  Miscellaneous utility functions
     	// ------------------------------------------------------------------------
     
    +	/**
    +	 * Function to map a Kinesis shard to a Flink subtask index.
    +	 */
    +	public interface ShardToSubtaskIndexFn {
    +		/**
    +		 * Function to map a Kinesis shard to a Flink subtask index.
    +		 *
    +		 * @param shard the shard to determine
    +		 * @param totalNumberOfSubtasks total number of subtasks
    +		 * @return index or hash code
    +		 */
    +		// TODO: extra parameter can be eliminated by creating hash function after runtime context is present
    +		int getSubTaskIndex(StreamShardHandle shard, int totalNumberOfSubtasks);
    --- End diff --
    
    I actually prefer passing the `totalNumberOfSubtasks` value independently, instead of passing in `runtimeContext`. IMO, it provides more context on the nature of the assignment.
    
    Moreover, IMO, having a factory as the API is much more complicated for the user.
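
For illustration only, a minimal sketch contrasting the two API shapes discussed in this comment. `ShardHandle` is a hypothetical stand-in for Flink's `StreamShardHandle`, and `ShardAssignerFactory`, `HASH_MODULO`, and `FACTORY` are assumed names that do not appear in the pull request; only `ShardToSubtaskIndexFn` and `getSubTaskIndex` mirror the diff.

```java
import java.util.function.ToIntFunction;

public class ShardAssignmentSketch {

    /** Hypothetical stand-in for StreamShardHandle; only the shard id is modelled. */
    static final class ShardHandle {
        final String shardId;

        ShardHandle(String shardId) {
            this.shardId = shardId;
        }
    }

    /** Same shape as the interface in the diff: the subtask count is passed on every call. */
    interface ShardToSubtaskIndexFn {
        int getSubTaskIndex(ShardHandle shard, int totalNumberOfSubtasks);
    }

    /** Assumed alternative: a factory that binds the subtask count first. */
    interface ShardAssignerFactory {
        ToIntFunction<ShardHandle> create(int totalNumberOfSubtasks);
    }

    /** Illustrative assigner: hash of the shard id folded into [0, total). */
    static final ShardToSubtaskIndexFn HASH_MODULO =
        (shard, total) -> Math.floorMod(shard.shardId.hashCode(), total);

    /** The same logic behind a factory; users must supply and understand an extra type. */
    static final ShardAssignerFactory FACTORY =
        total -> shard -> HASH_MODULO.getSubTaskIndex(shard, total);

    public static void main(String[] args) {
        ShardHandle shard = new ShardHandle("shardId-000000000003");
        int direct = HASH_MODULO.getSubTaskIndex(shard, 4);
        int viaFactory = FACTORY.create(4).applyAsInt(shard);
        System.out.println(direct == viaFactory); // prints "true"
    }
}
```

With the direct form, a user supplies a single lambda and sees the subtask count as an explicit argument; the factory form adds one more level of indirection for the same result.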


> FlinkKinesisConsumer does not balance shards over subtasks
> ----------------------------------------------------------
>
>                 Key: FLINK-8516
>                 URL: https://issues.apache.org/jira/browse/FLINK-8516
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over subtasks round-robin. This works as long as shard identifiers are sequential. After shards are rebalanced in Kinesis, that may no longer be the case, and the distribution becomes skewed.
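
The skew described in the issue can be sketched under simplifying assumptions: the hash is modelled as the numeric suffix of the shard id, assignment is that number modulo the parallelism, and the set of open shard ids after resharding is hypothetical. This is not the connector's actual code; `ShardSkewSketch` and its methods are illustrative names.

```java
import java.util.Arrays;

public class ShardSkewSketch {

    /** Parses the numeric suffix of an id such as "shardId-000000000004". */
    static int shardNumber(String shardId) {
        return Integer.parseInt(shardId.substring(shardId.lastIndexOf('-') + 1));
    }

    /** Counts how many shards land on each subtask under (shard number mod parallelism). */
    static int[] countPerSubtask(String[] shardIds, int parallelism) {
        int[] counts = new int[parallelism];
        for (String id : shardIds) {
            counts[Math.floorMod(shardNumber(id), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sequential ids: shards spread evenly over two subtasks.
        String[] sequential = {
            "shardId-000000000000", "shardId-000000000001",
            "shardId-000000000002", "shardId-000000000003"
        };
        // Hypothetical open shards after resharding: ids are no longer contiguous.
        String[] afterReshard = {
            "shardId-000000000000", "shardId-000000000002",
            "shardId-000000000004", "shardId-000000000006"
        };
        System.out.println(Arrays.toString(countPerSubtask(sequential, 2)));   // [2, 2]
        System.out.println(Arrays.toString(countPerSubtask(afterReshard, 2))); // [4, 0]
    }
}
```

In the second case every shard maps to subtask 0 and subtask 1 stays idle, which is the kind of imbalance a pluggable shard-to-subtask function would let users correct.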



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
