flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.
Date Wed, 28 Jun 2017 14:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066521#comment-16066521 ]

ASF GitHub Bot commented on FLINK-7008:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4195#discussion_r124550121
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---
    @@ -297,6 +297,85 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception {
     	}
     
     	@Test
    +	public void testKeyedCEPOperatorNFAChanged() throws Exception {
    --- End diff --
    
    I meant checking that resetting the flag works correctly. We could do that by
verifying the number of invocations of the `update` method. We should also test it with both
the RocksDB and in-memory state backends.
    
    We can do it with Mockito. Just a suggestion for the code:
    
        @Test
        public void testKeyedCEPOperatorNFAChanged() throws Exception {
    
        	String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
        	RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        	rocksDBStateBackend.setDbStoragePath(rocksDbPath);
    
        	KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
        		Event.createTypeSerializer(),
        		true,
        		IntSerializer.INSTANCE,
        		new SimpleNFAFactory(),
        		true);
        	OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
    
        	try {
        		harness.setStateBackend(rocksDBStateBackend);
    
        		harness.open();
    
        		final ValueState nfaOperatorState = Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
        		final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
        		Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
    
        		Event startEvent = new Event(42, "c", 1.0);
        		SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
        		Event endEvent = new Event(42, "b", 1.0);
    
        		harness.processElement(new StreamRecord<>(startEvent, 1L));
        		harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L));
        		harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
        		harness.processElement(new StreamRecord<>(endEvent, 4L));
    
        		Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
        		// get and verify the output
    
        		Queue<Object> result = harness.getOutput();
    
        		assertEquals(1, result.size());
    
        		verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        	} finally {
        		harness.close();
        	}
        }
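
To make the expected count of three `update` calls easier to reason about, here is a minimal, stdlib-only sketch of the same idea. It is not Flink code: `CountingState`, `ToyNfa` and `countUpdates` are hypothetical stand-ins for the spied `ValueState`, the NFA and the operator's update path, and the toy pattern (c, then a, then b, ignoring anything else) is assumed only to mirror the events used in the test above.

```java
final class NfaUpdateSketch {

    /** Hypothetical stand-in for the keyed ValueState; counts writes like the Mockito spy would. */
    static final class CountingState {
        private int stored;
        private int updates;

        void update(int position) {
            stored = position;
            updates++;
        }

        int value() {
            return stored;
        }

        int updates() {
            return updates;
        }
    }

    /** Hypothetical toy NFA matching "c", then "a", then "b"; any other event is ignored. */
    static final class ToyNfa {
        private static final String[] PATTERN = {"c", "a", "b"};
        private int position;
        private boolean changed;

        void process(String event) {
            if (position < PATTERN.length && PATTERN[position].equals(event)) {
                position++;
                changed = true; // only a real transition marks the NFA as changed
            }
        }

        boolean isChanged() {
            return changed;
        }

        void resetChanged() {
            changed = false;
        }

        int position() {
            return position;
        }
    }

    /** Feeds the events through the toy NFA and returns how often the state was written. */
    static int countUpdates(boolean onlyWhenChanged, String... events) {
        CountingState state = new CountingState();
        ToyNfa nfa = new ToyNfa();
        for (String event : events) {
            nfa.process(event);
            if (!onlyWhenChanged || nfa.isChanged()) {
                state.update(nfa.position());
                nfa.resetChanged(); // without this reset, every later element would write again
            }
        }
        return state.updates();
    }

    public static void main(String[] args) {
        System.out.println(countUpdates(true, "c", "d", "a", "b"));  // prints 3
        System.out.println(countUpdates(false, "c", "d", "a", "b")); // prints 4
    }
}
```

In this sketch the "d" event causes no transition, so with the flag in place only three of the four elements write to the state. If the flag were never reset, the count would go back to four, which is the regression the invocation-count assertion is meant to catch.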


> Update NFA state only when the NFA changes.
> -------------------------------------------
>
>                 Key: FLINK-7008
>                 URL: https://issues.apache.org/jira/browse/FLINK-7008
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Kostas Kloudas
>            Assignee: Dian Fu
>             Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we update the
> NFA state every time the NFA is touched. This leads to redundant puts/gets to the state when
> there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
