flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE
Date Wed, 17 Oct 2018 11:12:26 GMT
twalthr commented on a change in pull request #6815:  [FLINK-7062][cep][table] Added basic
support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225590279
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##########
 @@ -172,71 +204,112 @@ class MatchCodeGenerator(
         if (count != 0) {
           throw new TableException("Flink does not support physical offsets within partition.")
         } else {
-          val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
-            nullableInput,
+          val patternFieldVisitor = new RexPatternFieldRefVisitor(
+            config,
             input,
             0,
-            false)
-          call.getOperands.get(0).accept(patternFieldVisitor)
+            false,
+            reusablePatternListNames)
+          applyPatternVisitorToRef(call.getOperands.get(0), patternFieldVisitor)
         }
 
-      case MATCH_NUMBER =>
-        throw new TableException(s"Unsupported call: $call")
-
       case FIRST | LAST =>
         val countLiteral = call.operands.get(1).asInstanceOf[RexLiteral]
         val count = checkedDownCast(countLiteral.getValueAs(classOf[JLong]))
         val patternRef = call.operands.get(0)
 
-        val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
-          nullableInput,
+        val patternFieldVisitor = new RexPatternFieldRefVisitor(
+          config,
           input,
           count,
-          call.getOperator == FIRST)
-        patternRef.accept(patternFieldVisitor)
+          call.getOperator == FIRST,
+          reusablePatternListNames)
+        applyPatternVisitorToRef(patternRef, patternFieldVisitor)
 
-      case CLASSIFIER | RUNNING =>
-        throw new TableException(s"${call.getOperator} is not supported yet.")
       case FINAL =>
         call.getOperands.get(0).accept(this)
 
       case _ => super.visitCall(call)
     }
   }
 
+  private def applyPatternVisitorToRef(
+    node: RexNode,
+    visitor: RexPatternFieldRefVisitor)
+  : GeneratedExpression = {
+    val code = node.accept(visitor)
+    reusablePerRecordStatements.add(visitor.reusePerRecordCode())
+    reusableMemberStatements.add(visitor.reuseMemberCode())
+    reusableInitStatements.add(visitor.reuseInitCode())
+
+    code
+  }
+
+  private def newNameForPatternList(patternName: String) = {
+    reusablePatternListNames.getOrElseUpdate(patternName, newName("patternEvents"))
+  }
+
+  override private[flink] def generateProctimeTimestamp() = {
+    val resultTerm = newName("result")
+
+    //TODO use timerService once it is available in PatternFlatSelectFunction
+    val resultCode =
+      s"""
+         |long $resultTerm = System.currentTimeMillis();
+         |""".stripMargin
+    GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
+  }
+
   override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression =
{
-    val patternFieldRefVisitor = new RexPatternFieldRefVisitor(config,
-      nullableInput,
+    val patternFieldRefVisitor = new RexPatternFieldRefVisitor(
+      config,
       input,
       0,
-      false)
-    fieldRef.accept(patternFieldRefVisitor)
+      false,
+      reusablePatternListNames)
+    applyPatternVisitorToRef(fieldRef, patternFieldRefVisitor)
   }
 
-  class RexPatternFieldRefVisitor(
+  private class RexPatternFieldRefVisitor(
     config: TableConfig,
-    nullableInput: Boolean,
     input: TypeInformation[_ <: Any],
     offset: Int,
-    first: Boolean) extends CodeGenerator(config, nullableInput, input) {
+    first: Boolean,
+    patternListNames: mutable.Map[String, String]) extends CodeGenerator(config, false, input)
{
+
+    private case class GeneratedCode(resultTerm: String, code: String)
 
 Review comment:
   None of the usages of `GeneratedCode` actually carry any code, so we could simply return a
`String` in those cases.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message