flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6418) Support for dynamic state changes in CEP patterns
Date Fri, 23 Jun 2017 10:09:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060680#comment-16060680 ]

ASF GitHub Bot commented on FLINK-6418:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4143#discussion_r123713960
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> loopingState) {
     		 */
     		@SuppressWarnings("unchecked")
     		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
    -			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
    +			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
    +				(IterativeCondition<T>) currentPattern.getCondition(),
    +				(IterativeCondition<T>) currentPattern.getUntilCondition()
    +			);
     
     			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
     			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
    -			firstState.addTake(loopingState, currentCondition);
    +			firstState.addTake(loopingState, takeCondition);
     
     			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
     			if (ignoreFunction != null) {
     				final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
     				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
     				firstStateWithoutProceed.addIgnore(ignoreFunction);
    -				firstStateWithoutProceed.addTake(loopingState, currentCondition);
    +				firstStateWithoutProceed.addTake(loopingState, takeCondition);
     
     				addStopStates(firstStateWithoutProceed);
     			}
     			return firstState;
     		}
     
     		/**
    +		 * This method extends the given condition with stop(until) condition if necessary.
    +		 * The until condition needs to be applied only if both of the given conditions are not null.
    +		 *
    +		 * @param condition the condition to extend
    +		 * @param untilCondition the until condition to join with the given condition
    +		 * @return condition with AND applied or the original condition
    +		 */
    +		private IterativeCondition<T> extendWithUntilCondition(
    +				IterativeCondition<T> condition,
    +				IterativeCondition<T> untilCondition) {
    +			if (untilCondition != null && condition != null) {
    +				return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    +			} else {
    +				return condition;
    +			}
    +		}
    +
    --- End diff --
    
    As written, this does not allow patterns that have no condition and only an until condition, like:
    
    ```
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    ```
    
    To do this, I think that transforming the method to the following will do the job:
    
    ```
    if (untilCondition != null && condition != null) {
    	return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    }
    if (condition != null) {
    	return condition;
    }
    if (untilCondition != null) {
    	return new NotCondition<>(untilCondition);
    }
    return null;
    ```
    
    And to do this properly, you could also add a test in the `UntilConditionITCase` like:
    
    ```
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    
    		Event startEvent = new Event(40, "c", 1.0);
    		Event middleEvent1 = new Event(41, "a", 2.0);
    		Event middleEvent2 = new Event(42, "a", 3.0);
    		Event startEvent2 = new Event(40, "d", 1.0);
    		Event breaking = new Event(44, "a", 5.0);
    		Event ignored = new Event(45, "a", 6.0);
    
    		inputEvents.add(new StreamRecord<>(startEvent, 1));
    		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
    		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
    		inputEvents.add(new StreamRecord<>(startEvent2, 4));
    		inputEvents.add(new StreamRecord<>(breaking, 6));
    		inputEvents.add(new StreamRecord<>(ignored, 7));
    
    		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    
    		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    
    		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    ```
    
    And you could also add one for the case that this method returns `null`.
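
The four cases of the proposed method can be checked in isolation with a minimal sketch that has no Flink dependencies. This is an illustration under stated assumptions, not the code from the pull request: `java.util.function.Predicate` stands in for `IterativeCondition`, `negate()` and `and()` stand in for `NotCondition` and `AndCondition`, and the names `UntilConditionSketch` and `extendWithUntil` are hypothetical.

```java
import java.util.function.Predicate;

// Hypothetical stand-in: Predicate replaces Flink's IterativeCondition so the
// null handling can be exercised without the CEP library on the classpath.
final class UntilConditionSketch {

    private UntilConditionSketch() {}

    /** Combines a take condition with an until condition; either argument may be null. */
    static <T> Predicate<T> extendWithUntil(Predicate<T> condition, Predicate<T> untilCondition) {
        if (untilCondition != null && condition != null) {
            // Both present: take an event only while the until condition does not hold.
            return untilCondition.negate().and(condition);
        }
        if (condition != null) {
            // Only the take condition is present: return it unchanged.
            return condition;
        }
        if (untilCondition != null) {
            // Only the until condition is present: take everything until it holds.
            return untilCondition.negate();
        }
        // Neither is present: the caller has to handle the missing condition.
        return null;
    }

    public static void main(String[] args) {
        Predicate<String> isA = "a"::equals;
        Predicate<String> isStop = "stop"::equals;

        Predicate<String> untilOnly = extendWithUntil(null, isStop);
        System.out.println("until only, input a: " + untilOnly.test("a"));
        System.out.println("until only, input stop: " + untilOnly.test("stop"));

        Predicate<String> both = extendWithUntil(isA, isStop);
        System.out.println("both, input a: " + both.test("a"));
        System.out.println("both, input b: " + both.test("b"));
    }
}
```

The until-only branch is the one the original `if`/`else` version skips: with a null take condition it returns null and the until condition is silently dropped.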


> Support for dynamic state changes in CEP patterns
> -------------------------------------------------
>
>                 Key: FLINK-6418
>                 URL: https://issues.apache.org/jira/browse/FLINK-6418
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Elias Levy
>            Assignee: Dawid Wysakowicz
>
> The Flink CEP library allows one to define an event pattern whose match condition can be determined programmatically via the {{where}} method.  Flink 1.3 will introduce so-called iterative conditions, which allow the predicate to look up events already matched by the pattern and thus be conditional on them.
> 1.3 also introduces quantifier methods to the API, which allow one to declaratively specify how many times a condition must be matched before there is a state change.
> Alas, there are use cases where the quantifier must be determined dynamically based on the events matched by the pattern so far.  Therefore, I propose adding a new {{Pattern}} method: {{until}}.
> Like the new iterative variant of {{where}}, {{until}} would take a predicate function and a context that provides access to events already matched.  But whereas {{where}} determines if an event is accepted by the pattern, {{until}} determines whether the pattern should move on to the next state.
> In our particular use case, we have a pattern where an event is matched a number of times, but depending on the event type, the number (threshold) for the pattern to match is different.  We could decompose the pattern into multiple similar patterns, but that could be inefficient if we have many such patterns.  If the functionality of {{until}} were available, we could make do with a single pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
