flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors
Date Mon, 13 Apr 2015 10:10:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492225#comment-14492225 ]

ASF GitHub Bot commented on FLINK-1753:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227273
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java ---
    @@ -111,18 +128,38 @@ public void initialize() throws InterruptedException {
     		} while (metadata == null);
     
     		if (metadata.leader() == null) {
    -			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
    -					+ ":" + port);
    +			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
     		}
     
    -		leadBroker = metadata.leader().host();
    +		leadBroker = metadata.leader();
     		clientName = "Client_" + topic + "_" + partition;
     
    -		consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
    +		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
    +
    +		try {
    +			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
    +		} catch (NotLeaderForPartitionException e) {
    +			do {
    +
    +				metadata = findLeader(hosts, topic, partition);
     
    -		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
    +				try {
    +					Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
    +				} catch (InterruptedException ie) {
    +					throw new InterruptedException("Establishing connection to Kafka failed");
    +				}
    +			} while (metadata == null);
    --- End diff --
    
    This loop might also run forever: if findLeader() keeps returning null, nothing bounds the number of retries.
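
    One way to address this would be to cap the number of retries. The following is only a minimal sketch, not code from the pull request: the class BoundedRetry, the method retryUntilNonNull, and the maxAttempts and waitMillis parameters are hypothetical names introduced for illustration.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of the pull request. */
public final class BoundedRetry {

    private BoundedRetry() {}

    /**
     * Calls lookup until it returns a non-null value or maxAttempts calls
     * have been made, sleeping waitMillis between consecutive attempts.
     * Throws a RuntimeException once the attempts are exhausted, so the
     * caller cannot spin forever.
     */
    public static <T> T retryUntilNonNull(Supplier<T> lookup, int maxAttempts, long waitMillis)
            throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T result = lookup.get();
            if (result != null) {
                return result;
            }
            // Only wait if another attempt will follow.
            if (attempt < maxAttempts) {
                Thread.sleep(waitMillis);
            }
        }
        throw new RuntimeException("Lookup did not succeed after " + maxAttempts + " attempts");
    }
}
```

    In the diff above, the do/while could then become something like retryUntilNonNull(() -> findLeader(hosts, topic, partition), maxAttempts, DEFAULT_WAIT_ON_EMPTY_FETCH), where the value chosen for maxAttempts is an assumption that would need to be agreed on in the review.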


> Add more tests for Kafka Connectors
> -----------------------------------
>
>                 Key: FLINK-1753
>                 URL: https://issues.apache.org/jira/browse/FLINK-1753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Gábor Hermann
>
> The current {{KafkaITCase}} runs only a single test.
> We need to refactor that test so that it brings up a Kafka/Zookeeper server and then performs various tests:
> Tests to include:
> - A topology with non-string types MERGED IN 359b39c3
> - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
> - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
> - Kafka broker failure.
> - Flink TaskManager failure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
