flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.
Date Wed, 28 Jun 2017 14:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066521#comment-16066521

ASF GitHub Bot commented on FLINK-7008:

Github user dawidwys commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
    @@ -297,6 +297,85 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception
    +	public void testKeyedCEPOperatorNFAChanged() throws Exception {
    --- End diff --
    I meant rather checking if the resetting for the flag works correct. We could do it by
veryfying correct number of invocations of `update` method. Also we should test it with both
RocksDB and InMemory state backends.
    We can do it with Mockito. Just a suggestion for the code:
        public void testKeyedCEPOperatorNFAChanged() throws Exception {
        	String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
        	RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        	KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
        		new SimpleNFAFactory(),
        	OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>>
harness = getCepTestHarness(operator);
        	try {
        		final ValueState nfaOperatorState = Whitebox.<ValueState>getInternalState(operator,
        		final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
        		Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
        		Event startEvent = new Event(42, "c", 1.0);
        		SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
        		Event endEvent = new Event(42, "b", 1.0);
        		harness.processElement(new StreamRecord<>(startEvent, 1L));
        		harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L));
        		harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
        		harness.processElement(new StreamRecord<>(endEvent, 4L));
        		Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
        		// get and verify the output
        		Queue<Object> result = harness.getOutput();
        		assertEquals(1, result.size());
        		verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        	} finally {

> Update NFA state only when the NFA changes.
> -------------------------------------------
>                 Key: FLINK-7008
>                 URL: https://issues.apache.org/jira/browse/FLINK-7008
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Kostas Kloudas
>            Assignee: Dian Fu
>             Fix For: 1.4.0
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we update the
NFA state every time the NFA is touched. This leads to redundant puts/gets to the state when
there are no changes to the NFA itself.

This message was sent by Atlassian JIRA

View raw message