flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5092: [FLINK-4812][metrics] Expose currentLowWatermark f...
Date Wed, 29 Nov 2017 10:01:20 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5092#discussion_r153739683
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
---
    @@ -224,6 +226,39 @@ public void testLatencyMarkEmission() throws Exception {
     		Assert.assertTrue(output.get(i).isWatermark());
     	}
     
    +	@Test
    +	public void testManualWatermarkContextWatermarkMetric() throws Exception {
    +
    +		// regular stream source operator
    +		final StoppableStreamSource<String, InfiniteSource<String>> operator =
    +			new StoppableStreamSource<>(new InfiniteSource<String>());
    +
    +		long watermarkInterval = 10;
    +		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
    +		processingTimeService.setCurrentTime(0);
    +
    +		setupSourceOperator(operator, TimeCharacteristic.EventTime, watermarkInterval, 0, processingTimeService);
    +
    +		WatermarkGauge watermarkGauge = new WatermarkGauge();
    +
    +		SourceFunction.SourceContext<String> sourceContext = StreamSourceContexts.getSourceContext(TimeCharacteristic.EventTime,
    +			operator.getContainingTask().getProcessingTimeService(),
    +			operator.getContainingTask().getCheckpointLock(),
    +			operator.getContainingTask().getStreamStatusMaintainer(),
    +			new CollectorOutput<String>(new ArrayList<>(1)),
    +			operator.getExecutionConfig().getAutoWatermarkInterval(),
    +			-1,
    +			watermarkGauge);
    +
    +		Watermark wm1 = new Watermark(64);
    +		sourceContext.emitWatermark(wm1);
    +		assertEquals(wm1.getTimestamp(), watermarkGauge.getValue().longValue());
    +
    +		Watermark wm2 = new Watermark(128);
    +		sourceContext.emitWatermark(wm2);
    +		assertEquals(wm2.getTimestamp(), watermarkGauge.getValue().longValue());
    +	}
    +
     	@Test
     	public void testAutomaticWatermarkContext() throws Exception {
    --- End diff --
    
    I think we can strengthen this test a little.
    
    It is missing the case where we emit the `Long.MAX_VALUE` watermark from the source context,
and verify that the output contains it. Additionally verifying that the metric gauge is also
updated to `Long.MAX_VALUE`, we should be able to catch the missing update case I mentioned
above.


---

Mime
View raw message