flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Date Mon, 18 Sep 2017 15:51:23 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r139400962
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
---
    @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
         val expectedOutput = new ConcurrentLinkedQueue[Object]()
     
         expectedOutput.add(new StreamRecord(
    -      CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
    +      CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
         expectedOutput.add(new StreamRecord(
    -      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
    +      CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
     
         verify(expectedOutput, result, new RowResultSortComparator())
     
         testHarness.close()
       }
     
    +  /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
    +  @Test
    +  def testCommonRowTimeJoin() {
    +
    +    val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
    +      -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
    +
    +    val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
    +      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
    +    val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]
=
    +      new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
    +        operator,
    +        new TupleRowKeySelector[String](1),
    +        new TupleRowKeySelector[String](1),
    +        BasicTypeInfo.STRING_TYPE_INFO,
    +        1, 1, 0)
    +
    +    testHarness.open()
    +
    +    // Advance
    +    testHarness.processWatermark1(new Watermark(1))
    +    testHarness.processWatermark2(new Watermark(1))
    +
    +    // Test late data
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(1L: JLong, "k1"), true), 0))
    +
    +    assertEquals(0, testHarness.numEventTimeTimers())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(2L: JLong, "k1"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(2L: JLong, "k1"), true), 0))
    +
    +    assertEquals(2, testHarness.numEventTimeTimers())
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(5L: JLong, "k1"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(15L: JLong, "k1"), true), 0))
    +
    +    testHarness.processWatermark1(new Watermark(20))
    +    testHarness.processWatermark2(new Watermark(20))
    +
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(35L: JLong, "k1"), true), 0))
    +
    +    testHarness.processWatermark1(new Watermark(38))
    +    testHarness.processWatermark2(new Watermark(38))
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(40L: JLong, "k2"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(39L: JLong, "k2"), true), 0))
    +
    +    assertEquals(6, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processWatermark1(new Watermark(61))
    +    testHarness.processWatermark2(new Watermark(61))
    +
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    val expectedOutput = new ConcurrentLinkedQueue[Object]()
    +    expectedOutput.add(new StreamRecord(
    +      CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), true), 0))
    +    expectedOutput.add(new StreamRecord(
    +      CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0))
    +    expectedOutput.add(new StreamRecord(
    +      CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0))
    +    expectedOutput.add(new StreamRecord(
    +      CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0))
    +    expectedOutput.add(new StreamRecord(
    +      CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0))
    +
    +    val result = testHarness.getOutput
    +    verify(expectedOutput, result, new RowResultSortComparator())
    +    testHarness.close()
    +  }
    +
    +  /** a.rowtime >= b.rowtime - 10 and a.rowtime <= b.rowtime - 7 **/
    +  @Test
    +  def testNegativeRowTimeJoin() {
    +
    +    val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
    +      -10, -7, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
    +
    +    val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
    +      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
    +    val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]
=
    +      new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
    +        operator,
    +        new TupleRowKeySelector[String](1),
    +        new TupleRowKeySelector[String](1),
    +        BasicTypeInfo.STRING_TYPE_INFO,
    +        1, 1, 0)
    +
    +    testHarness.open()
    +
    +    // Advance
    --- End diff --
    
    Add more inline comments.


---

Mime
View raw message