carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1818] Make carbon.streaming.segment.max.size as configurable
Date Tue, 28 Nov 2017 08:17:35 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 4c4814854 -> 0f407de8c


[CARBONDATA-1818] Make carbon.streaming.segment.max.size as configurable

Make carbon.streaming.segment.max.size as configurable

This closes #1577


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0f407de8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0f407de8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0f407de8

Branch: refs/heads/master
Commit: 0f407de8c95ac908a877c9830408c049da8705b9
Parents: 4c48148
Author: QiangCai <qiangcai@qq.com>
Authored: Tue Nov 28 10:24:48 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Nov 28 16:17:23 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 16 ++++++++
 .../carbondata/core/util/CarbonProperties.java  | 41 ++++++++++++++++++++
 .../core/CarbonPropertiesValidationTest.java    |  7 ++++
 .../streaming/CarbonStreamOutputFormat.java     | 16 --------
 .../TestStreamingTableOperation.scala           |  7 ++--
 .../streaming/StreamSinkFactory.scala           | 10 ++---
 .../streaming/CarbonAppendableStreamSink.scala  |  7 +++-
 7 files changed, 78 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4046538..fc20e6d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1412,6 +1412,22 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SKIP_EMPTY_LINE_DEFAULT = "false";
 
+  /**
+   * if the byte size of streaming segment reach this value,
+   * the system will create a new stream segment
+   */
+  public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size";
+
+  /**
+   * the min handoff size of streaming segment, the unit is byte
+   */
+  public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+  /**
+   * the default handoff size of streaming segment, the unit is byte
+   */
+  public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 436950b..daad410 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -105,6 +105,7 @@ public final class CarbonProperties {
     validateEnableVectorReader();
     validateLockType();
     validateCarbonCSVReadBufferSizeByte();
+    validateHandoffSize();
   }
 
   private void validateCarbonCSVReadBufferSizeByte() {
@@ -257,6 +258,31 @@ public final class CarbonProperties {
     }
   }
 
+  private void validateHandoffSize() {
+    String handoffSizeStr = carbonProperties.getProperty(CarbonCommonConstants.HANDOFF_SIZE);
+    if (null == handoffSizeStr || handoffSizeStr.length() == 0) {
+      carbonProperties.setProperty(CarbonCommonConstants.HANDOFF_SIZE,
+          "" + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT);
+    } else {
+      try {
+        long handoffSize = Long.parseLong(handoffSizeStr);
+        if (handoffSize < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
+          LOGGER.info("The streaming segment max size configured value " + handoffSizeStr
+              + " is invalid. Using the default value "
+              + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT);
+          carbonProperties.setProperty(CarbonCommonConstants.HANDOFF_SIZE,
+              "" + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT);
+        }
+      } catch (NumberFormatException e) {
+        LOGGER.info("The streaming segment max size value \"" + handoffSizeStr
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT + "\"");
+        carbonProperties.setProperty(CarbonCommonConstants.HANDOFF_SIZE,
+            "" + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT);
+      }
+    }
+  }
+
   /**
    * This method validates the number of pages per blocklet column
    */
@@ -735,6 +761,21 @@ public final class CarbonProperties {
     return batchSize;
   }
 
+  public long getHandoffSize() {
+    Long handoffSize;
+    try {
+      handoffSize = Long.parseLong(
+          CarbonProperties.getInstance().getProperty(
+              CarbonCommonConstants.HANDOFF_SIZE,
+              "" + CarbonCommonConstants.HANDOFF_SIZE_DEFAULT
+          )
+      );
+    } catch (NumberFormatException exc) {
+      handoffSize = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT;
+    }
+    return handoffSize;
+  }
+
   /**
    * Validate the restrictions
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
index 10b93ed..1e4569d 100644
--- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -145,4 +145,11 @@ public class CarbonPropertiesValidationTest extends TestCase {
     assertTrue(
         CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
   }
+
+  @Test public void testValidateHandoffSize() {
+    assertEquals(CarbonCommonConstants.HANDOFF_SIZE_DEFAULT, carbonProperties.getHandoffSize());
+    long newSize = 1024L * 1024 * 100;
+    carbonProperties.addProperty(CarbonCommonConstants.HANDOFF_SIZE, "" + newSize);
+    assertEquals(newSize, carbonProperties.getHandoffSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
index 8497d2b..2599fa7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -53,22 +53,6 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
 
   private static final String SEGMENT_ID = "carbon.segment.id";
 
-  /**
-   * if the byte size of streaming segment reach this value,
-   * the system will create a new stream segment
-   */
-  public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size";
-
-  /**
-   * the min handoff size of streaming segment, the unit is byte
-   */
-  public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
-
-  /**
-   * the default handoff size of streaming segment, the unit is byte
-   */
-  public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
-
   @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
       throws IOException, InterruptedException {
     return new CarbonStreamRecordWriter(job);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index add4fca..cb641e1 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
@@ -759,7 +760,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
-      handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT): Thread = {
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT): Thread = {
     new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -779,7 +780,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
-            .option(CarbonStreamOutputFormat.HANDOFF_SIZE, handoffSize)
+            .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
             .start()
           qry.awaitTermination()
         } catch {
@@ -806,7 +807,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds: Int,
       generateBadRecords: Boolean,
       badRecordAction: String,
-      handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT
   ): Unit = {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index ab7e059..3df78b3 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -79,18 +79,18 @@ object StreamSinkFactory {
   }
 
   private def validateParameters(parameters: Map[String, String]): Unit = {
-    val segmentSize = parameters.get(CarbonStreamOutputFormat.HANDOFF_SIZE)
+    val segmentSize = parameters.get(CarbonCommonConstants.HANDOFF_SIZE)
     if (segmentSize.isDefined) {
       try {
         val value = java.lang.Long.parseLong(segmentSize.get)
-        if (value < CarbonStreamOutputFormat.HANDOFF_SIZE_MIN) {
-          new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+        if (value < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
+          new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
                                     "should be bigger than or equal " +
-                                    CarbonStreamOutputFormat.HANDOFF_SIZE_MIN)
+                                    CarbonCommonConstants.HANDOFF_SIZE_MIN)
         }
       } catch {
         case ex: NumberFormatException =>
-          new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+          new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
                                     s" $segmentSize is an illegal number")
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0f407de8/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index b5517cb..064ba37 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -35,10 +35,12 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -74,8 +76,9 @@ class CarbonAppendableStreamSink(
   }
   // segment max size(byte)
   private val segmentMaxSize = hadoopConf.getLong(
-    CarbonStreamOutputFormat.HANDOFF_SIZE,
-    CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT)
+    CarbonCommonConstants.HANDOFF_SIZE,
+    CarbonProperties.getInstance().getHandoffSize
+  )
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {


Mime
View raw message