flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
Date Wed, 31 Jan 2018 15:21:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346998#comment-16346998 ]

ASF GitHub Bot commented on FLINK-8516:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5393#discussion_r165081173
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
     	//  Miscellaneous utility functions
     	// ------------------------------------------------------------------------
     
    +	/**
    +	 * Function to map a Kinesis shard to a Flink subtask index.
    +	 */
    +	public interface ShardToSubtaskIndexFn {
    --- End diff --
    
    I would prefer this to be a top-level class instead of one nested in `KinesisDataFetcher` (which is actually an internal class).
    
    The `Fn` name could also be polished a bit. Maybe something like `KinesisShardAssigner` would be better? That would also be coherent name-wise with the Kafka side, which has a `KafkaTopicPartitionAssigner` class (though it isn't exposed).
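
For illustration only, the suggestion above could look roughly like the following sketch. The interface name comes from the comment, but the method name `assign`, its parameters (a plain `String` shard ID and the subtask count), and the `ShardAssignerSketch` wrapper class are assumptions made up for this example, not the signature used in the pull request.

```java
// Hypothetical top-level interface, declared outside KinesisDataFetcher.
// The method name and parameter types are assumptions for this sketch.
interface KinesisShardAssigner {
    /** Returns the index of the subtask that should consume the given shard. */
    int assign(String shardId, int numParallelSubtasks);
}

public class ShardAssignerSketch {

    // One possible implementation: hash of the shard ID modulo the subtask count.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static final KinesisShardAssigner HASH_ASSIGNER =
            (shardId, numParallelSubtasks) ->
                    Math.floorMod(shardId.hashCode(), numParallelSubtasks);

    // A user-supplied alternative that pins every shard to subtask 0.
    static final KinesisShardAssigner FIRST_SUBTASK_ASSIGNER =
            (shardId, numParallelSubtasks) -> 0;

    public static void main(String[] args) {
        String shardId = "shardId-000000000003";
        System.out.println(HASH_ASSIGNER.assign(shardId, 4));
        System.out.println(FIRST_SUBTASK_ASSIGNER.assign(shardId, 4));
    }
}
```

Because the interface has a single method, callers can supply an assigner as a lambda, which is one reason a small top-level type may be more convenient than a nested one.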


> FlinkKinesisConsumer does not balance shards over subtasks
> ----------------------------------------------------------
>
>                 Key: FLINK-8516
>                 URL: https://issues.apache.org/jira/browse/FLINK-8516
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over subtasks round robin. This works as long as shard identifiers are sequential. After shards are rebalanced in Kinesis, that may no longer be the case and the distribution becomes skewed.
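
The skew described in the issue can be reproduced with a simplified model. The sketch below assumes the shard's hash is just its numeric index and that assignment is `hash mod parallelism`; the class name, method names, and the shard index sets are all hypothetical and do not reflect the connector's actual `hashCode` implementation.

```java
import java.util.Arrays;

public class ShardSkewDemo {

    // Simplified stand-in for the assignment: shard index modulo parallelism.
    // Treating the numeric shard index as the hash is an assumption of this sketch.
    static int assign(int shardIndex, int parallelism) {
        return Math.floorMod(shardIndex, parallelism);
    }

    // Counts how many shards land on each subtask.
    static int[] countPerSubtask(int[] shardIndexes, int parallelism) {
        int[] counts = new int[parallelism];
        for (int shardIndex : shardIndexes) {
            counts[assign(shardIndex, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sequential shard indexes spread evenly over 4 subtasks.
        int[] sequential = {0, 1, 2, 3, 4, 5, 6, 7};
        // Hypothetical open shards after resharding: the indexes are no longer contiguous.
        int[] afterReshard = {0, 4, 8, 12, 16, 20, 24, 28};

        System.out.println(Arrays.toString(countPerSubtask(sequential, 4)));   // [2, 2, 2, 2]
        System.out.println(Arrays.toString(countPerSubtask(afterReshard, 4))); // [8, 0, 0, 0]
    }
}
```

In this contrived case every remaining shard index is a multiple of the parallelism, so a single subtask receives all shards while the others stay idle.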



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
