carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1288][DictionarySecureServer] Dictionary Secure Server Implementation. During single pass load Dictionary key creation is done through driver and executor communication through external ports. The communication wi
Date Mon, 18 Dec 2017 03:50:35 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 91e6f6f43 -> 4daf0634e


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 58671b7..2afd040 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -32,7 +32,8 @@ import org.apache.spark.util.{CausedBy, FileUtils}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
+import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -47,6 +48,8 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
+import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -163,6 +166,18 @@ case class CarbonLoadDataCommand(
         if (carbonLoadModel.isAggLoadRequest) {
           carbonLoadModel.setUseOnePass(false)
         }
+
+        // start dictionary server when use one pass load and dimension with DICTIONARY
+        // encoding is present.
+        val allDimensions =
+        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+        val createDictionary = allDimensions.exists {
+          carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+        }
+        if (!createDictionary) {
+          carbonLoadModel.setUseOnePass(false)
+        }
         // Create table and metadata folders if not exist
         val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
         val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
@@ -283,18 +298,25 @@ case class CarbonLoadDataCommand(
     val sparkDriverHost = sparkSession.sqlContext.sparkContext.
       getConf.get("spark.driver.host")
     carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-    // start dictionary server when use one pass load and dimension with DICTIONARY
-    // encoding is present.
-    val allDimensions =
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
-    val createDictionary = allDimensions.exists {
-      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-    }
-    val server: Option[DictionaryServer] = if (createDictionary) {
-      val dictionaryServer = DictionaryServer
-        .getInstance(dictionaryServerPort.toInt, carbonTable)
+
+    val carbonSecureModeDictServer = CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER,
+      CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT)
+
+    val sparkConf = sparkSession.sqlContext.sparkContext.getConf
+    // For testing.
+    // sparkConf.set("spark.authenticate", "true")
+    // sparkConf.set("spark.authenticate.secret", "secret")
+
+    val server: Option[DictionaryServer] = if (sparkConf.get("spark.authenticate", "false").
+      equalsIgnoreCase("true") && carbonSecureModeDictServer.toBoolean) {
+      val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf,
+        sparkDriverHost.toString, dictionaryServerPort.toInt, carbonTable)
       carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+      carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey)
+      carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer)
+      carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider())
       sparkSession.sparkContext.addSparkListener(new SparkListener() {
         override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
           dictionaryServer.shutdown()
@@ -302,7 +324,20 @@ case class CarbonLoadDataCommand(
       })
       Some(dictionaryServer)
     } else {
-      None
+      val dictionaryServer = NonSecureDictionaryServer
+        .getInstance(dictionaryServerPort.toInt, carbonTable)
+      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+      carbonLoadModel.setDictionaryEncryptServerSecure(false)
+      carbonLoadModel
+        .setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(dictionaryServer
+          .getPort))
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
     }
     val loadDataFrame = if (updateModel.isDefined) {
        Some(getDataFrameWithTupleID())

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7309c91..19c8f03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -64,6 +65,21 @@ public class CarbonDataLoadConfiguration {
    */
   private int dictionaryServerPort;
 
+  /**
+   * dictionary server secret key
+   */
+  private String dictionaryServerSecretKey;
+
+  /**
+   * Dictionary Service Provider.
+   */
+  private DictionaryServiceProvider dictionaryServiceProvider;
+
+  /**
+   * Secure Mode or not.
+   */
+  private Boolean dictionaryEncryptServerSecure;
+
   private boolean preFetch;
 
   private int dimensionCount;
@@ -232,6 +248,30 @@ public class CarbonDataLoadConfiguration {
     this.dictionaryServerPort = dictionaryServerPort;
   }
 
+  public String getDictionaryServerSecretKey() {
+    return dictionaryServerSecretKey;
+  }
+
+  public void setDictionaryServerSecretKey(String dictionaryServerSecretKey) {
+    this.dictionaryServerSecretKey = dictionaryServerSecretKey;
+  }
+
+  public DictionaryServiceProvider getDictionaryServiceProvider() {
+    return dictionaryServiceProvider;
+  }
+
+  public void setDictionaryServiceProvider(DictionaryServiceProvider dictionaryServiceProvider)
{
+    this.dictionaryServiceProvider = dictionaryServiceProvider;
+  }
+
+  public Boolean getDictionaryEncryptServerSecure() {
+    return dictionaryEncryptServerSecure;
+  }
+
+  public void setDictionaryEncryptServerSecure(Boolean dictionaryEncryptServerSecure) {
+    this.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
+  }
+
   public boolean isPreFetch() {
     return preFetch;
   }
@@ -310,4 +350,6 @@ public class CarbonDataLoadConfiguration {
   public void setTableSpec(TableSpec tableSpec) {
     this.tableSpec = tableSpec;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index ab857e4..787cb7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -218,6 +218,9 @@ public final class DataLoadProcessBuilder {
     configuration.setUseOnePass(loadModel.getUseOnePass());
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
     configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+    configuration.setDictionaryServerSecretKey(loadModel.getDictionaryServerSecretKey());
+    configuration.setDictionaryEncryptServerSecure(loadModel.getDictionaryEncryptServerSecure());
+    configuration.setDictionaryServiceProvider(loadModel.getDictionaryServiceProvider());
     configuration.setPreFetch(loadModel.isPreFetch());
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 16c4a22..83245a3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.service.DictionaryOnePassService;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
@@ -119,16 +120,22 @@ public class RowConverterImpl implements RowConverter {
             "DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
                 .getTableName()));
       }
-      Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>()
{
-        @Override
-        public DictionaryClient call() throws Exception {
-          Thread.currentThread().setName("Dictionary client");
-          DictionaryClient dictionaryClient = new DictionaryClient();
-          dictionaryClient.startClient(configuration.getDictionaryServerHost(),
-              configuration.getDictionaryServerPort());
-          return dictionaryClient;
-        }
-      });
+      DictionaryOnePassService
+          .setDictionaryServiceProvider(configuration.getDictionaryServiceProvider());
+
+      Future<DictionaryClient> result =
+          executorService.submit(new Callable<DictionaryClient>() {
+            @Override public DictionaryClient call() throws Exception {
+              Thread.currentThread().setName("Dictionary client");
+              DictionaryClient client =
+                  DictionaryOnePassService.getDictionayProvider().getDictionaryClient();
+              client.startClient(configuration.getDictionaryServerSecretKey(),
+                  configuration.getDictionaryServerHost(), configuration.getDictionaryServerPort(),
+                  configuration.getDictionaryEncryptServerSecure());
+              return client;
+            }
+          });
+
 
       try {
         // wait for client initialization finished, or will raise null pointer exception

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 7b952e3..3031b8e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -21,10 +21,12 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 
+
 public class CarbonLoadModel implements Serializable {
 
   private static final long serialVersionUID = 6580168429197697465L;
@@ -147,6 +149,20 @@ public class CarbonLoadModel implements Serializable {
   private int dictionaryServerPort;
 
   /**
+   * dictionary server communication Secret Key.
+   */
+  private String dictionaryServerSecretKey;
+
+  /**
+   * dictionary service provider.
+   */
+  private DictionaryServiceProvider dictionaryServiceProvider;
+
+  /**
+   * Dictionary Secure or not.
+   */
+  private Boolean dictionaryEncryptServerSecure;
+  /**
    * Pre fetch data from csv reader
    */
   private boolean preFetch;
@@ -330,6 +346,15 @@ public class CarbonLoadModel implements Serializable {
     this.colDictFilePath = colDictFilePath;
   }
 
+
+  public DictionaryServiceProvider getDictionaryServiceProvider() {
+    return dictionaryServiceProvider;
+  }
+
+  public void setDictionaryServiceProvider(DictionaryServiceProvider dictionaryServiceProvider)
{
+    this.dictionaryServiceProvider = dictionaryServiceProvider;
+  }
+
   /**
    * get copy with partition
    *
@@ -366,6 +391,9 @@ public class CarbonLoadModel implements Serializable {
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
+    copy.dictionaryServerSecretKey = dictionaryServerSecretKey;
+    copy.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
+    copy.dictionaryServiceProvider = dictionaryServiceProvider;
     copy.preFetch = preFetch;
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copy.skipEmptyLine = skipEmptyLine;
@@ -416,6 +444,9 @@ public class CarbonLoadModel implements Serializable {
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
+    copy.dictionaryServerSecretKey = dictionaryServerSecretKey;
+    copy.dictionaryServiceProvider = dictionaryServiceProvider;
+    copy.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
     copy.preFetch = preFetch;
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copy.skipEmptyLine = skipEmptyLine;
@@ -468,6 +499,9 @@ public class CarbonLoadModel implements Serializable {
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
+    copyObj.dictionaryServerSecretKey = dictionaryServerSecretKey;
+    copyObj.dictionaryServiceProvider = dictionaryServiceProvider;
+    copyObj.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
     copyObj.preFetch = preFetch;
     copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copyObj.skipEmptyLine = skipEmptyLine;
@@ -722,6 +756,22 @@ public class CarbonLoadModel implements Serializable {
     this.dictionaryServerPort = dictionaryServerPort;
   }
 
+  public String getDictionaryServerSecretKey() {
+    return dictionaryServerSecretKey;
+  }
+
+  public void setDictionaryServerSecretKey(String dictionaryServerSecretKey) {
+    this.dictionaryServerSecretKey = dictionaryServerSecretKey;
+  }
+
+  public Boolean getDictionaryEncryptServerSecure() {
+    return dictionaryEncryptServerSecure;
+  }
+
+  public void setDictionaryEncryptServerSecure(Boolean dictionaryEncryptServerSecure) {
+    this.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
+  }
+
   public String getDictionaryServerHost() {
     return dictionaryServerHost;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 3df78b3..02a076e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -26,13 +26,16 @@ import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink,
Sin
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
+import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
+import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.util.DataLoadingUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -61,10 +64,9 @@ object StreamSinkFactory {
     val server = startDictionaryServer(
       sparkSession,
       carbonTable,
-      carbonLoadModel.getDictionaryServerPort)
+      carbonLoadModel)
     if (server.isDefined) {
       carbonLoadModel.setUseOnePass(true)
-      carbonLoadModel.setDictionaryServerPort(server.get.getPort)
     } else {
       carbonLoadModel.setUseOnePass(false)
     }
@@ -118,7 +120,7 @@ object StreamSinkFactory {
   def startDictionaryServer(
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
-      port: Int): Option[DictionaryServer] = {
+      carbonLoadModel: CarbonLoadModel): Option[DictionaryServer] = {
     // start dictionary server when use one pass load and dimension with DICTIONARY
     // encoding is present.
     val allDimensions = carbonTable.getAllDimensions.asScala.toList
@@ -126,14 +128,46 @@ object StreamSinkFactory {
       carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
                          !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
     }
+    val carbonSecureModeDictServer = CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER,
+        CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT)
+
+    val sparkConf = sparkSession.sqlContext.sparkContext.getConf
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+
     val server: Option[DictionaryServer] = if (createDictionary) {
-      val dictionaryServer = DictionaryServer.getInstance(port, carbonTable)
-      sparkSession.sparkContext.addSparkListener(new SparkListener() {
-        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-          dictionaryServer.shutdown()
-        }
-      })
-      Some(dictionaryServer)
+      if (sparkConf.get("spark.authenticate", "false").equalsIgnoreCase("true") &&
+          carbonSecureModeDictServer.toBoolean) {
+        val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf,
+          sparkDriverHost.toString, carbonLoadModel.getDictionaryServerPort, carbonTable)
+        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+        carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey)
+        carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer)
+        carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider())
+        sparkSession.sparkContext.addSparkListener(new SparkListener() {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+            dictionaryServer.shutdown()
+          }
+        })
+        Some(dictionaryServer)
+      } else {
+        val dictionaryServer = NonSecureDictionaryServer
+          .getInstance(carbonLoadModel.getDictionaryServerPort, carbonTable)
+        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+        carbonLoadModel.setDictionaryEncryptServerSecure(false)
+        carbonLoadModel
+          .setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(dictionaryServer
+            .getPort))
+        sparkSession.sparkContext.addSparkListener(new SparkListener() {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+            dictionaryServer.shutdown()
+          }
+        })
+        Some(dictionaryServer)
+      }
     } else {
       None
     }


Mime
View raw message