carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2261] Support Set segment command for Streaming Table
Date Wed, 21 Mar 2018 14:54:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 4b113b865 -> 65b69a9cd


[CARBONDATA-2261] Support Set segment command for Streaming Table

Problem Statement: Currently Set Segment is not supported for Streaming segments.
This PR is to support Set segment command for "Streaming " ,"Streaming Finished" segments

This closes #2075


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/65b69a9c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/65b69a9c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/65b69a9c

Branch: refs/heads/master
Commit: 65b69a9cd902168ffec16cda9d84457d08bae22c
Parents: 4b113b8
Author: BJangir <babulaljangir111@gmail.com>
Authored: Sun Mar 18 23:00:30 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Mar 21 20:27:16 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      |  3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  4 +++
 .../TestStreamingTableOperation.scala           | 30 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bcc487e..11121e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -358,6 +358,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       List<Segment> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
+      streamSegments = getFilteredSegment(job,streamSegments);
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
@@ -366,7 +367,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void,
T> {
 
       List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
       if (filteredSegmentToAccess.size() == 0) {
-        return new ArrayList<>(0);
+        return getSplitsOfStreaming(job, identifier, streamSegments);
       } else {
         setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49a8023..b5c9543 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -130,6 +130,10 @@ class CarbonScanRDD(
       if (batchPartitions.isEmpty) {
         streamPartitions.toArray
       } else {
+        logInfo(
+          s"""
+             | Identified no.of Streaming Blocks: ${streamPartitions.size},
+          """.stripMargin)
         // should keep the order by index of partition
         batchPartitions.appendAll(streamPartitions)
         batchPartitions.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index aa00d07..00e1140 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -997,6 +997,36 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll
{
       sql("select count(*) from streaming.stream_table_handoff"),
       Seq(Row(2 * 100))
     )
+    try{
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 1")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(2 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 2")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(1 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(2 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 3")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(0))
+      )
+    }
+    finally {
+      sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+    }
   }
 
   test("auto hand off, close and reopen streaming table") {


Mime
View raw message