carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: carbondata git commit: [CI][Streaming] Reduce the execution time of TestStreamingTableOperation test suite
Date: Sun, 28 Jan 2018 07:35:44 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 2f4dbb694 -> 5a82232a8


[CI][Streaming] Reduce the execution time of TestStreamingTableOperation test suite

Combine test cases to reduce the execution time of the TestStreamingTableOperation test suite
without reducing test coverage.

This closes #1863
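The consolidation pattern, roughly: one streaming ingest now feeds a single table, and the assertions that previously lived in separate scan, filter, aggregate and compaction test cases run inside one test case, so the expensive ingest is paid once. A minimal, hypothetical sketch of that shape (the stand-in ingest and all names are illustrative, assuming ScalaTest 3.x on the classpath; this is not code from the commit):

import org.scalatest.funsuite.AnyFunSuite

// Illustrative only: several checks share one expensive setup step.
class CombinedStreamingQueriesExample extends AnyFunSuite {

  // Stand-in for an expensive per-table streaming ingest.
  private def ingestOnce(): Seq[(Int, String)] =
    (1 to 50).map(i => (i, s"name_$i"))

  test("scan, filter and aggregate on one ingested table") {
    val rows = ingestOnce() // paid once instead of once per former test case

    // former full-scan test
    assert(rows.length == 50)
    val (firstId, firstName) = rows.head
    assert(firstId == 1 && firstName == "name_1")

    // former filter test
    assert(rows.filter(_._1 == 1).map(_._2) == Seq("name_1"))

    // former aggregate test
    assert(rows.map(_._1).sum == 50 * 51 / 2)
  }
}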


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5a82232a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5a82232a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5a82232a

Branch: refs/heads/master
Commit: 5a82232a85abf97bc6fad7c8b0ab036e5d74eab2
Parents: 2f4dbb6
Author: QiangCai <qiangcai@qq.com>
Authored: Fri Jan 26 09:40:46 2018 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Sun Jan 28 13:05:32 2018 +0530

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 659 +++++--------------
 1 file changed, 170 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a82232a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 62076bf..3de1391 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -63,42 +63,23 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     createTable(tableName = "batch_table", streaming = false, withBatchLoad = true)
 
     // 2. streaming table with different input source
-    // socket source
-    createTable(tableName = "stream_table_socket", streaming = true, withBatchLoad = true)
     // file source
     createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true)
 
     // 3. streaming table with bad records
-    createTable(tableName = "bad_record_force", streaming = true, withBatchLoad = true)
     createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true)
 
     // 4. streaming frequency check
     createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true)
-    createTable(tableName = "stream_table_10s", streaming = true, withBatchLoad = true)
 
     // 5. streaming table execute batch loading
-    createTable(tableName = "stream_table_batch", streaming = true, withBatchLoad = false)
-
     // 6. detail query
-    // full scan
-    createTable(tableName = "stream_table_scan", streaming = true, withBatchLoad = true)
-    createTableWithComplexType(
-      tableName = "stream_table_scan_complex", streaming = true, withBatchLoad = true)
-    // filter scan
+    // 8. compaction
+    // full scan + filter scan + aggregate query
     createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
-    createTableWithComplexType(
-      tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
 
-    // 7. aggregate query
-    createTable(tableName = "stream_table_agg", streaming = true, withBatchLoad = true)
     createTableWithComplexType(
-      tableName = "stream_table_agg_complex", streaming = true, withBatchLoad = true)
-
-    // 8. compaction
-    createTable(tableName = "stream_table_compact", streaming = true, withBatchLoad = true)
-
-    // 9. create new stream segment if current stream segment is full
-    createTable(tableName = "stream_table_new", streaming = true, withBatchLoad = true)
+      tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
 
     // 10. fault tolerant
     createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
@@ -108,22 +89,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false)
 
     // 12. reject alter streaming properties
-    createTable(tableName = "stream_table_alter", streaming = false, withBatchLoad = false)
-
-    // 13. handoff streaming segment
-    createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false)
-
-    // 14. finish streaming
-    createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true)
+    // 13. handoff streaming segment and finish streaming
+    createTable(tableName = "stream_table_handoff", streaming = false, withBatchLoad = false)
 
     // 15. auto handoff streaming segment
-    createTable(tableName = "stream_table_auto_handoff", streaming = true, withBatchLoad
= false)
-
     // 16. close streaming table
-    createTable(tableName = "stream_table_close", streaming = true, withBatchLoad = false)
-    createTable(tableName = "stream_table_close_auto_handoff", streaming = true, withBatchLoad
= false)
-
     // 17. reopen streaming table after close
+    // 9. create new stream segment if current stream segment is full
     createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false)
 
     // 18. block drop table while streaming is in progress
@@ -193,30 +165,15 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   def dropTable(): Unit = {
     sql("drop table if exists streaming.batch_table")
-    sql("drop table if exists streaming.stream_table_socket")
     sql("drop table if exists streaming.stream_table_file")
-    sql("drop table if exists streaming.bad_record_force")
     sql("drop table if exists streaming.bad_record_fail")
     sql("drop table if exists streaming.stream_table_1s")
-    sql("drop table if exists streaming.stream_table_10s")
-    sql("drop table if exists streaming.stream_table_batch")
-    sql("drop table if exists streaming.stream_table_scan")
-    sql("drop table if exists streaming.stream_table_scan_complex")
-    sql("drop table if exists streaming.stream_table_filter")
+    sql("drop table if exists streaming.stream_table_filter ")
     sql("drop table if exists streaming.stream_table_filter_complex")
-    sql("drop table if exists streaming.stream_table_agg")
-    sql("drop table if exists streaming.stream_table_agg_complex")
-    sql("drop table if exists streaming.stream_table_compact")
-    sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
     sql("drop table if exists streaming.stream_table_delete_id")
     sql("drop table if exists streaming.stream_table_delete_date")
-    sql("drop table if exists streaming.stream_table_alter")
     sql("drop table if exists streaming.stream_table_handoff")
-    sql("drop table if exists streaming.stream_table_finish")
-    sql("drop table if exists streaming.stream_table_auto_handoff")
-    sql("drop table if exists streaming.stream_table_close")
-    sql("drop table if exists streaming.stream_table_close_auto_handoff")
     sql("drop table if exists streaming.stream_table_reopen")
     sql("drop table if exists streaming.stream_table_drop")
     sql("drop table if exists streaming.agg_table_block")
@@ -254,26 +211,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  // input source: socket
-  test("streaming ingest from socket source") {
-    executeStreamingIngest(
-      tableName = "stream_table_socket",
-      batchNums = 2,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 1,
-      intervalOfIngest = 1,
-      continueSeconds = 10,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_socket"),
-      Seq(Row(25))
-    )
-  }
-
   // input source: file
   test("streaming ingest from file source") {
     val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
@@ -288,7 +225,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     thread.start()
     Thread.sleep(2000)
     generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
-    Thread.sleep(10000)
+    Thread.sleep(5000)
     thread.interrupt()
     checkAnswer(
       sql("select count(*) from streaming.stream_table_file"),
@@ -297,25 +234,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   }
 
   // bad records
-  test("streaming table with bad records action: force") {
-    executeStreamingIngest(
-      tableName = "bad_record_force",
-      batchNums = 2,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 1,
-      intervalOfIngest = 1,
-      continueSeconds = 10,
-      generateBadRecords = true,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_socket"),
-      Seq(Row(25))
-    )
-
-  }
-
   test("streaming table with bad records action: fail") {
     executeStreamingIngest(
       tableName = "bad_record_fail",
@@ -323,138 +241,58 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       rowNumsEachBatch = 10,
       intervalOfSource = 1,
       intervalOfIngest = 1,
-      continueSeconds = 10,
+      continueSeconds = 8,
       generateBadRecords = true,
       badRecordAction = "fail",
       autoHandoff = false
     )
     val result = sql("select count(*) from streaming.bad_record_fail").collect()
-    assert(result(0).getLong(0) < 25)
+    assert(result(0).getLong(0) < 10 + 5)
   }
 
   // ingest with different interval
   test("1 row per 1 second interval") {
     executeStreamingIngest(
       tableName = "stream_table_1s",
-      batchNums = 20,
+      batchNums = 3,
       rowNumsEachBatch = 1,
       intervalOfSource = 1,
       intervalOfIngest = 1,
-      continueSeconds = 20,
+      continueSeconds = 6,
       generateBadRecords = false,
       badRecordAction = "force",
       autoHandoff = false
     )
     val result = sql("select count(*) from streaming.stream_table_1s").collect()
     // 20 seconds can't ingest all data, exists data delay
-    assert(result(0).getLong(0) > 5 + 10)
+    assert(result(0).getLong(0) > 5)
   }
 
-  test("10000 row per 10 seconds interval") {
+  test("query on stream table with dictionary, sort_columns") {
     executeStreamingIngest(
-      tableName = "stream_table_10s",
-      batchNums = 5,
-      rowNumsEachBatch = 10000,
+      tableName = "stream_table_filter",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
       intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 50,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_10s"),
-      Seq(Row(5 + 10000 * 5)))
-  }
-
-  // batch loading on streaming table
-  test("streaming table execute batch loading") {
-    executeStreamingIngest(
-      tableName = "stream_table_batch",
-      batchNums = 5,
-      rowNumsEachBatch = 100,
-      intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 30,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_batch"),
-      Seq(Row(100 * 5)))
-
-    executeBatchLoad("stream_table_batch")
-
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_batch"),
-      Seq(Row(100 * 5 + 5)))
-  }
-
-  // detail query on batch and stream segment
-  test("non-filter query on stream table with dictionary, sort_columns") {
-    executeStreamingIngest(
-      tableName = "stream_table_scan",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
-      continueSeconds = 20,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-
-    val result = sql("select * from streaming.stream_table_scan order by id").collect()
-    assert(result != null)
-    assert(result.length == 55)
-    // check one row of streaming data
-    assert(result(0).getInt(0) == 1)
-    assert(result(0).getString(1) == "name_1")
-    // check one row of batch loading
-    assert(result(50).getInt(0) == 100000001)
-    assert(result(50).getString(1) == "batch_1")
-  }
-
-  test("non-filter query on stream table with dictionary, sort_columns and complex column")
{
-    executeStreamingIngest(
-      tableName = "stream_table_scan_complex",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
       continueSeconds = 20,
-      generateBadRecords = false,
+      generateBadRecords = true,
       badRecordAction = "force",
       autoHandoff = false
     )
 
-    val result = sql("select * from streaming.stream_table_scan_complex order by id").collect()
+    // non-filter
+    val result = sql("select * from streaming.stream_table_filter order by id").collect()
     assert(result != null)
     assert(result.length == 55)
     // check one row of streaming data
-    assert(result(0).getInt(0) == 1)
-    assert(result(0).getString(1) == "name_1")
-    assert(result(0).getStruct(4).getInt(1) == 1)
+    assert(result(1).getInt(0) == 1)
+    assert(result(1).getString(1) == "name_1")
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
-    assert(result(50).getStruct(4).getInt(1) == 20)
-  }
-
-  test("filter query on stream table with dictionary, sort_columns") {
-    executeStreamingIngest(
-      tableName = "stream_table_filter",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
-      continueSeconds = 20,
-      generateBadRecords = true,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
 
+    // filter
     checkAnswer(
       sql("select * from stream_table_filter where id = 1"),
       Seq(Row(1, "name_1", "city_1", 10000.0)))
@@ -481,21 +319,77 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       sql("select * from stream_table_filter where city = ''"),
       Seq(Row(2, "name_2", "", 20000.0)))
 
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+          "from stream_table_filter where id >= 2 and id <= 100000004"),
+      Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_filter " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3')
" +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+
+    // batch loading
+    for(_ <- 0 to 2) {
+      executeBatchLoad("stream_table_filter")
+    }
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_filter"),
+      Seq(Row(25 * 2 + 5 + 5 * 3)))
+
+    sql("alter table streaming.stream_table_filter compact 'minor'")
+    Thread.sleep(5000)
+    val result1 = sql("show segments for table streaming.stream_table_filter").collect()
+    result1.foreach { row =>
+      if (row.getString(0).equals("1")) {
+        assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+      } else if (row.getString(0).equals("0.1")) {
+        assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      } else {
+        assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      }
+    }
+
   }
 
-  test("filter query on stream table with dictionary, sort_columns and complex column") {
+  test("query on stream table with dictionary, sort_columns and complex column") {
     executeStreamingIngest(
       tableName = "stream_table_filter_complex",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
       continueSeconds = 20,
       generateBadRecords = true,
       badRecordAction = "force",
       autoHandoff = false
     )
 
+    // non-filter
+    val result = sql("select * from streaming.stream_table_filter_complex order by id").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(0).isNullAt(0))
+    assert(result(0).getString(1) == "name_6")
+    assert(result(0).getStruct(4).getInt(1) == 6)
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+    assert(result(50).getStruct(4).getInt(1) == 20)
+
+    // filter
     checkAnswer(
       sql("select * from stream_table_filter_complex where id = 1"),
       Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1))))
@@ -526,62 +420,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       sql("select * from stream_table_filter_complex where city = ''"),
       Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
 
-  }
-
-  // aggregation
-  test("aggregation query") {
-    executeStreamingIngest(
-      tableName = "stream_table_agg",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
-      continueSeconds = 20,
-      generateBadRecords = true,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-
-    checkAnswer(
-      sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
-          "from stream_table_agg where id >= 2 and id <= 100000004"),
-      Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
-
-    checkAnswer(
-      sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
-          "max(salary), min(salary) " +
-          "from stream_table_agg " +
-          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3')
" +
-          "and city <> '' " +
-          "group by city " +
-          "order by city"),
-      Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
-        Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
-        Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
-  }
-
-  test("aggregation query with complex") {
-    executeStreamingIngest(
-      tableName = "stream_table_agg_complex",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
-      continueSeconds = 20,
-      generateBadRecords = true,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-
+    // agg
     checkAnswer(
       sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age)
" +
-          "from stream_table_agg_complex where id >= 2 and id <= 100000004"),
+          "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
       Seq(Row(52, 100000004, "batch_1", 27, 1408)))
 
     checkAnswer(
       sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
           "max(salary), min(salary) " +
-          "from stream_table_agg_complex " +
+          "from stream_table_filter_complex " +
           "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3')
" +
           "and city <> '' " +
           "group by city " +
@@ -591,70 +439,19 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
   }
 
-  // compaction
-  test("test compaction on stream table") {
-    executeStreamingIngest(
-      tableName = "stream_table_compact",
-      batchNums = 5,
-      rowNumsEachBatch = 10,
-      intervalOfSource = 2,
-      intervalOfIngest = 4,
-      continueSeconds = 20,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      autoHandoff = false
-    )
-    for (_ <- 0 to 3) {
-      executeBatchLoad("stream_table_compact")
-    }
-
-    sql("alter table streaming.stream_table_compact compact 'minor'")
-
-    val result = sql("show segments for table streaming.stream_table_compact").collect()
-    result.foreach { row =>
-      if (row.getString(0).equals("1")) {
-        assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
-      }
-    }
-  }
-
-  // stream segment max size
-  test("create new stream segment if current stream segment is full") {
-    executeStreamingIngest(
-      tableName = "stream_table_new",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = false
-    )
-    assert(sql("show segments for table streaming.stream_table_new").count() > 1)
-
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_new"),
-      Seq(Row(5 + 10000 * 6))
-    )
-  }
-
   test("test deleting streaming segment by ID while ingesting") {
     executeStreamingIngest(
       tableName = "stream_table_delete_id",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 3,
+      batchNums = 3,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
       intervalOfIngest = 5,
-      continueSeconds = 20,
+      continueSeconds = 18,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200,
+      handoffSize = 1,
       autoHandoff = false
     )
-    Thread.sleep(10000)
     val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
     val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",")
     val msg = intercept[Exception] {
@@ -674,17 +471,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   test("test deleting streaming segment by date while ingesting") {
     executeStreamingIngest(
       tableName = "stream_table_delete_date",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 3,
+      batchNums = 3,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
       intervalOfIngest = 5,
-      continueSeconds = 20,
+      continueSeconds = 18,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200,
+      handoffSize = 1,
       autoHandoff = false
     )
-    Thread.sleep(10000)
     val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
     sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before
" +
         s"'2999-10-01 01:00:00'")
@@ -696,230 +492,99 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("reject alter streaming properties") {
+  test("reject alter streaming properties and handoff 'streaming finish' segment to columnar
segment") {
     try {
-      sql("ALTER TABLE streaming.stream_table_alter UNSET TBLPROPERTIES IF EXISTS ('streaming')")
+      sql("ALTER TABLE streaming.stream_table_handoff UNSET TBLPROPERTIES IF EXISTS ('streaming')")
       assert(false, "unsupport to unset streaming property")
     } catch {
       case _ =>
         assert(true)
     }
     try {
-      sql("ALTER TABLE streaming.stream_table_alter SET TBLPROPERTIES('streaming'='true')")
+      sql("ALTER TABLE streaming.stream_table_handoff SET TBLPROPERTIES('streaming'='true')")
       executeStreamingIngest(
-        tableName = "stream_table_alter",
-        batchNums = 6,
-        rowNumsEachBatch = 10000,
+        tableName = "stream_table_handoff",
+        batchNums = 2,
+        rowNumsEachBatch = 100,
         intervalOfSource = 5,
-        intervalOfIngest = 10,
-        continueSeconds = 40,
+        intervalOfIngest = 5,
+        continueSeconds = 20,
         generateBadRecords = false,
         badRecordAction = "force",
-        handoffSize = 1024L * 200,
+        handoffSize = 1L,
         autoHandoff = false
       )
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_alter"),
-        Seq(Row(6 * 10000))
-      )
     } catch {
       case _ =>
         assert(false, "should support set table to streaming")
     }
-
-    try {
-      sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='false')")
-      assert(false, "unsupport disable streaming properties")
-    } catch {
-      case _ =>
-        assert(true)
-    }
-  }
-
-  test("handoff 'streaming finish' segment to columnar segment") {
-    executeStreamingIngest(
-      tableName = "stream_table_handoff",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = false
-    )
     val segments = sql("show segments for table streaming.stream_table_handoff").collect()
-    assert(segments.length == 3 || segments.length == 4)
+    assert(segments.length == 2 || segments.length == 3)
     assertResult("Streaming")(segments(0).getString(1))
-    (1 to segments.length - 1).foreach { index =>
-      assertResult("Streaming Finish")(segments(index).getString(1))
-    }
+    assertResult("Streaming Finish")(segments(1).getString(1))
     checkAnswer(
       sql("select count(*) from streaming.stream_table_handoff"),
-      Seq(Row(6 * 10000))
+      Seq(Row(2 * 100))
     )
 
     sql("alter table streaming.stream_table_handoff compact 'streaming'")
-    Thread.sleep(10000)
+    Thread.sleep(5000)
     val newSegments = sql("show segments for table streaming.stream_table_handoff").collect()
-    assertResult(5)(newSegments.length)
-    assertResult("Success")(newSegments(0).getString(1))
-    assertResult("Success")(newSegments(1).getString(1))
-    assertResult("Streaming")(newSegments(2).getString(1))
-    assertResult("Compacted")(newSegments(3).getString(1))
-    assertResult("Compacted")(newSegments(4).getString(1))
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_handoff"),
-      Seq(Row(6 * 10000))
-    )
-  }
-
-  test("auto handoff 'streaming finish' segment to columnar segment") {
-    executeStreamingIngest(
-      tableName = "stream_table_auto_handoff",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = true
-    )
-    Thread.sleep(10000)
-    val segments = sql("show segments for table streaming.stream_table_auto_handoff").collect()
-    assertResult(5)(segments.length)
-    assertResult(2)(segments.filter(_.getString(1).equals("Success")).length)
-    assertResult(2)(segments.filter(_.getString(1).equals("Compacted")).length)
-    assertResult(1)(segments.filter(_.getString(1).equals("Streaming")).length)
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_auto_handoff"),
-      Seq(Row(6 * 10000))
-    )
-  }
-
-  test("alter table finish streaming") {
-    executeStreamingIngest(
-      tableName = "stream_table_finish",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = false
-    )
-    sql("alter table streaming.stream_table_finish finish streaming")
-
-    val segments = sql("show segments for table streaming.stream_table_finish").collect()
-    assert(segments.length == 4 || segments.length == 5)
-    (0 to segments.length -2).foreach { index =>
-      assertResult("Streaming Finish")(segments(index).getString(1))
+    assert(newSegments.length == 3 || newSegments.length == 5)
+    assertResult("Streaming")(newSegments((newSegments.length - 1) / 2).getString(1))
+    (0 until (newSegments.length - 1) / 2).foreach{ i =>
+      assertResult("Success")(newSegments(i).getString(1))
+    }
+    ((newSegments.length + 1) / 2 until newSegments.length).foreach{ i =>
+      assertResult("Compacted")(newSegments(i).getString(1))
     }
-    assertResult("Success")(segments(segments.length - 1).getString(1))
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_finish"),
-      Seq(Row(5 + 6 * 10000))
-    )
-  }
-
-  test("alter table close streaming") {
-    executeStreamingIngest(
-      tableName = "stream_table_close",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = false
-    )
-
-    val table1 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark)
-    assertResult(true)(table1.isStreamingTable)
-    sql("alter table streaming.stream_table_close compact 'close_streaming'")
-
-    val segments = sql("show segments for table streaming.stream_table_close").collect()
-    assertResult(6)(segments.length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
-    checkAnswer(
-      sql("select count(*) from streaming.stream_table_close"),
-      Seq(Row(6 * 10000))
-    )
-    val table2 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark)
-    assertResult(false)(table2.isStreamingTable)
-  }
-
-  test("alter table close streaming with auto handoff") {
-    executeStreamingIngest(
-      tableName = "stream_table_close_auto_handoff",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
-      intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
-      generateBadRecords = false,
-      badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = true
-    )
-    Thread.sleep(10000)
 
-    val table1 =
-      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark)
-    assertResult(true)(table1.isStreamingTable)
+    sql("alter table streaming.stream_table_handoff finish streaming")
+    val newSegments1 = sql("show segments for table streaming.stream_table_handoff").collect()
+    assertResult("Streaming Finish")(newSegments1((newSegments.length - 1) / 2).getString(1))
 
-    sql("alter table streaming.stream_table_close_auto_handoff compact 'close_streaming'")
-    val segments =
-      sql("show segments for table streaming.stream_table_close_auto_handoff").collect()
-    assertResult(6)(segments.length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
     checkAnswer(
-      sql("select count(*) from streaming.stream_table_close_auto_handoff"),
-      Seq(Row(6 * 10000))
+      sql("select count(*) from streaming.stream_table_handoff"),
+      Seq(Row(2 * 100))
     )
 
-    val table2 =
-      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark)
-    assertResult(false)(table2.isStreamingTable)
+    try {
+      sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')")
+      assert(false, "unsupport disable streaming properties")
+    } catch {
+      case _ =>
+        assert(true)
+    }
   }
 
-  test("reopen streaming table") {
+  test("auto hand off, close and reopen streaming table") {
     executeStreamingIngest(
       tableName = "stream_table_reopen",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
+      batchNums = 2,
+      rowNumsEachBatch = 100,
       intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200,
-      autoHandoff = true
+      handoffSize = 1L,
+      autoHandoff = false
     )
-    Thread.sleep(10000)
-
     val table1 =
       CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
     assertResult(true)(table1.isStreamingTable)
 
     sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
+
     val segments =
       sql("show segments for table streaming.stream_table_reopen").collect()
-    assertResult(6)(segments.length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
-    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
+    assert(segments.length == 4 || segments.length == 6)
+    assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Success")).length)
+    assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Compacted")).length)
+
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
-      Seq(Row(6 * 10000))
+      Seq(Row(2 * 100))
     )
 
     val table2 =
@@ -934,21 +599,34 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     executeStreamingIngest(
       tableName = "stream_table_reopen",
-      batchNums = 6,
-      rowNumsEachBatch = 10000,
+      batchNums = 2,
+      rowNumsEachBatch = 100,
       intervalOfSource = 5,
-      intervalOfIngest = 10,
-      continueSeconds = 40,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200,
+      handoffSize = 1L,
       autoHandoff = true
     )
     Thread.sleep(10000)
+    val newSegments1 =
+      sql("show segments for table streaming.stream_table_reopen").collect()
+    assert(newSegments1.length == 7 || newSegments1.length == 9 || newSegments1.length == 11)
+    assertResult(1)(newSegments1.filter(_.getString(1).equals("Streaming")).length)
+    assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Success")).length)
+    assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Compacted")).length)
+
+    sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
+    val newSegments =
+      sql("show segments for table streaming.stream_table_reopen").collect()
+    assert(newSegments.length == 8 || newSegments.length == 10 || newSegments.length == 12)
+    assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
+    assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
 
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
-      Seq(Row(6 * 10000 * 2))
+      Seq(Row(2 * 100 * 2))
     )
   }
 
@@ -960,7 +638,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     var server: ServerSocket = null
     try {
       server = getServerSocket
-      val thread1 = createWriteSocketThread(server, 2, 10, 5)
+      val thread1 = createWriteSocketThread(server, 2, 10, 3)
       val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
       thread1.start()
       thread2.start()
@@ -1021,30 +699,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         var index = 0
         for (_ <- 1 to writeNums) {
           // write 5 records per iteration
+          val stringBuilder = new StringBuilder()
           for (_ <- 1 to rowNums) {
             index = index + 1
             if (badRecords) {
               if (index == 2) {
                 // null value
-                socketWriter.println(index.toString + ",name_" + index
+                stringBuilder.append(index.toString + ",name_" + index
                                      + ",," + (10000.00 * index).toString +
                                      ",school_" + index + ":school_" + index + index + "$"
+ index)
               } else if (index == 6) {
                 // illegal number
-                socketWriter.println(index.toString + "abc,name_" + index
+                stringBuilder.append(index.toString + "abc,name_" + index
                                      + ",city_" + index + "," + (10000.00 * index).toString
+
                                      ",school_" + index + ":school_" + index + index + "$"
+ index)
               } else {
-                socketWriter.println(index.toString + ",name_" + index
+                stringBuilder.append(index.toString + ",name_" + index
                                      + ",city_" + index + "," + (10000.00 * index).toString
+
                                      ",school_" + index + ":school_" + index + index + "$"
+ index)
               }
             } else {
-              socketWriter.println(index.toString + ",name_" + index
+              stringBuilder.append(index.toString + ",name_" + index
                                    + ",city_" + index + "," + (10000.00 * index).toString
+
                                    ",school_" + index + ":school_" + index + index + "$"
+ index)
             }
+            stringBuilder.append("\n")
           }
+          socketWriter.append(stringBuilder.toString())
           socketWriter.flush()
           Thread.sleep(1000 * intervalSecond)
         }
@@ -1137,8 +818,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       thread2.start()
       Thread.sleep(continueSeconds * 1000)
-      thread1.interrupt()
       thread2.interrupt()
+      thread1.interrupt()
     } finally {
       if (null != server) {
         server.close()

