carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [3/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (Make API changes in carbon to be compatible with spark 2.3)
Date: Wed, 05 Sep 2018 12:40:11 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b14e328..d55c21a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,66 @@
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+              </sourceDirectories>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <properties>
+        <spark.version>2.3.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <modules>
+        <module>integration/spark2</module>
+        <module>integration/hive</module>
+        <module>integration/presto</module>
+        <module>streaming</module>
+        <module>examples/spark2</module>
+        <module>datamap/lucene</module>
+        <module>datamap/bloom</module>
+      </modules>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.eluder.coveralls</groupId>
+            <artifactId>coveralls-maven-plugin</artifactId>
+            <version>4.3.0</version>
+            <configuration>
+              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+              <sourceEncoding>UTF-8</sourceEncoding>
+              <jacocoReports>
+                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+                </jacocoReport>
+              </jacocoReports>
+              <sourceDirectories>
+                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>

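For reference, the new spark-2.3 profile added above is opt-in and would typically be activated explicitly at build time; a plausible invocation (the -DskipTests flag is illustrative and not mandated by this commit):

    mvn clean package -Pspark-2.3 -DskipTests
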
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index b7630fb..97951ea 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
           do {
             try {
               LOG.info(s"starting registry-service on $hostAddress:$port")
-              val config = RpcEnvConfig(
+              val config = RpcUtil.getRpcEnvConfig(
                 sparkConf, "registry-service", hostAddress, "", port,
                 new SecurityManager(sparkConf), clientMode = false)
               rpcEnv = new NettyRpcEnvFactory().create(config)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala b/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
new file mode 100644
index 0000000..f15bb8f
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
+import org.apache.spark.util.Utils
+
+object RpcUtil {
+
+  def getRpcEnvConfig(conf: SparkConf,
+      name: String,
+      bindAddress: String,
+      advertiseAddress: String,
+      port: Int,
+      securityManager: SecurityManager,
+      clientMode: Boolean): RpcEnvConfig = {
+    val className = "org.apache.spark.rpc.RpcEnvConfig"
+    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
+      createObject(className, conf, name, bindAddress,
+        advertiseAddress, port.asInstanceOf[Object],
+        securityManager, clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
+    } else if (SPARK_VERSION.startsWith("2.3")) {
+      // If numUsableCores is 0, Spark uses the number of CPUs available on the host.
+      val numUsableCores: Int = 0
+      createObject(className, conf, name, bindAddress,
+        advertiseAddress, port.asInstanceOf[Object],
+        securityManager, numUsableCores.asInstanceOf[Object],
+        clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
+    val clazz = Utils.classForName(className)
+    val ctor = clazz.getConstructors.head
+    ctor.setAccessible(true)
+    (ctor.newInstance(conArgs: _*), clazz)
+  }
+
+}

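The indirection above exists because the constructor of org.apache.spark.rpc.RpcEnvConfig gained a numUsableCores parameter in Spark 2.3 (as the two branches show), so a single compile-time call site cannot target both 2.2 and 2.3; dispatching on SPARK_VERSION and invoking the constructor reflectively sidesteps the signature change. A minimal, self-contained Scala sketch of the same pattern follows (DemoConfig is an illustrative stand-in, not a Spark class, and is assumed to sit in the default package so Class.forName can resolve it by its simple name):

    // Sketch of the reflective-construction pattern used in RpcUtil above.
    case class DemoConfig(name: String, port: Int, clientMode: Boolean)

    object ReflectDemo {
      def main(args: Array[String]): Unit = {
        val clazz = Class.forName("DemoConfig")
        // A case class has a single public primary constructor.
        val ctor = clazz.getConstructors.head
        // Reflective varargs take java.lang.Object, so primitives must be boxed;
        // this is why RpcUtil casts port and clientMode via asInstanceOf[Object].
        val cfg = ctor.newInstance("demo", Int.box(9000), Boolean.box(false))
        println(cfg) // prints: DemoConfig(demo,9000,false)
      }
    }
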
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
index 0f2138a..08baeeb 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
@@ -60,7 +60,7 @@ object Worker {
         do {
           try {
             LOG.info(s"starting search-service on $hostAddress:$port")
-            val config = RpcEnvConfig(
+            val config = RpcUtil.getRpcEnvConfig(
               conf, s"worker-$hostAddress", hostAddress, "", port,
               new SecurityManager(conf), clientMode = false)
             rpcEnv = new NettyRpcEnvFactory().create(config)
@@ -89,7 +89,7 @@ object Worker {
   private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
     LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
     val conf = new SparkConf()
-    val config = RpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
+    val config = RpcUtil.getRpcEnvConfig(conf, "registry-client", masterHostAddress, "",
+      masterPort,
       new SecurityManager(conf), clientMode = true)
     val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index c0e270c..835b115 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -64,7 +64,7 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
     this.inputMetricsStats = inputMetricsStats;
   }
 
-  public void setVectorReader(boolean vectorReader) {
+  public void setIsVectorReader(boolean vectorReader) {
     isVectorReader = vectorReader;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
deleted file mode 100644
index a224446..0000000
--- a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.statusmanager.FileFormat;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamInputFormatTest extends TestCase {
-
-  private TaskAttemptID taskAttemptId;
-  private TaskAttemptContext taskAttemptContext;
-  private Configuration hadoopConf;
-  private AbsoluteTableIdentifier identifier;
-  private String tablePath;
-
-
-  @Override protected void setUp() throws Exception {
-    tablePath = new File("target/stream_input").getCanonicalPath();
-    String dbName = "default";
-    String tableName = "stream_table_input";
-    identifier = AbsoluteTableIdentifier.from(
-        tablePath,
-        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
-    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-    taskAttemptId = new TaskAttemptID(taskId, 0);
-
-    hadoopConf = new Configuration();
-    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
-  }
-
-  private InputSplit buildInputSplit() throws IOException {
-    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
-    List<CarbonInputSplit> splitList = new ArrayList<>();
-    splitList.add(carbonInputSplit);
-    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
-        FileFormat.ROW_V1);
-  }
-
-  @Test public void testCreateRecordReader() {
-    try {
-      InputSplit inputSplit = buildInputSplit();
-      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
-      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
-      Assert.assertNotNull("Failed to create record reader", recordReader);
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue(e.getMessage(), false);
-    }
-  }
-
-  @Override protected void tearDown() throws Exception {
-    super.tearDown();
-    if (tablePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(tablePath));
-    }
-  }
-}

