carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2261] Support Set segment command for Streaming Table
Date Wed, 21 Mar 2018 15:19:38 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 87de69729 -> 53200ccff


[CARBONDATA-2261] Support Set segment command for Streaming Table

Problem Statement: Currently, the Set Segment command is not supported for streaming segments.
This PR adds support for the Set Segment command on "Streaming" and "Streaming Finished" segments.

This closes #2075


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/53200ccf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/53200ccf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/53200ccf

Branch: refs/heads/branch-1.3
Commit: 53200ccffa65b11a7be4dbfa86794e4657126619
Parents: 87de697
Author: BJangir <babulaljangir111@gmail.com>
Authored: Sun Mar 18 23:00:30 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Mar 21 20:39:55 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      |  3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  4 +++
 .../TestStreamingTableOperation.scala           | 30 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f6624cd..2d6c03a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -364,6 +364,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       List<Segment> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
+      streamSegments = getFilteredSegment(job,streamSegments);
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
@@ -372,7 +373,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
       List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
       if (filteredSegmentToAccess.size() == 0) {
-        return new ArrayList<>(0);
+        return getSplitsOfStreaming(job, identifier, streamSegments);
       } else {
         setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 772f702..a62f60a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -130,6 +130,10 @@ class CarbonScanRDD(
       if (batchPartitions.isEmpty) {
         streamPartitions.toArray
       } else {
+        logInfo(
+          s"""
+             | Identified no.of Streaming Blocks: ${streamPartitions.size},
+          """.stripMargin)
         // should keep the order by index of partition
         batchPartitions.appendAll(streamPartitions)
         batchPartitions.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a7dfabd..53edb9d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -994,6 +994,36 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       sql("select count(*) from streaming.stream_table_handoff"),
       Seq(Row(2 * 100))
     )
+    try{
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 1")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(2 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 2")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(1 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(2 * 100))
+      )
+      sql("set carbon.input.segments.streaming.stream_table_handoff = 3")
+      checkAnswer(
+        sql("select count(*) from streaming.stream_table_handoff"),
+        Seq(Row(0))
+      )
+    }
+    finally {
+      sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+    }
 
     try {
       sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')")


Mime
View raw message