flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Date Fri, 29 Sep 2017 09:58:10 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r141831721
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
---
    @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
         // Initialize the data caches.
         val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
         val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinLeftCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        leftListTypeInfo)
         leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
     
         val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
         val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinRightCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        rightListTypeInfo)
         rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
     
         // Initialize the timer states.
         val leftTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long])
         leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
     
         val rightTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long])
         rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
       }
     
       /**
    -    * Process records from the left stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to register timer or get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the left stream.
         */
       override def processElement1(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - rightRelativeSize
    +    val oppositeUpperBound: Long = rowTime + leftRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           leftOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           rightOperatorTime,
           rightTimerState,
           leftCache,
           rightCache,
    -      true
    +      leftRow = true
         )
       }
     
       /**
    -    * Process records from the right stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the right stream.
         */
       override def processElement2(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - leftRelativeSize
    +    val oppositeUpperBound: Long =  rowTime + rightRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           rightOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           leftOperatorTime,
           leftTimerState,
           rightCache,
           leftCache,
    -      false
    +      leftRow = false
         )
       }
     
       /**
    -    * Put a record from the input stream into the cache and iterate the opposite cache
to
    -    * output records meeting the join conditions. If there is no timer set for the OPPOSITE
    +    * Put a row from the input stream into the cache and iterate the opposite cache to
    +    * output join results meeting the conditions. If there is no timer set for the OPPOSITE
         * STREAM, register one.
         */
       private def processElement(
    -    cRowValue: CRow,
    -    timeForRecord: Long,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow],
    -    myWatermark: Long,
    -    oppositeWatermark: Long,
    -    oppositeTimeState: ValueState[Long],
    -    recordListCache: MapState[Long, JList[Row]],
    -    oppositeCache: MapState[Long, JList[Row]],
    -    leftRecord: Boolean): Unit = {
    -    if (relativeWindowSize > 0) {
    -      //TODO Shall we consider adding a method for initialization with the context and
collector?
    -      cRowWrapper.out = out
    -
    -      val record = cRowValue.row
    -
    -      //TODO Only if the time of the record is greater than the watermark, can we continue.
    -      if (timeForRecord >= myWatermark - allowedLateness) {
    -        val oppositeLowerBound: Long =
    -          if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize
    -
    -        val oppositeUpperBound: Long =
    -          if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize
    -
    -        // Put the record into the cache for later use.
    -        val recordList = if (recordListCache.contains(timeForRecord)) {
    -          recordListCache.get(timeForRecord)
    -        } else {
    -          new util.ArrayList[Row]()
    -        }
    -        recordList.add(record)
    -        recordListCache.put(timeForRecord, recordList)
    -
    -        // Register a timer on THE OTHER STREAM to remove records from the cache once
they are
    -        // expired.
    -        if (oppositeTimeState.value == 0) {
    -          registerCleanUpTimer(
    -            ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true)
    -        }
    +      cRowValue: CRow,
    +      timeForRow: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      myWatermark: Long,
    +      oppositeLowerBound: Long,
    +      oppositeUpperBound: Long,
    +      oppositeWatermark: Long,
    +      oppositeTimeState: ValueState[Long],
    +      rowListCache: MapState[Long, JList[Row]],
    +      oppositeCache: MapState[Long, JList[Row]],
    +      leftRow: Boolean): Unit = {
    +    cRowWrapper.out = out
    +    val row = cRowValue.row
    +    if (!checkRowOutOfDate(timeForRow, myWatermark)) {
    --- End diff --
    
    There is no problem with the unified semantics for batch and streaming. In fact, handling
as much late data as possible will result in closer results of streaming and batch queries.

    
    You can think of a batch table as a streaming table that does not provide watermark except
for one `Long.MAX_VALUE` watermark after all rows have been shipped. Hence, batch tables have
no late data and the more (late) data a streaming query processes the closer its result is
to the result of an equivalent batch query.


---

Mime
View raw message