flink-issues mailing list archives

From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #4190: [FLINK-7011] [kafka] Harden Kafka testStartFromKaf...
Date Wed, 28 Jun 2017 12:34:07 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4190#discussion_r124527723
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
     
     		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
     
    -		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		// read some records so that some offsets are committed to Kafka
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.getConfig().disableSysoutLogging();
    +		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		env.setParallelism(parallelism);
    +		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
     
    -		Long o1;
    -		Long o2;
    -		Long o3;
    -		int attempt = 0;
    -		// make sure that o1, o2, o3 are not all null before proceeding
    -		do {
    -			attempt++;
    -			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
    -
    -			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -			env.getConfig().disableSysoutLogging();
    -			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -			env.setParallelism(parallelism);
    -			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
    -
    -			env
    -				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    -				.map(new ThrottledMapper<String>(consumePause))
    -				.map(new MapFunction<String, Object>() {
    -					int count = 0;
    -					@Override
    -					public Object map(String value) throws Exception {
    -						count++;
    -						if (count == recordsToConsume) {
    -							throw new SuccessException();
    -						}
    -						return null;
    +		env
    +			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    +			.map(new ThrottledMapper<String>(consumePause))
    +			.map(new MapFunction<String, Object>() {
    +				int count = 0;
    +				@Override
    +				public Object map(String value) throws Exception {
    +					count++;
    +					if (count == recordsToConsume) {
    +						throw new SuccessException();
     					}
    -				})
    -				.addSink(new DiscardingSink<>());
    +					return null;
    +				}
    +			})
    +			.addSink(new DiscardingSink<>());
     
    -			tryExecute(env, "Read some records to commit offsets to Kafka");
    +		tryExecute(env, "Read some records to commit offsets to Kafka");
     
    +		// make sure that we indeed have some offsets committed to Kafka
    +		Long o1 = null;
    +		Long o2 = null;
    +		Long o3 = null;
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		while (o1 == null && o2 == null && o3 == null) {
    +			Thread.sleep(100);
     			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
     			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
     			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    -		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
    --- End diff --
    
    I would still add a hard limit on the number of attempts (20?). There's no benefit in a test that succeeds
_at some point_: if it doesn't finish relatively quickly, the entire build will time out anyway.
    
    Also, when running tests locally, I suppose there isn't even a time limit that would kill
the test...
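
A rough sketch of what such a hard limit could look like, pulled out into a standalone helper so it can be run on its own. `BoundedOffsetWait`, `awaitAnyCommitted`, and the parameters `maxAttempts` and `sleepMillis` are hypothetical names for illustration and are not part of the PR or of Flink. The suppliers stand in for the `kafkaOffsetHandler.getCommittedOffset(topicName, partition)` calls in the diff.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper (not Flink API): polls until at least one partition
// reports a committed offset, or gives up after a fixed number of attempts.
final class BoundedOffsetWait {

    private BoundedOffsetWait() {}

    /**
     * Returns true as soon as any supplier yields a non-null offset.
     * Returns false once maxAttempts polls have all come back empty,
     * or if the waiting thread is interrupted.
     */
    static boolean awaitAnyCommitted(
            List<Supplier<Long>> partitionOffsets, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            for (Supplier<Long> offset : partitionOffsets) {
                if (offset.get() != null) {
                    return true;
                }
            }
            if (attempt < maxAttempts) {
                try {
                    // Pause between polls, mirroring Thread.sleep(100) in the diff.
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

In the test, a `false` result after, say, 20 attempts with a 100 ms pause could be turned into an explicit failure, so the test ends after roughly two seconds of polling instead of relying on a build-level timeout.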

