Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r140645274
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
// Initialize the data caches.
val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinLeftCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ leftListTypeInfo)
leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinRightCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ rightListTypeInfo)
rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
// Initialize the timer states.
val leftTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long])
leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
val rightTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long])
rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
}
/**
- * Process records from the left stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to register timer or get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the left stream.
*/
override def processElement1(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - rightRelativeSize
+ val oppositeUpperBound: Long = rowTime + leftRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
leftOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
rightOperatorTime,
rightTimerState,
leftCache,
rightCache,
- true
+ leftRow = true
)
}
/**
- * Process records from the right stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the right stream.
*/
override def processElement2(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - leftRelativeSize
+ val oppositeUpperBound: Long = rowTime + rightRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
rightOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
leftOperatorTime,
leftTimerState,
rightCache,
leftCache,
- false
+ leftRow = false
)
}
/**
- * Put a record from the input stream into the cache and iterate the opposite cache
to
- * output records meeting the join conditions. If there is no timer set for the OPPOSITE
+ * Put a row from the input stream into the cache and iterate the opposite cache to
+ * output join results meeting the conditions. If there is no timer set for the OPPOSITE
* STREAM, register one.
*/
private def processElement(
- cRowValue: CRow,
- timeForRecord: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow],
- myWatermark: Long,
- oppositeWatermark: Long,
- oppositeTimeState: ValueState[Long],
- recordListCache: MapState[Long, JList[Row]],
- oppositeCache: MapState[Long, JList[Row]],
- leftRecord: Boolean): Unit = {
- if (relativeWindowSize > 0) {
- //TODO Shall we consider adding a method for initialization with the context and
collector?
- cRowWrapper.out = out
-
- val record = cRowValue.row
-
- //TODO Only if the time of the record is greater than the watermark, can we continue.
- if (timeForRecord >= myWatermark - allowedLateness) {
- val oppositeLowerBound: Long =
- if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize
-
- val oppositeUpperBound: Long =
- if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize
-
- // Put the record into the cache for later use.
- val recordList = if (recordListCache.contains(timeForRecord)) {
- recordListCache.get(timeForRecord)
- } else {
- new util.ArrayList[Row]()
- }
- recordList.add(record)
- recordListCache.put(timeForRecord, recordList)
-
- // Register a timer on THE OTHER STREAM to remove records from the cache once
they are
- // expired.
- if (oppositeTimeState.value == 0) {
- registerCleanUpTimer(
- ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true)
- }
+ cRowValue: CRow,
+ timeForRow: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow],
+ myWatermark: Long,
+ oppositeLowerBound: Long,
+ oppositeUpperBound: Long,
+ oppositeWatermark: Long,
+ oppositeTimeState: ValueState[Long],
+ rowListCache: MapState[Long, JList[Row]],
+ oppositeCache: MapState[Long, JList[Row]],
+ leftRow: Boolean): Unit = {
+ cRowWrapper.out = out
+ val row = cRowValue.row
+ if (!checkRowOutOfDate(timeForRow, myWatermark)) {
--- End diff --
Hi @fhueske, I got another concern that if we produce partial results, will that affect
the batch/stream SQL consolidation? Besides, the semantics of the watermark will not be hold
there. I suggest to consult more people and make this decision carefully.
---
|