flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
Date Tue, 04 Dec 2018 16:02:01 GMT
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates
in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238709945
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##########
 @@ -49,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *    (e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To do so [[AggBuilder]]
+  *    keeps set of already seen different aggregation calls, and reuses the code to access
+  *    appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or in
+  *    [[generateOneRowPerMatchExpression]]) there will be generated code for
+  *       * [[GeneratedFunction]], which will be an inner class
+  *       * said [[GeneratedFunction]] will be instantiated in the ctor and opened/closed
+  *         in corresponding methods of top level generated classes
+  *       * function that transforms input rows (row by row) into aggregate input rows
+  *       * function that calculates aggregates for variable, that uses the previous method
+  *    The generated code will look similar to this:
+  *
+  *
+  * {{{
+  *
+  * public class MatchRecognizePatternSelectFunction$175 extends RichPatternSelectFunction
{
+  *
+  *     // Class used to calculate aggregates for a single pattern variable
+  *     public final class AggFunction_variable$115$151 extends GeneratedAggregations {
+  *       ...
+  *     }
+  *
+  *     private final AggFunction_variable$115$151 aggregator_variable$115;
+  *
+  *     public MatchRecognizePatternSelectFunction$175() {
+  *       aggregator_variable$115 = new AggFunction_variable$115$151();
+  *     }
+  *
+  *     public void open() {
+  *       aggregator_variable$115.open();
+  *       ...
+  *     }
+  *
+  *     // Function to transform incoming row into aggregate specific row. It can e.g calculate
+  *     // inner expression of said aggregate
+  *     private Row transformRowForAgg_variable$115(Row inAgg) {
+  *         ...
+  *     }
+  *
+  *     // Function to calculate all aggregates for a single pattern variable
+  *     private Row calculateAgg_variable$115(List<Row> input) {
+  *       Acc accumulator = aggregator_variable$115.createAccumulator();
+  *       for (Row row : input) {
+  *         aggregator_variable$115.accumulate(accumulator, transformRowForAgg_variable$115(row));
+  *       }
+  *
+  *       return aggregator_variable$115.getResult(accumulator);
+  *     }
+  *
+  *     @Override
+  *     public Object select(Map<String, List<Row>> in1) throws Exception {
+  *
+  *       // Extract list of rows assigned to a single pattern variable
+  *       java.util.List patternEvents$130 = (java.util.List) in1.get("A");
+  *       ...
+  *
+  *       // Calculate aggregates
+  *       Row aggRow_variable$110$111 = calculateAgg_variable$110(patternEvents$114);
+  *
+  *       // Every aggregation (e.g SUM(A.price) and AVG(A.price)) will be extracted to a
variable
+  *       double result$135 = aggRow_variable$126$127.getField(0);
+  *       long result$137 = aggRow_variable$126$127.getField(1);
+  *
+  *       // Result of aggregation will be used in expression evaluation
+  *       out.setField(0, result$135)
+  *
+  *       long result$140 = result$137 * 2;
+  *       out.setField(1, result$140);
+  *
+  *       double result$144 = $result135 + result$137;
+  *       out.setField(2, result$144);
+  *     }
+  *
+  *     public void close() {
+  *       aggregator_variable$115.close();
+  *       ...
+  *     }
+  *
+  * }
+  * }}}
+  *
 
 Review comment:
   Awesome documentation. Very helpful.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message