flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #4143: [FLINK-6418][cep] Support for dynamic state change...
Date Fri, 23 Jun 2017 10:08:57 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4143#discussion_r123713960
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
    @@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> loopingState)
{
     		 */
     		@SuppressWarnings("unchecked")
     		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState,
final State<T> lastSink) {
    -			final IterativeCondition<T> currentCondition = (IterativeCondition<T>)
currentPattern.getCondition();
    +			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
    +				(IterativeCondition<T>) currentPattern.getCondition(),
    +				(IterativeCondition<T>) currentPattern.getUntilCondition()
    +			);
     
     			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
     			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
    -			firstState.addTake(loopingState, currentCondition);
    +			firstState.addTake(loopingState, takeCondition);
     
     			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
     			if (ignoreFunction != null) {
     				final State<T> firstStateWithoutProceed = createState(currentPattern.getName(),
State.StateType.Normal);
     				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
     				firstStateWithoutProceed.addIgnore(ignoreFunction);
    -				firstStateWithoutProceed.addTake(loopingState, currentCondition);
    +				firstStateWithoutProceed.addTake(loopingState, takeCondition);
     
     				addStopStates(firstStateWithoutProceed);
     			}
     			return firstState;
     		}
     
     		/**
    +		 * This method extends the given condition with stop(until) condition if necessary.
    +		 * The until condition needs to be applied only if both of the given conditions are
not null.
    +		 *
    +		 * @param condition the condition to extend
    +		 * @param untilCondition the until condition to join with the given condition
    +		 * @return condition with AND applied or the original condition
    +		 */
    +		private IterativeCondition<T> extendWithUntilCondition(
    +				IterativeCondition<T> condition,
    +				IterativeCondition<T> untilCondition) {
    +			if (untilCondition != null && condition != null) {
    +				return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    +			} else {
    +				return condition;
    +			}
    +		}
    +
    --- End diff --
    
    The way this is, now you do not allow patterns with no condition and only until condition,
like: 
    
    ```
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>()
{
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    ```
    
    To do this, I think that transforming the method to the following will do the job:
    
    ```
    if (untilCondition != null && condition != null) {
    				return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    			}
    			if (condition != null) {
    				return condition;
    			}
    			if (untilCondition != null) {
    				return new NotCondition<>(untilCondition);
    			}
    			return null;
    ```
    
    And to do this properly, you could also add a test in the `UntilConditionITCase` like:
    
    ```
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    
    		Event startEvent = new Event(40, "c", 1.0);
    		Event middleEvent1 = new Event(41, "a", 2.0);
    		Event middleEvent2 = new Event(42, "a", 3.0);
    		Event startEvent2 = new Event(40, "d", 1.0);
    		Event breaking = new Event(44, "a", 5.0);
    		Event ignored = new Event(45, "a", 6.0);
    
    		inputEvents.add(new StreamRecord<>(startEvent, 1));
    		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
    		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
    		inputEvents.add(new StreamRecord<>(startEvent2, 4));
    		inputEvents.add(new StreamRecord<>(breaking, 6));
    		inputEvents.add(new StreamRecord<>(ignored, 7));
    
    		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>()
{
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    
    		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    
    		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    ```
    
    And you could also add one for the case that this method returns `null`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message