carbondata-commits mailing list archives

From gvram...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-1288][DictionarySecureServer] Dictionary Secure Server Implementation. During single pass load Dictionary key creation is done through driver and executor communication through external ports. The communication wi
Date Mon, 18 Dec 2017 03:50:36 GMT
[CARBONDATA-1288][DictionarySecureServer] Dictionary Secure Server Implementation.
During single-pass load, dictionary keys are created through communication between the driver and the executors over external ports.
This communication runs over TCP, which is neither authenticated nor encrypted by default.
If Spark's security parameters are turned on, the Carbon dictionary ports follow the same authentication and encryption,
i.e. they communicate using the SASL authentication protocol with DIGEST-MD5 encryption.
This PR makes the communication between the dictionary server and client secure and encrypted.
When Spark security and authentication are turned on through the parameters below, Carbon communication also becomes secure.
By default, the communication remains non-secure.

Parameters to set in spark-defaults.conf to turn on secure-mode dictionary server communication:
spark.authenticate true
spark.authenticate.enableSaslEncryption true
spark.authenticate.secret

Note: Turning on these parameters also turns on authentication and encryption in Spark itself.
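
As a rough illustration of how the parameters above could gate secure mode, the sketch below reads them from a plain `java.util.Properties` object. The class `SecureModeCheck` and its methods are hypothetical and not part of this commit. It is an assumption here that authentication follows `spark.authenticate` and that encryption additionally requires `spark.authenticate.enableSaslEncryption`; the commit itself reads these values from the Spark configuration.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of the commit. */
final class SecureModeCheck {

  private SecureModeCheck() { }

  /** True when spark.authenticate is "true" (case-insensitive); absent means non-secure. */
  static boolean authenticationEnabled(Properties conf) {
    return Boolean.parseBoolean(conf.getProperty("spark.authenticate", "false"));
  }

  /** Assumption: encryption only applies when authentication is also on. */
  static boolean encryptionEnabled(Properties conf) {
    return authenticationEnabled(conf)
        && Boolean.parseBoolean(
            conf.getProperty("spark.authenticate.enableSaslEncryption", "false"));
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("spark.authenticate", "true");
    conf.setProperty("spark.authenticate.enableSaslEncryption", "true");
    System.out.println("authentication: " + authenticationEnabled(conf));
    System.out.println("encryption: " + encryptionEnabled(conf));
  }
}
```

With an empty `Properties` object both methods return false, which mirrors the non-secure default described in the commit message.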

This Closes #1662


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4daf0634
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4daf0634
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4daf0634

Branch: refs/heads/master
Commit: 4daf0634e6338c627b01188a96b6d5c539308863
Parents: 91e6f6f
Author: sounak <sounak.chakraborty@huawei.com>
Authored: Thu Jul 6 20:48:54 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Mon Dec 18 09:18:41 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../dictionary/client/DictionaryClient.java     |  76 +------
 .../client/DictionaryClientHandler.java         | 126 -----------
 .../client/NonSecureDictionaryClient.java       |  97 +++++++++
 .../NonSecureDictionaryClientHandler.java       | 126 +++++++++++
 .../generator/key/DictionaryMessage.java        |  28 ++-
 .../dictionary/server/DictionaryServer.java     | 142 ++-----------
 .../server/DictionaryServerHandler.java         | 109 ----------
 .../server/NonSecureDictionaryServer.java       | 180 ++++++++++++++++
 .../NonSecureDictionaryServerHandler.java       | 112 ++++++++++
 .../service/AbstractDictionaryServer.java       |  89 ++++++++
 .../service/DictionaryOnePassService.java       |  30 +++
 .../service/DictionaryServiceProvider.java      |  27 +++
 .../NonSecureDictionaryServiceProvider.java     |  42 ++++
 .../dictionary/client/DictionaryClientTest.java |   9 +-
 .../examples/CarbonSessionExample.scala         |  30 +--
 .../client/SecureDictionaryClient.java          | 116 ++++++++++
 .../client/SecureDictionaryClientHandler.java   | 104 +++++++++
 .../SecureDictionaryServiceProvider.java        |  34 +++
 .../server/SecureDictionaryServer.java          | 211 +++++++++++++++++++
 .../server/SecureDictionaryServerHandler.java   |  98 +++++++++
 .../execution/command/carbonTableSchema.scala   |   0
 .../execution/command/carbonTableSchema.scala   |   0
 .../management/CarbonLoadDataCommand.scala      |  61 ++++--
 .../loading/CarbonDataLoadConfiguration.java    |  42 ++++
 .../loading/DataLoadProcessBuilder.java         |   3 +
 .../converter/impl/RowConverterImpl.java        |  27 ++-
 .../loading/model/CarbonLoadModel.java          |  50 +++++
 .../streaming/StreamSinkFactory.scala           |  56 ++++-
 29 files changed, 1542 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 959f4d4..f67b0c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1313,6 +1313,12 @@ public final class CarbonCommonConstants {
   public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL =
       "carbon.lease.recovery.retry.interval";
 
+  @CarbonProperty
+  public static final String CARBON_SECURE_DICTIONARY_SERVER =
+      "carbon.secure.dictionary.server";
+
+  public static final String CARBON_SECURE_DICTIONARY_SERVER_DEFAULT = "true";
+
   /**
    * whether to use multi directories when loading data,
    * the main purpose is to avoid single-disk-hot-spot

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
index 7910190..64e6252 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
@@ -14,82 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.core.dictionary.client;
 
-import java.net.InetSocketAddress;
+package org.apache.carbondata.core.dictionary.client;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-/**
- * Dictionary client to connect to Dictionary server and generate dictionary values
- */
-public class DictionaryClient {
-
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(DictionaryClient.class.getName());
-
-  private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
-
-  private NioEventLoopGroup workerGroup;
+public interface DictionaryClient {
 
-  /**
-   * start dictionary client
-   *
-   * @param address
-   * @param port
-   */
-  public void startClient(String address, int port) {
-    LOGGER.audit("Starting client on " + address + " " + port);
-    long start = System.currentTimeMillis();
-    // Create an Event with 1 thread.
-    workerGroup = new NioEventLoopGroup(1);
-    Bootstrap clientBootstrap = new Bootstrap();
-    clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
-        .handler(new ChannelInitializer<SocketChannel>() {
-          @Override public void initChannel(SocketChannel ch) throws Exception {
-            ChannelPipeline pipeline = ch.pipeline();
-            // Based on length provided at header, it collects all packets
-            pipeline
-                .addLast("LengthDecoder",
-                    new LengthFieldBasedFrameDecoder(1048576, 0,
-                        2, 0, 2));
-            pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
-          }
-        });
-    clientBootstrap.connect(new InetSocketAddress(address, port));
-    LOGGER.info(
-        "Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
-  }
+  public void startClient(String secretKey, String address, int port, boolean encryptSecureServer);
 
-  /**
-   * for client request
-   *
-   * @param key
-   * @return
-   */
-  public DictionaryMessage getDictionary(DictionaryMessage key) {
-    return dictionaryClientHandler.getDictionary(key);
-  }
+  public void shutDown();
 
-  /**
-   * shutdown dictionary client
-   */
-  public void shutDown() {
-    workerGroup.shutdownGracefully();
-    try {
-      workerGroup.terminationFuture().sync();
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-    }
-  }
+  public DictionaryMessage getDictionary(DictionaryMessage key);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
deleted file mode 100644
index 01ef59a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.client;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
-/**
- * Client handler to get data.
- */
-public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
-
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
-
-  private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();
-
-  private ChannelHandlerContext ctx;
-
-  private DictionaryChannelFutureListener channelFutureListener;
-
-  @Override
-  public void channelActive(ChannelHandlerContext ctx) throws Exception {
-    this.ctx = ctx;
-    channelFutureListener = new DictionaryChannelFutureListener(ctx);
-    LOGGER.audit("Connected client " + ctx);
-    super.channelActive(ctx);
-  }
-
-  @Override
-  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-    try {
-      ByteBuf data = (ByteBuf) msg;
-      DictionaryMessage key = new DictionaryMessage();
-      key.readData(data);
-      data.release();
-      responseMsgQueue.add(key);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw e;
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    LOGGER.error(cause, "exceptionCaught");
-    ctx.close();
-  }
-
-  /**
-   * client send request to server
-   *
-   * @param key DictionaryMessage
-   * @return DictionaryMessage
-   */
-  public DictionaryMessage getDictionary(DictionaryMessage key) {
-    DictionaryMessage dictionaryMessage;
-    try {
-      ByteBuf buffer = ctx.alloc().buffer();
-      key.writeData(buffer);
-      ctx.writeAndFlush(buffer).addListener(channelFutureListener);
-    } catch (Exception e) {
-      LOGGER.error(e, "Error while send request to server ");
-      ctx.close();
-    }
-    try {
-      dictionaryMessage = responseMsgQueue.poll(100, TimeUnit.SECONDS);
-      if (dictionaryMessage == null) {
-        StringBuilder message = new StringBuilder();
-        message.append("DictionaryMessage { ColumnName: ")
-            .append(key.getColumnName())
-            .append(", DictionaryValue: ")
-            .append(key.getDictionaryValue())
-            .append(", type: ")
-            .append(key.getType());
-        throw new RuntimeException("Request timed out for key : " + message);
-      }
-      return dictionaryMessage;
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static class DictionaryChannelFutureListener implements ChannelFutureListener {
-
-    private ChannelHandlerContext ctx;
-
-    DictionaryChannelFutureListener(ChannelHandlerContext ctx) {
-      this.ctx = ctx;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (!future.isSuccess()) {
-        LOGGER.error(future.cause(), "Error while sending request to Dictionary Server");
-        ctx.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
new file mode 100644
index 0000000..cf25ee1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.client;
+
+import java.net.InetSocketAddress;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * Dictionary client to connect to Dictionary server and generate dictionary values
+ */
+public class NonSecureDictionaryClient implements DictionaryClient {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(NonSecureDictionaryClient.class.getName());
+
+  private NonSecureDictionaryClientHandler nonSecureDictionaryClientHandler =
+      new NonSecureDictionaryClientHandler();
+
+  private NioEventLoopGroup workerGroup;
+
+  /**
+   * start dictionary client
+   *
+   * @param address
+   * @param port
+   */
+  @Override public void startClient(String secretKey, String address, int port,
+      boolean encryptSecureServer) {
+    LOGGER.audit("Starting client on " + address + " " + port);
+    long start = System.currentTimeMillis();
+    // Create an Event with 1 thread.
+    workerGroup = new NioEventLoopGroup(1);
+    Bootstrap clientBootstrap = new Bootstrap();
+    clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
+        .handler(new ChannelInitializer<SocketChannel>() {
+          @Override public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            // Based on length provided at header, it collects all packets
+            pipeline
+                .addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2));
+            pipeline.addLast("NonSecureDictionaryClientHandler", nonSecureDictionaryClientHandler);
+          }
+        });
+    clientBootstrap.connect(new InetSocketAddress(address, port));
+    LOGGER.info(
+        "Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
+  }
+
+  /**
+   * for client request
+   *
+   * @param key
+   * @return
+   */
+  @Override
+  public DictionaryMessage getDictionary(
+      DictionaryMessage key) {
+    return nonSecureDictionaryClientHandler.getDictionary(key);
+  }
+
+  /**
+   * shutdown dictionary client
+   */
+  @Override public void shutDown() {
+    workerGroup.shutdownGracefully();
+    try {
+      workerGroup.terminationFuture().sync();
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
new file mode 100644
index 0000000..3a76d84
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Client handler to get data.
+ */
+public class NonSecureDictionaryClientHandler extends ChannelInboundHandlerAdapter {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(NonSecureDictionaryClientHandler.class.getName());
+
+  private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();
+
+  private ChannelHandlerContext ctx;
+
+  private DictionaryChannelFutureListener channelFutureListener;
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    this.ctx = ctx;
+    channelFutureListener = new DictionaryChannelFutureListener(ctx);
+    LOGGER.audit("Connected client " + ctx);
+    super.channelActive(ctx);
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    try {
+      ByteBuf data = (ByteBuf) msg;
+      DictionaryMessage key = new DictionaryMessage();
+      key.readSkipLength(data);
+      data.release();
+      responseMsgQueue.add(key);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    LOGGER.error(cause, "exceptionCaught");
+    ctx.close();
+  }
+
+  /**
+   * client send request to server
+   *
+   * @param key DictionaryMessage
+   * @return DictionaryMessage
+   */
+  public DictionaryMessage getDictionary(DictionaryMessage key) {
+    DictionaryMessage dictionaryMessage;
+    try {
+      ByteBuf buffer = ctx.alloc().buffer();
+      key.writeData(buffer);
+      ctx.writeAndFlush(buffer).addListener(channelFutureListener);
+    } catch (Exception e) {
+      LOGGER.error(e, "Error while send request to server ");
+      ctx.close();
+    }
+    try {
+      dictionaryMessage = responseMsgQueue.poll(100, TimeUnit.SECONDS);
+      if (dictionaryMessage == null) {
+        StringBuilder message = new StringBuilder();
+        message.append("DictionaryMessage { ColumnName: ")
+            .append(key.getColumnName())
+            .append(", DictionaryValue: ")
+            .append(key.getDictionaryValue())
+            .append(", type: ")
+            .append(key.getType());
+        throw new RuntimeException("Request timed out for key : " + message);
+      }
+      return dictionaryMessage;
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class DictionaryChannelFutureListener implements ChannelFutureListener {
+
+    private ChannelHandlerContext ctx;
+
+    DictionaryChannelFutureListener(ChannelHandlerContext ctx) {
+      this.ctx = ctx;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess()) {
+        LOGGER.error(future.cause(), "Error while sending request to Dictionary Server");
+        ctx.close();
+      }
+    }
+  }
+}
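
The handler above hands responses from the Netty I/O thread to the calling thread through a `BlockingQueue` and gives up after a fixed timeout. A minimal sketch of that hand-off pattern, independent of Netty, might look like the following. The class `ResponseWaiter` is hypothetical and uses `String` in place of `DictionaryMessage`. Unlike the handler in the diff, it restores the interrupt flag on `InterruptedException`, which is a design choice of this sketch and not something the commit does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a queue-based request/response hand-off. */
final class ResponseWaiter {

  private final BlockingQueue<String> responses = new LinkedBlockingQueue<>();

  /** Called by the I/O thread when a response has been decoded. */
  void onResponse(String response) {
    responses.add(response);
  }

  /** Called by the requesting thread; blocks until a response arrives or the timeout expires. */
  String await(long timeout, TimeUnit unit) {
    try {
      String response = responses.poll(timeout, unit);
      if (response == null) {
        throw new IllegalStateException("Request timed out");
      }
      return response;
    } catch (InterruptedException e) {
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for response", e);
    }
  }
}
```

Because the queue carries no request identifier, this pattern assumes one outstanding request per client at a time, which appears to be how the handler above is used.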

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
index d59e9f8..3b3f21a 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
@@ -52,7 +52,32 @@ public class DictionaryMessage {
    */
   private DictionaryMessageType type;
 
-  public void readData(ByteBuf byteBuf) {
+  public void readSkipLength(ByteBuf byteBuf) {
+
+    byte[] tableBytes = new byte[byteBuf.readInt()];
+    byteBuf.readBytes(tableBytes);
+    tableUniqueId = new String(tableBytes, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+    byte[] colBytes = new byte[byteBuf.readInt()];
+    byteBuf.readBytes(colBytes);
+    columnName = new String(colBytes, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+    byte typeByte = byteBuf.readByte();
+    type = getKeyType(typeByte);
+
+    byte dataType = byteBuf.readByte();
+    if (dataType == 0) {
+      dictionaryValue = byteBuf.readInt();
+    } else {
+      byte[] dataBytes = new byte[byteBuf.readInt()];
+      byteBuf.readBytes(dataBytes);
+      data = new String(dataBytes, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    }
+  }
+
+  public void readFullLength(ByteBuf byteBuf) {
+
+    byteBuf.readShort();
     byte[] tableIdBytes = new byte[byteBuf.readInt()];
     byteBuf.readBytes(tableIdBytes);
     tableUniqueId =
@@ -107,6 +132,7 @@ public class DictionaryMessage {
     byteBuf.setShort(startIndex, endIndex - startIndex - 2);
   }
 
+
   private DictionaryMessageType getKeyType(byte type) {
     switch (type) {
       case 2:
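
The two readers in this diff differ only in whether the leading 2-byte length is still in the buffer: `readFullLength` consumes a short first, while `readSkipLength` starts directly at the table id, which fits a pipeline where `LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2)` has already stripped the first two bytes. The following is a minimal sketch of that framing idea using only `java.nio.ByteBuffer`. The class `LengthPrefixSketch` and its methods are hypothetical, and the payload is a plain string, not the real `DictionaryMessage` layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of 2-byte length-prefixed framing. */
final class LengthPrefixSketch {

  private LengthPrefixSketch() { }

  /** Writes [2-byte payload length][payload]; the length excludes the prefix itself. */
  static ByteBuffer frame(String text) {
    byte[] payload = text.getBytes(StandardCharsets.UTF_8);
    if (payload.length > Short.MAX_VALUE) {
      throw new IllegalArgumentException("payload too large for this sketch");
    }
    ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
    buf.putShort((short) payload.length);
    buf.put(payload);
    buf.flip();
    return buf;
  }

  /** Reads a buffer that still carries the length prefix (compare readFullLength). */
  static String readFull(ByteBuffer buf) {
    int length = buf.getShort();
    return readBody(buf, length);
  }

  /** Reads a buffer whose prefix was already stripped upstream (compare readSkipLength). */
  static String readStripped(ByteBuffer buf) {
    return readBody(buf, buf.remaining());
  }

  private static String readBody(ByteBuffer buf, int length) {
    byte[] payload = new byte[length];
    buf.get(payload);
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ByteBuffer framed = frame("city");
    System.out.println(readFull(framed.duplicate()));

    // Simulate a frame decoder that strips the first two bytes.
    ByteBuffer stripped = framed.duplicate();
    stripped.position(2);
    System.out.println(readStripped(stripped.slice()));
  }
}
```

Both calls in `main` recover the same payload; the only difference is which side consumes the length prefix.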

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index d8a7c9e..a23cc9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -14,146 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.core.dictionary.server;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-/**
- * Dictionary Server to generate dictionary keys.
- */
-public class DictionaryServer {
-
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(DictionaryServer.class.getName());
 
-  private DictionaryServerHandler dictionaryServerHandler;
+public interface DictionaryServer {
 
-  private EventLoopGroup boss;
-  private EventLoopGroup worker;
-  private int port;
-  private static Object lock = new Object();
-  private static volatile DictionaryServer INSTANCE = null;
+  public void startServer();
 
-  private DictionaryServer(int port) {
-    startServer(port);
-  }
+  public void bindToPort();
 
-  public static DictionaryServer getInstance(int port, CarbonTable carbonTable) throws Exception {
-    if (INSTANCE == null) {
-      synchronized (lock) {
-        if (INSTANCE == null) {
-          INSTANCE = new DictionaryServer(port);
-        }
-      }
-    }
-    INSTANCE.initializeDictionaryGenerator(carbonTable);
-    return INSTANCE;
-  }
+  public void shutdown()throws Exception;
 
-  /**
-   * start dictionary server
-   *
-   * @param port
-   */
-  private void startServer(int port) {
-    dictionaryServerHandler = new DictionaryServerHandler();
-    String workerThreads = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.DICTIONARY_WORKER_THREADS,
-            CarbonCommonConstants.DICTIONARY_WORKER_THREADS_DEFAULT);
-    boss = new NioEventLoopGroup(1);
-    worker = new NioEventLoopGroup(Integer.parseInt(workerThreads));
-    // Configure the server.
-    bindToPort(port);
-  }
+  public String getHost();
 
-  /**
-   * Binds dictionary server to an available port.
-   *
-   * @param port
-   */
-  private void bindToPort(int port) {
-    long start = System.currentTimeMillis();
-    // Configure the server.
-    int i = 0;
-    while (i < 10) {
-      int newPort = port + i;
-      try {
-        ServerBootstrap bootstrap = new ServerBootstrap();
-        bootstrap.group(boss, worker);
-        bootstrap.channel(NioServerSocketChannel.class);
-        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-          @Override public void initChannel(SocketChannel ch) throws Exception {
-            ChannelPipeline pipeline = ch.pipeline();
-            pipeline
-                .addLast("LengthDecoder",
-                    new LengthFieldBasedFrameDecoder(1048576, 0,
-                        2, 0, 2));
-            pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
-          }
-        });
-        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
-        bootstrap.bind(newPort).sync();
-        LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
-            + " Listening on port " + newPort);
-        this.port = newPort;
-        break;
-      } catch (Exception e) {
-        LOGGER.error(e, "Dictionary Server Failed to bind to port:");
-        if (i == 9) {
-          throw new RuntimeException("Dictionary Server Could not bind to any port");
-        }
-      }
-      i++;
-    }
-  }
+  public int getPort();
 
-  /**
-   *
-   * @return Port on which the DictionaryServer has started.
-   */
-  public int getPort() {
-    return port;
-  }
+  public String getSecretKey();
 
-  /**
-   * shutdown dictionary server
-   *
-   * @throws Exception
-   */
-  public void shutdown() throws Exception {
-    LOGGER.info("Shutting down dictionary server");
-    worker.shutdownGracefully();
-    boss.shutdownGracefully();
-  }
+  public boolean isEncryptSecureServer();
 
-  public void initializeDictionaryGenerator(CarbonTable carbonTable) throws Exception {
-    dictionaryServerHandler.initializeTable(carbonTable);
-  }
+  public void writeTableDictionary(String uniqueTableName) throws Exception;
 
-  /**
-   *  Write Dictionary for one table.
-   * @throws Exception
-   */
-  public void writeTableDictionary(String tableId) throws Exception {
-    DictionaryMessage key = new DictionaryMessage();
-    key.setTableUniqueId(tableId);
-    key.setType(DictionaryMessageType.WRITE_TABLE_DICTIONARY);
-    dictionaryServerHandler.processMessage(key);
-  }
-}
\ No newline at end of file
+  public void initializeDictionaryGenerator(CarbonTable carbonTable) throws Exception;
+}
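
The `bindToPort` method removed from this file tried up to ten consecutive ports starting at the configured one and failed only when none could be bound. A minimal sketch of that retry idea with a plain `java.net.ServerSocket` instead of Netty is shown below. The class `PortProbe` and the method `bindFirstFree` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Hypothetical illustration of binding to the first free port in a small range. */
final class PortProbe {

  private PortProbe() { }

  /**
   * Tries basePort, basePort + 1, ... for the given number of attempts and
   * returns the first socket that binds. The caller is responsible for closing it.
   */
  static ServerSocket bindFirstFree(int basePort, int attempts) throws IOException {
    IOException last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return new ServerSocket(basePort + i);
      } catch (IOException e) {
        // Port is taken or otherwise unavailable; remember the cause and move on.
        last = e;
      }
    }
    throw new IOException("Could not bind to any port in range", last);
  }
}
```

A caller would read the port that was actually bound from `getLocalPort()`, much as the server in this commit exposes it through `getPort()`.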

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
deleted file mode 100644
index cf8581b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.server;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
-/**
- * Handler for Dictionary server.
- */
-@ChannelHandler.Sharable public class DictionaryServerHandler extends ChannelInboundHandlerAdapter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DictionaryServerHandler.class.getName());
-
-  /**
-   * dictionary generator
-   */
-  private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
-
-  /**
-   * channel registered
-   *
-   * @param ctx
-   * @throws Exception
-   */
-  public void channelActive(ChannelHandlerContext ctx) throws Exception {
-    LOGGER.audit("Connected " + ctx);
-    super.channelActive(ctx);
-  }
-
-  @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-    try {
-      ByteBuf data = (ByteBuf) msg;
-      DictionaryMessage key = new DictionaryMessage();
-      key.readData(data);
-      data.release();
-      int outPut = processMessage(key);
-      key.setDictionaryValue(outPut);
-      // Send back the response
-      ByteBuf buffer = ctx.alloc().buffer();
-      key.writeData(buffer);
-      ctx.writeAndFlush(buffer);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw e;
-    }
-  }
-
-  /**
-   * handle exceptions
-   *
-   * @param ctx
-   * @param cause
-   */
-  @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error(cause, "exceptionCaught");
-    ctx.close();
-  }
-
-  /**
-   * process message by message type
-   *
-   * @param key
-   * @return
-   * @throws Exception
-   */
-  public int processMessage(DictionaryMessage key) throws Exception {
-    switch (key.getType()) {
-      case DICT_GENERATION:
-        return generatorForServer.generateKey(key);
-      case SIZE:
-        return generatorForServer.size(key);
-      case WRITE_TABLE_DICTIONARY:
-        generatorForServer
-            .writeTableDictionaryData(key.getTableUniqueId());
-        return 0;
-      default:
-        return -1;
-    }
-  }
-
-  void initializeTable(CarbonTable carbonTable) {
-    generatorForServer.initializeGeneratorForTable(carbonTable);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java
new file mode 100644
index 0000000..c7411d6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.server;
+
+import java.net.InetSocketAddress;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+import org.apache.carbondata.core.dictionary.service.AbstractDictionaryServer;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * Dictionary Server to generate dictionary keys.
+ */
+public class NonSecureDictionaryServer extends AbstractDictionaryServer
+    implements DictionaryServer {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(NonSecureDictionaryServer.class.getName());
+
+  private NonSecureDictionaryServerHandler nonSecureDictionaryServerHandler;
+
+  private EventLoopGroup boss;
+  private EventLoopGroup worker;
+  private int port;
+  private String host;
+  private static Object lock = new Object();
+  private static NonSecureDictionaryServer INSTANCE = null;
+
+  private NonSecureDictionaryServer(int port) {
+    this.port = port;
+    startServer();
+  }
+
+  public static synchronized DictionaryServer getInstance(int port, CarbonTable carbonTable)
+      throws Exception {
+    if (INSTANCE == null) {
+      INSTANCE = new NonSecureDictionaryServer(port);
+    }
+    INSTANCE.initializeDictionaryGenerator(carbonTable);
+    return INSTANCE;
+  }
+
+  /**
+   * start dictionary server
+   *
+   */
+  @Override public void startServer() {
+    LOGGER.info("Starting Dictionary Server in Non Secure Mode");
+    nonSecureDictionaryServerHandler = new NonSecureDictionaryServerHandler();
+    String workerThreads = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.DICTIONARY_WORKER_THREADS,
+            CarbonCommonConstants.DICTIONARY_WORKER_THREADS_DEFAULT);
+    boss = new NioEventLoopGroup(1);
+    worker = new NioEventLoopGroup(Integer.parseInt(workerThreads));
+    // Configure the server.
+    bindToPort();
+  }
+
+  /**
+   * Binds dictionary server to an available port.
+   *
+   */
+  @Override public void bindToPort() {
+    long start = System.currentTimeMillis();
+    // Configure the server.
+    int i = 0;
+    while (i < 10) {
+      int newPort = port + i;
+      try {
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(boss, worker);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            pipeline
+                .addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2));
+            pipeline.addLast("NonSecureDictionaryServerHandler", nonSecureDictionaryServerHandler);
+          }
+        });
+        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+        String hostToBind = findLocalIpAddress(LOGGER);
+        InetSocketAddress address = hostToBind == null ?
+            new InetSocketAddress(newPort) :
+            new InetSocketAddress(hostToBind, newPort);
+        bootstrap.bind(address).sync();
+        LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
+            + " Listening on port " + newPort);
+        this.port = newPort;
+        this.host = hostToBind;
+        break;
+      } catch (Exception e) {
+        LOGGER.error(e, "Dictionary Server Failed to bind to port: " + newPort);
+        if (i == 9) {
+          throw new RuntimeException("Dictionary Server Could not bind to any port");
+        }
+      }
+      i++;
+    }
+  }
+
+  /**
+   * @return Port on which the NonSecureDictionaryServer has started.
+   */
+  @Override public int getPort() {
+    return port;
+  }
+
+  @Override public String getSecretKey() {
+    return null;
+  }
+
+  @Override public boolean isEncryptSecureServer() {
+    return false;
+  }
+
+  @Override public String getHost() {
+    return host;
+  }
+
+  /**
+   * shutdown dictionary server
+   *
+   * @throws Exception
+   */
+  @Override public void shutdown() throws Exception {
+    LOGGER.info("Shutting down dictionary server");
+    worker.shutdownGracefully();
+    boss.shutdownGracefully();
+  }
+
+  /**
+   * Write Dictionary for one table.
+   *
+   * @throws Exception
+   */
+
+  @Override
+  public void writeTableDictionary(String uniqueTableName) throws Exception {
+    DictionaryMessage key = new DictionaryMessage();
+    key.setTableUniqueId(uniqueTableName);
+    key.setType(DictionaryMessageType.WRITE_TABLE_DICTIONARY);
+    nonSecureDictionaryServerHandler.processMessage(key);
+  }
+
+  public void initializeDictionaryGenerator(CarbonTable carbonTable) throws Exception {
+    nonSecureDictionaryServerHandler.initializeTable(carbonTable);
+  }
+
+}
\ No newline at end of file
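The bindToPort() method above tries the configured port and then up to nine consecutive ports before giving up. A minimal sketch of that retry pattern follows, assuming a plain java.net.ServerSocket in place of the Netty ServerBootstrap. The class PortProbe and the method bindFirstFree are hypothetical names used only for illustration and are not part of this commit.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical illustration only; not CarbonData code.
final class PortProbe {

  /** Tries basePort, basePort + 1, ... and returns the first socket that binds. */
  static ServerSocket bindFirstFree(int basePort, int attempts) {
    IOException last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return new ServerSocket(basePort + i);
      } catch (IOException e) {
        last = e; // port is busy or not permitted; try the next one
      }
    }
    throw new RuntimeException("Could not bind to any port in range", last);
  }

  public static void main(String[] args) throws IOException {
    // The second call starts at a port that is already taken, so it moves on.
    try (ServerSocket first = bindFirstFree(5678, 10);
         ServerSocket second = bindFirstFree(first.getLocalPort(), 10)) {
      System.out.println(first.getLocalPort() + " then " + second.getLocalPort());
    }
  }
}
```

Returning the bound socket, rather than only the port number, avoids a window in which another process could take the port between probing and binding.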

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServerHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServerHandler.java
new file mode 100644
index 0000000..dc3d078
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServerHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.server;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Handler for Dictionary server.
+ */
+@ChannelHandler.Sharable public class NonSecureDictionaryServerHandler
+    extends ChannelInboundHandlerAdapter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(NonSecureDictionaryServerHandler.class.getName());
+
+  /**
+   * dictionary generator
+   */
+  private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
+
+  /**
+   * Invoked when the channel becomes active, i.e. a client has connected.
+   *
+   * @param ctx
+   * @throws Exception
+   */
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    LOGGER.audit("Connected " + ctx);
+    super.channelActive(ctx);
+  }
+
+  @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    try {
+      ByteBuf data = (ByteBuf) msg;
+      DictionaryMessage key = new DictionaryMessage();
+      key.readSkipLength(data);
+      data.release();
+      int outPut = processMessage(key);
+      key.setDictionaryValue(outPut);
+      // Send back the response
+      ByteBuf buffer = ctx.alloc().buffer();
+      key.writeData(buffer);
+      ctx.writeAndFlush(buffer);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+
+  /**
+   * handle exceptions
+   *
+   * @param ctx
+   * @param cause
+   */
+  @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOGGER.error(cause, "exceptionCaught");
+    ctx.close();
+  }
+
+  /**
+   * process message by message type
+   *
+   * @param key
+   * @return
+   * @throws Exception
+   */
+  public int processMessage(DictionaryMessage key) throws Exception {
+    switch (key.getType()) {
+      case DICT_GENERATION:
+        generatorForServer.initializeGeneratorForColumn(key);
+        return generatorForServer.generateKey(key);
+      case SIZE:
+        generatorForServer.initializeGeneratorForColumn(key);
+        return generatorForServer.size(key);
+      case WRITE_TABLE_DICTIONARY:
+        generatorForServer
+            .writeTableDictionaryData(key.getTableUniqueId());
+        return 0;
+      default:
+        return -1;
+    }
+  }
+
+  void initializeTable(CarbonTable carbonTable) {
+    generatorForServer.initializeGeneratorForTable(carbonTable);
+  }
+
+}
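The handler above receives messages that have already passed through LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2) in NonSecureDictionaryServer: a 2-byte length field at offset 0, no length adjustment, and the 2 length bytes stripped before the payload reaches channelRead. The framing itself can be sketched in plain Java as below. FrameSketch, encode and decode are hypothetical names, and the sketch does not reproduce the DictionaryMessage wire layout, which is not shown in this part of the diff.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of 2-byte length-prefixed framing; not CarbonData code.
final class FrameSketch {

  /** Prepends a 2-byte big-endian length to the payload. */
  static byte[] encode(byte[] payload) {
    if (payload.length > 0xFFFF) {
      throw new IllegalArgumentException("payload too large for a 2-byte length field");
    }
    ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
    buf.putShort((short) payload.length);
    buf.put(payload);
    return buf.array();
  }

  /**
   * Reads one frame and returns the payload without its length prefix.
   * Returns null and leaves the buffer position unchanged if the frame is incomplete.
   */
  static byte[] decode(ByteBuffer buf) {
    if (buf.remaining() < 2) {
      return null;
    }
    buf.mark();
    int length = buf.getShort() & 0xFFFF; // treat the field as unsigned
    if (buf.remaining() < length) {
      buf.reset();
      return null;
    }
    byte[] payload = new byte[length];
    buf.get(payload);
    return payload;
  }
}
```

A decoder has to tolerate partial frames because TCP delivers a byte stream, so one read may contain less than one message or more than one.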

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/service/AbstractDictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/service/AbstractDictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/service/AbstractDictionaryServer.java
new file mode 100644
index 0000000..754f253
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/service/AbstractDictionaryServer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.dictionary.service;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+
+import org.apache.commons.lang3.SystemUtils;
+
+public abstract class AbstractDictionaryServer {
+
+  public String findLocalIpAddress(LogService LOGGER) {
+    try {
+      String defaultIpOverride = System.getenv("SPARK_LOCAL_IP");
+      if (defaultIpOverride != null) {
+        return defaultIpOverride;
+      } else {
+        InetAddress address = InetAddress.getLocalHost();
+        if (address.isLoopbackAddress()) {
+          // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+          // a better address using the local network interfaces
+          // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+          // on unix-like system. On windows, it returns in index order.
+          // It's more proper to pick ip address following system output order.
+          Enumeration<NetworkInterface> activeNetworkIFs = NetworkInterface.getNetworkInterfaces();
+          List<NetworkInterface> reOrderedNetworkIFs = new ArrayList<NetworkInterface>();
+          while (activeNetworkIFs.hasMoreElements()) {
+            reOrderedNetworkIFs.add(activeNetworkIFs.nextElement());
+          }
+
+          if (!SystemUtils.IS_OS_WINDOWS) {
+            Collections.reverse(reOrderedNetworkIFs);
+          }
+
+          for (NetworkInterface ni : reOrderedNetworkIFs) {
+            Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+              InetAddress addr = inetAddresses.nextElement();
+              if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
+                  && addr instanceof Inet4Address) {
+                // We've found an address that looks reasonable!
+                LOGGER.warn("Your hostname, " + InetAddress.getLocalHost().getHostName()
+                    + " resolves to a loopback address: " + address.getHostAddress() + "; using "
+                    + addr.getHostAddress() + " instead (on interface " + ni.getName() + ")");
+                LOGGER.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+                return addr.getHostAddress();
+              }
+            }
+          }
+          LOGGER.warn("Your hostname, " + InetAddress.getLocalHost().getHostName()
+              + " resolves to a loopback address: " + address.getHostAddress()
+              + ", but we couldn't find any external IP address!");
+          LOGGER.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+        }
+        return address.getHostAddress();
+      }
+    } catch (UnknownHostException e) {
+      LOGGER.error("Could not get local host address: " + e.getMessage());
+      throw new RuntimeException(e.getMessage());
+    } catch (SocketException e) {
+      LOGGER.error("Could not get network interfaces: " + e.getMessage());
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+}
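findLocalIpAddress() above falls back to scanning the network interfaces when the host name resolves to a loopback address, and accepts the first IPv4 address that is neither loopback nor link-local. The selection rule on its own can be sketched as follows. AddressPick, firstUsable and ip are hypothetical helpers, and the candidate list is supplied by the caller here instead of being read from NetworkInterface, so that the sketch does not depend on the machine it runs on.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

// Hypothetical illustration of the address filter; not CarbonData code.
final class AddressPick {

  /** Returns the first IPv4 address that is neither loopback nor link-local, or null. */
  static InetAddress firstUsable(List<InetAddress> candidates) {
    for (InetAddress addr : candidates) {
      if (addr instanceof Inet4Address
          && !addr.isLoopbackAddress()
          && !addr.isLinkLocalAddress()) {
        return addr;
      }
    }
    return null;
  }

  /** Builds an IPv4 address from four octets without any DNS lookup. */
  static InetAddress ip(int a, int b, int c, int d) {
    try {
      return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException(e);
    }
  }
}
```

For the candidates 127.0.1.1, 169.254.10.2 and 192.168.1.5, in that order, the filter skips the loopback and link-local entries and returns 192.168.1.5.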

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryOnePassService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryOnePassService.java b/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryOnePassService.java
new file mode 100644
index 0000000..1625f40
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryOnePassService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.dictionary.service;
+
+public class DictionaryOnePassService {
+  private static DictionaryServiceProvider dictionaryServiceProvider = null;
+
+  public static void setDictionaryServiceProvider(DictionaryServiceProvider dictionaryServiceProv) {
+    dictionaryServiceProvider = dictionaryServiceProv;
+  }
+
+  public static synchronized DictionaryServiceProvider getDictionayProvider() {
+    return dictionaryServiceProvider;
+  }
+}
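DictionaryOnePassService above is a static holder: one side installs a DictionaryServiceProvider and the other side later asks for it. In the class as committed only the getter is synchronized. One possible alternative, shown purely as a sketch and not as what the commit does, is to keep the reference in an AtomicReference so that both the write and the read have the same visibility guarantee. ProviderHolder is a hypothetical name, and the type parameter stands in for DictionaryServiceProvider.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of a thread-safe holder; not CarbonData code.
final class ProviderHolder<T> {

  private final AtomicReference<T> ref = new AtomicReference<T>();

  /** Installs (or replaces) the provider. */
  void set(T provider) {
    ref.set(provider);
  }

  /** Returns the current provider, or null if none has been installed yet. */
  T get() {
    return ref.get();
  }
}
```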

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryServiceProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryServiceProvider.java b/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryServiceProvider.java
new file mode 100644
index 0000000..4dfa1e9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/service/DictionaryServiceProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.dictionary.service;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+
+public interface DictionaryServiceProvider extends Serializable {
+
+  public DictionaryClient getDictionaryClient();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/main/java/org/apache/carbondata/core/dictionary/service/NonSecureDictionaryServiceProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/service/NonSecureDictionaryServiceProvider.java b/core/src/main/java/org/apache/carbondata/core/dictionary/service/NonSecureDictionaryServiceProvider.java
new file mode 100644
index 0000000..70354cb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/service/NonSecureDictionaryServiceProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.dictionary.service;
+
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.client.NonSecureDictionaryClient;
+
+public class NonSecureDictionaryServiceProvider implements DictionaryServiceProvider {
+  private int port = 0;
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  public NonSecureDictionaryServiceProvider(int port) {
+    this.port = port;
+  }
+
+  //  @Override public DictionaryServer getDictionaryServer() {
+  //    return NonSecureDictionaryServer.getInstance(port);
+  //  }
+
+  @Override public DictionaryClient getDictionaryClient() {
+    return new NonSecureDictionaryClient();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
index 5318f3d..2fd69f8 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 import org.apache.carbondata.core.dictionary.server.DictionaryServer;
+import org.apache.carbondata.core.dictionary.server.NonSecureDictionaryServer;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -56,6 +57,7 @@ public class DictionaryClientTest {
   private static TableInfo tableInfo;
   private static String storePath;
   private static DictionaryServer server;
+  private static String host;
 
   @BeforeClass public static void setUp() throws Exception {
     // enable lru cache by setting cache size
@@ -96,12 +98,13 @@ public class DictionaryClientTest {
     metadata.addCarbonTable(carbonTable);
 
     // Start the server for testing the client
-    server = DictionaryServer.getInstance(5678, carbonTable);
+    server = NonSecureDictionaryServer.getInstance(5678, carbonTable);
+    host = server.getHost();
   }
 
   @Test public void testClient() throws Exception {
-    DictionaryClient client = new DictionaryClient();
-    client.startClient("localhost", 5678);
+    NonSecureDictionaryClient client = new NonSecureDictionaryClient();
+    client.startClient(null, host, 5678, false);
 
     Thread.sleep(1000);
     // Create a dictionary key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index a42b366..76afcbf 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -62,9 +62,9 @@ object CarbonSessionExample {
 
     spark.sql(
       s"""
-        | SELECT *
-        | FROM carbon_table
-        | WHERE stringfield = 'spark' AND decimalField > 40
+         | SELECT *
+         | FROM carbon_table
+         | WHERE stringfield = 'spark' AND decimalField > 40
       """.stripMargin).show()
 
     spark.sql(
@@ -90,21 +90,21 @@ object CarbonSessionExample {
 
     spark.sql(
       s"""
-        | SELECT t1.*, t2.*
-        | FROM carbon_table t1, carbon_table t2
-        | WHERE t1.stringField = t2.stringField
+         | SELECT t1.*, t2.*
+         | FROM carbon_table t1, carbon_table t2
+         | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
     spark.sql(
       s"""
-        | WITH t1 AS (
-        | SELECT * FROM carbon_table
-        | UNION ALL
-        | SELECT * FROM carbon_table
-        | )
-        | SELECT t1.*, t2.*
-        | FROM t1, carbon_table t2
-        | WHERE t1.stringField = t2.stringField
+         | WITH t1 AS (
+         | SELECT * FROM carbon_table
+         | UNION ALL
+         | SELECT * FROM carbon_table
+         | )
+         | SELECT t1.*, t2.*
+         | FROM t1, carbon_table t2
+         | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
     spark.sql(
@@ -120,4 +120,4 @@ object CarbonSessionExample {
     spark.stop()
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
new file mode 100644
index 0000000..1c8e2d2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.dictionary.client;
+
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.spark.SecurityManager;
+import org.apache.spark.SparkConf;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Dictionary client to connect to Dictionary server and generate dictionary values
+ */
+public class SecureDictionaryClient implements DictionaryClient {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SecureDictionaryClient.class.getName());
+
+  private SecureDictionaryClientHandler dictionaryClientHandler =
+      new SecureDictionaryClientHandler();
+
+  private NioEventLoopGroup workerGroup;
+  private TransportClient client;
+  private TransportClientFactory clientFactory;
+
+  /**
+   * start dictionary client
+   *
+   * @param address
+   * @param port
+   */
+  @Override public void startClient(String secretKey, String address, int port,
+      boolean encryptSecureServer) {
+    LOGGER.audit("Starting client on " + address + " " + port);
+    long start = System.currentTimeMillis();
+
+    SecurityManager securityMgr;
+    SparkConf conf = new SparkConf().setAppName("Carbon Dictionary Client");
+
+    conf.set("spark.authenticate", "true");
+
+    if (null != secretKey) {
+      conf.set("spark.authenticate.secret", secretKey);
+    }
+
+    if (encryptSecureServer) {
+      conf.set("spark.authenticate.enableSaslEncryption", "true");
+    }
+
+    TransportConf transportConf =
+        SparkTransportConf.fromSparkConf(conf, "Carbon Dictionary Client", 0);
+    if (null != secretKey) {
+      securityMgr = new SecurityManager(conf, scala.Option.apply(secretKey.getBytes(Charset.forName(
+          CarbonCommonConstants.DEFAULT_CHARSET))));
+    } else {
+      securityMgr = new SecurityManager(conf, scala.Option.<byte[]>empty());
+    }
+
+    TransportContext context = new TransportContext(transportConf, dictionaryClientHandler);
+    clientFactory = context.createClientFactory(Lists.<TransportClientBootstrap>newArrayList(
+        new SaslClientBootstrap(transportConf, "Carbon Dictionary Client", securityMgr)));
+
+    try {
+      client = clientFactory.createClient(address, port);
+    } catch (Exception e) {
+      LOGGER.error(e, "Dictionary client failed to connect to " + address + ":" + port);
+    }
+    LOGGER.info(
+        "Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
+  }
+
+  /**
+   * Sends a dictionary request for the given key and returns the server's response.
+   *
+   * @param key
+   * @return
+   */
+  @Override public DictionaryMessage getDictionary(DictionaryMessage key) {
+    return dictionaryClientHandler.getDictionary(key, this.client);
+  }
+
+  /**
+   * shutdown dictionary client
+   */
+  @Override public void shutDown() {
+    clientFactory.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClientHandler.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClientHandler.java
new file mode 100644
index 0000000..cdf2553
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClientHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.dictionary.client;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+
+/**
+ * Client handler to get data.
+ */
+public class SecureDictionaryClientHandler extends RpcHandler {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SecureDictionaryClientHandler.class.getName());
+
+  private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();
+
+  /**
+   * Sends the request to the server and waits for the response.
+   *
+   * @param key DictionaryMessage
+   * @return DictionaryMessage
+   */
+  public DictionaryMessage getDictionary(DictionaryMessage key, TransportClient client) {
+    DictionaryMessage dictionaryMessage;
+    ByteBuffer resp = null;
+    try {
+
+      ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
+      key.writeData(buffer);
+      resp = client.sendRpcSync(buffer.nioBuffer(), 100000);
+    } catch (Exception e) {
+      LOGGER.error(e, "Error while sending request to server");
+    }
+    try {
+      if (resp == null) {
+        StringBuilder message = new StringBuilder();
+        message.append("DictionaryMessage { ColumnName: ").append(key.getColumnName())
+            .append(", DictionaryValue: ").append(key.getDictionaryValue()).append(", type: ")
+            .append(key.getType()).append(" }");
+        throw new RuntimeException("Request failed or timed out for key : " + message);
+      }
+      DictionaryMessage newKey = new DictionaryMessage();
+      ByteBuf data = Unpooled.wrappedBuffer(resp);
+      newKey.readFullLength(data);
+      data.release();
+      return newKey;
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override public void receive(TransportClient transportClient, ByteBuffer byteBuffer,
+      RpcResponseCallback rpcResponseCallback) {
+    try {
+      ByteBuf data = Unpooled.wrappedBuffer(byteBuffer);
+      DictionaryMessage key = new DictionaryMessage();
+      key.readFullLength(data);
+      data.release();
+      if (responseMsgQueue.offer(key)) {
+        LOGGER.info("key: " + key + " added to queue");
+      } else {
+        LOGGER.error("Failed to add key: " + key + " to queue");
+      }
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+
+  @Override public StreamManager getStreamManager() {
+    return new OneForOneStreamManager();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/provider/SecureDictionaryServiceProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/provider/SecureDictionaryServiceProvider.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/provider/SecureDictionaryServiceProvider.java
new file mode 100644
index 0000000..3db344b
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/provider/SecureDictionaryServiceProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.dictionary.provider;
+
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
+import org.apache.carbondata.spark.dictionary.client.SecureDictionaryClient;
+
+public class SecureDictionaryServiceProvider implements DictionaryServiceProvider {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  @Override public DictionaryClient getDictionaryClient() {
+    return new SecureDictionaryClient();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
new file mode 100644
index 0000000..6dd6581
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.dictionary.server;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+import org.apache.carbondata.core.dictionary.server.DictionaryServer;
+import org.apache.carbondata.core.dictionary.service.AbstractDictionaryServer;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.spark.SecurityManager;
+import org.apache.spark.SparkConf;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.sasl.SaslServerBootstrap;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.util.TransportConf;
+import scala.Some;
+
+/**
+ * Dictionary Server to generate dictionary keys.
+ */
+public class SecureDictionaryServer extends AbstractDictionaryServer implements DictionaryServer  {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SecureDictionaryServer.class.getName());
+
+  private SecureDictionaryServerHandler secureDictionaryServerHandler;
+
+  private EventLoopGroup boss;
+  private EventLoopGroup worker;
+  private int port;
+  private String host;
+  private SparkConf conf;
+  private String secretKey = null;
+  private boolean encryptSecureServer;
+  private static Object lock = new Object();
+  private static SecureDictionaryServer INSTANCE = null;
+
+  private SecureDictionaryServer(SparkConf conf, String host, int port) {
+    this.conf = conf;
+    this.host = host;
+    this.port = port;
+    startServer();
+  }
+
+  public static synchronized DictionaryServer getInstance(SparkConf conf, String host, int port,
+      CarbonTable carbonTable) throws Exception {
+    if (INSTANCE == null) {
+      INSTANCE = new SecureDictionaryServer(conf, host, port);
+    }
+    INSTANCE.initializeDictionaryGenerator(carbonTable);
+    return INSTANCE;
+  }
+
+  /**
+   * start dictionary server
+   *
+   */
+  @Override
+  public void startServer() {
+    LOGGER.info("Starting Dictionary Server in Secure Mode");
+    secureDictionaryServerHandler = new SecureDictionaryServerHandler();
+    String workerThreads = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.DICTIONARY_WORKER_THREADS,
+            CarbonCommonConstants.DICTIONARY_WORKER_THREADS_DEFAULT);
+    boss = new NioEventLoopGroup(1);
+    worker = new NioEventLoopGroup(Integer.parseInt(workerThreads));
+    // Configure the server.
+    bindToPort();
+  }
+
+  /**
+   * Binds dictionary server to an available port.
+   *
+   */
+  @Override
+  public void bindToPort() {
+    long start = System.currentTimeMillis();
+    // Configure the server.
+    int i = 0;
+    while (i < 10) {
+      int newPort = port + i;
+      try {
+        SecurityManager securityManager;
+        SparkConf conf = this.conf.clone();
+        conf.setAppName("Carbon Dictionary Server");
+
+        // spark.network.sasl.serverAlwaysEncrypt is not a documented Spark parameter, so
+        // enable SASL encryption explicitly to keep Dictionary Server and Client
+        // communication encrypted. This can be revisited once Spark documents the parameter.
+        // conf.set("spark.network.sasl.serverAlwaysEncrypt", "true");
+        conf.set("spark.authenticate.enableSaslEncryption", "true");
+
+        if (conf.get("spark.authenticate.enableSaslEncryption", "false").equalsIgnoreCase("true")) {
+          setEncryptSecureServer(true);
+        } else {
+          setEncryptSecureServer(false);
+        }
+
+        TransportConf transportConf =
+            SparkTransportConf.fromSparkConf(conf, "Carbon Dictionary Server", 0);
+        securityManager = new SecurityManager(conf, Some.<byte[]>empty());
+        secretKey = securityManager.getSecretKey();
+        TransportContext context =
+            new TransportContext(transportConf, secureDictionaryServerHandler);
+        TransportServerBootstrap bootstrap =
+            new SaslServerBootstrap(transportConf, securityManager);
+        String host = findLocalIpAddress(LOGGER);
+        TransportServer transportServer = context
+            .createServer(host, newPort, Lists.<TransportServerBootstrap>newArrayList(bootstrap));
+        LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
+            + " Listening on port " + newPort);
+        this.port = newPort;
+        this.host = host;
+        break;
+      } catch (Exception e) {
+        LOGGER.error(e, "Dictionary Server failed to bind to port: " + newPort);
+        if (i == 9) {
+          throw new RuntimeException("Dictionary Server Could not bind to any port");
+        }
+      }
+      i++;
+    }
+  }
+
+  private void setEncryptSecureServer(boolean encryptSecureServer) {
+    this.encryptSecureServer = encryptSecureServer;
+  }
+
+  /**
+   * @return Port on which the SecureDictionaryServer has started.
+   */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * @return IP address on which the SecureDictionaryServer has started.
+   */
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  /**
+   * @return Secret Key of Dictionary Server.
+   */
+  @Override
+  public String getSecretKey() {
+    return secretKey;
+  }
+
+  @Override public boolean isEncryptSecureServer() {
+    return encryptSecureServer;
+  }
+
+  /**
+   * shutdown dictionary server
+   *
+   * @throws Exception
+   */
+  @Override
+  public void shutdown() throws Exception {
+    LOGGER.info("Shutting down dictionary server");
+    worker.shutdownGracefully();
+    boss.shutdownGracefully();
+  }
+
+  public void initializeDictionaryGenerator(CarbonTable carbonTable) throws Exception {
+    secureDictionaryServerHandler.initializeTable(carbonTable);
+  }
+
+  /**
+   * Write Dictionary for one table.
+   *
+   * @throws Exception
+   */
+
+  @Override
+  public void writeTableDictionary(String uniqueTableName) throws Exception {
+    DictionaryMessage key = new DictionaryMessage();
+    key.setTableUniqueId(uniqueTableName);
+    key.setType(DictionaryMessageType.WRITE_TABLE_DICTIONARY);
+    secureDictionaryServerHandler.processMessage(key);
+  }
+
+}
\ No newline at end of file
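
Note on bindToPort() above: the method probes up to ten consecutive ports, starting from the configured one, and gives up if none can be bound. The following is only a minimal sketch of that retry idea in isolation. It uses a plain java.net.ServerSocket instead of Spark's TransportServer, and the class and method names (PortRetrySketch, bindWithRetry) are hypothetical and not part of CarbonData or Spark.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper for illustration only; not part of CarbonData or Spark. */
final class PortRetrySketch {

  private PortRetrySketch() { }

  /**
   * Binds a ServerSocket to the first free port in [basePort, basePort + maxAttempts).
   * Assumption: the loopback address is used so the sketch runs on a single machine.
   */
  static ServerSocket bindWithRetry(int basePort, int maxAttempts) throws IOException {
    IOException lastFailure = null;
    for (int i = 0; i < maxAttempts; i++) {
      int candidate = basePort + i;
      try {
        // Bind to the candidate port of this attempt, not to the base port.
        return new ServerSocket(candidate, 50, InetAddress.getLoopbackAddress());
      } catch (IOException e) {
        // Port is busy or otherwise unavailable; remember the cause and try the next one.
        lastFailure = e;
      }
    }
    throw new IOException("Could not bind to any port starting at " + basePort, lastFailure);
  }
}
```

A caller would read the chosen port back from the returned socket (getLocalPort()) and publish it to clients, much as the server above exposes getPort() after a successful bind.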

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServerHandler.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServerHandler.java
new file mode 100644
index 0000000..aaa4cf0
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServerHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.dictionary.server;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+
+/**
+ * Handler for Dictionary server.
+ */
+@ChannelHandler.Sharable public class SecureDictionaryServerHandler extends RpcHandler {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SecureDictionaryServerHandler.class.getName());
+
+  /**
+   * dictionary generator
+   */
+  private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
+
+  /**
+   * process message by message type
+   *
+   * @param key
+   * @return
+   * @throws Exception
+   */
+  public int processMessage(DictionaryMessage key) throws Exception {
+    switch (key.getType()) {
+      case DICT_GENERATION:
+        generatorForServer.initializeGeneratorForColumn(key);
+        return generatorForServer.generateKey(key);
+      case SIZE:
+        generatorForServer.initializeGeneratorForColumn(key);
+        return generatorForServer.size(key);
+      case WRITE_TABLE_DICTIONARY:
+        generatorForServer.writeTableDictionaryData(key.getTableUniqueId());
+        return 0;
+      default:
+        return -1;
+    }
+  }
+
+  @Override public void receive(TransportClient transportClient, ByteBuffer byteBuffer,
+      RpcResponseCallback rpcResponseCallback) {
+    try {
+      ByteBuf data = Unpooled.wrappedBuffer(byteBuffer);
+      DictionaryMessage key = new DictionaryMessage();
+      key.readFullLength(data);
+      data.release();
+      int outPut = processMessage(key);
+      key.setDictionaryValue(outPut);
+      // Send back the response
+      ByteBuf buff = ByteBufAllocator.DEFAULT.buffer();
+      key.writeData(buff);
+      rpcResponseCallback.onSuccess(buff.nioBuffer());
+    } catch (Exception e) {
+      LOGGER.error(e);
+    }
+  }
+
+  @Override public StreamManager getStreamManager() {
+    return new OneForOneStreamManager();
+  }
+
+  public void initializeTable(CarbonTable carbonTable) {
+    generatorForServer.initializeGeneratorForTable(carbonTable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4daf0634/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..e69de29

