carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [09/22] incubator-carbondata git commit: IUD delete flow support
Date Fri, 06 Jan 2017 13:57:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperationsImpl.java
b/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperationsImpl.java
new file mode 100644
index 0000000..8e4c502
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperationsImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.fileoperations;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
+
+public class AtomicFileOperationsImpl implements AtomicFileOperations {
+
+  private String filePath;
+
+  private FileType fileType;
+
+  private String tempWriteFilePath;
+
+  private DataOutputStream dataOutStream;
+
+  public AtomicFileOperationsImpl(String filePath, FileType fileType) {
+    this.filePath = filePath;
+
+    this.fileType = fileType;
+  }
+
+  @Override public DataInputStream openForRead() throws IOException {
+    return FileFactory.getDataInputStream(filePath, fileType);
+  }
+
+  @Override public DataOutputStream openForWrite(FileWriteOperation operation) throws IOException
{
+
+    filePath = filePath.replace("\\", "/");
+
+    tempWriteFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+
+    if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+      FileFactory.getCarbonFile(tempWriteFilePath, fileType).delete();
+    }
+
+    FileFactory.createNewFile(tempWriteFilePath, fileType);
+
+    dataOutStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
+
+    return dataOutStream;
+
+  }
+
+  /* (non-Javadoc)
+   * @see com.huawei.unibi.carbon.datastorage.store.fileperations.AtomicFileOperations#close()
+   */
+  @Override public void close() throws IOException {
+
+    if (null != dataOutStream) {
+      dataOutStream.close();
+
+      CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+
+      if (!tempFile.renameForce(filePath)) {
+        throw new IOException("temporary file renaming failed, src="
+            + tempFile.getPath() + ", dest=" + filePath);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/AbstractCarbonLock.java b/core/src/main/java/org/apache/carbondata/locks/AbstractCarbonLock.java
new file mode 100644
index 0000000..934f447
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/AbstractCarbonLock.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.locks;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * This is the abstract class of the lock implementations.This handles the
+ * retrying part of the locking.
+ */
+public abstract class AbstractCarbonLock implements ICarbonLock {
+  private int retryCount;
+
+  private int retryTimeout;
+
+  public abstract boolean lock();
+
+  /**
+   * API for enabling the locking of file with retries.
+   */
+  public boolean lockWithRetries() {
+    try {
+      for (int i = 0; i < retryCount; i++) {
+        if (lock()) {
+          return true;
+        } else {
+          Thread.sleep(retryTimeout * 1000L);
+        }
+      }
+    } catch (InterruptedException e) {
+      return false;
+    }
+    return false;
+  }
+
+  /**
+   * Initializes the retry count and retry timeout.
+   * This will determine how many times to retry to acquire lock and the retry timeout.
+   */
+  protected void initRetry() {
+    String retries = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK);
+    try {
+      retryCount = Integer.parseInt(retries);
+    } catch (NumberFormatException e) {
+      retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT;
+    }
+
+    String maxTimeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK);
+    try {
+      retryTimeout = Integer.parseInt(maxTimeout);
+    } catch (NumberFormatException e) {
+      retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/locks/CarbonLockUtil.java
new file mode 100644
index 0000000..181e92b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/CarbonLockUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class contains all carbon lock utilities
+ */
+public class CarbonLockUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLockUtil.class.getName());
+
+  /**
+   * unlocks given file
+   *
+   * @param carbonLock
+   */
+  public static void fileUnlock(ICarbonLock carbonLock, String locktype) {
+    if (carbonLock.unlock()) {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.info("Metadata lock has been successfully released");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.info("Table status lock has been successfully released");
+      }
+      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        LOGGER.info("Clean files lock has been successfully released");
+      }
+      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.info("Delete segments lock has been successfully released");
+      }
+    } else {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.error("Not able to release the metadata lock");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.error("Not able to release the table status lock");
+      }
+      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        LOGGER.info("Not able to release the clean files lock");
+      }
+      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.info("Not able to release the delete segments lock");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/ICarbonLock.java b/core/src/main/java/org/apache/carbondata/locks/ICarbonLock.java
new file mode 100644
index 0000000..fbb2030
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/ICarbonLock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+/**
+ * Carbon Lock Interface which handles the locking and unlocking.
+ */
/**
 * Carbon lock interface which handles the locking and unlocking of files.
 */
public interface ICarbonLock {

  /**
   * Releases the acquired lock.
   *
   * @return true if the lock was released successfully, false otherwise
   */
  boolean unlock();

  /**
   * Acquires the lock; if it cannot be obtained immediately, the attempt is
   * retried after the configured timeout, up to the configured retry count.
   *
   * @return true if the lock was acquired within the retry budget
   */
  boolean lockWithRetries();

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/locks/LockUsage.java
new file mode 100644
index 0000000..b323447
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/LockUsage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+/**
+ * This enum is used to define the usecase of the lock.
+ * Each enum value is one specific lock case.
+ */
/**
 * Holder for the lock file names, one constant per lock use case.
 * Plain String constants (rather than an enum) because the values are used
 * directly as file/znode names by the lock implementations.
 */
public class LockUsage {
  public static final String LOCK = ".lock";
  public static final String METADATA_LOCK = "meta.lock";
  public static final String COMPACTION_LOCK = "compaction.lock";
  public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
  public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
  public static final String TABLE_UPDATE_STATUS_LOCK = "tableupdatestatus.lock";
  public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
  public static final String CLEAN_FILES_LOCK = "clean_files.lock";
  public static final String DROP_TABLE_LOCK = "droptable.lock";

  private LockUsage() {
    // constants holder - not meant to be instantiated
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/locks/ZooKeeperLocking.java
new file mode 100644
index 0000000..034cbd0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/ZooKeeperLocking.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * For Handling the zookeeper locking implementation
+ */
+public class ZooKeeperLocking extends AbstractCarbonLock {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
+
+  /**
+   * zk is the zookeeper client instance
+   */
+  private static ZooKeeper zk;
+
+  /**
+   * zooKeeperLocation is the location in the zoo keeper file system where the locks will
be
+   * maintained.
+   */
+  private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION;
+
+  /**
+   * Unique folder for each table with DatabaseName_TableName
+   */
+  private final String tableIdFolder;
+
+  /**
+   * lockName is the name of the lock to use. This name should be same for every process
that want
+   * to share the same lock
+   */
+  private String lockName;
+
+  /**
+   * lockPath is the unique path created for the each instance of the carbon lock.
+   */
+  private String lockPath;
+
+  private String lockTypeFolder;
+
+  public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
+        lockFile);
+  }
+
+  /**
+   * @param lockLocation
+   * @param lockFile
+   */
+  public ZooKeeperLocking(String lockLocation, String lockFile) {
+    this.lockName = lockFile;
+    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
+
+    String zooKeeperUrl =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
+    zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
+
+    this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
+        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    try {
+      createBaseNode();
+      // if exists returns null then path doesnt exist. so creating.
+      if (null == zk.exists(this.tableIdFolder, true)) {
+        createRecursivly(this.tableIdFolder);
+      }
+      // if exists returns null then path doesnt exist. so creating.
+      if (null == zk.exists(this.lockTypeFolder, true)) {
+        zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    initRetry();
+  }
+
+  /**
+   * Creating a znode in which all the znodes (lock files )are maintained.
+   */
+  private void createBaseNode() throws KeeperException, InterruptedException {
+    if (null == zk.exists(zooKeeperLocation, true)) {
+      // creating a znode in which all the znodes (lock files )are maintained.
+      zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+  }
+
+  /**
+   * Create zookeepr node if not exist
+   * @param path
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void createRecursivly(String path) throws KeeperException, InterruptedException
{
+    try {
+      if (zk.exists(path, true) == null && path.length() > 0) {
+        String temp = path.substring(0, path.lastIndexOf(File.separator));
+        createRecursivly(temp);
+        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } else {
+        return;
+      }
+    } catch (KeeperException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      throw e;
+    }
+
+  }
+  /**
+   * Handling of the locking mechanism using zoo keeper.
+   */
+  @Override public boolean lock() {
+    try {
+      // create the lock file with lockName.
+      lockPath =
+          zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName,
null,
+              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+      // get the children present in zooKeeperLocation.
+      List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
+
+      // sort the childrens
+      Collections.sort(nodes);
+
+      // here the logic is , for each lock request zookeeper will create a file ending with
+      // incremental digits.
+      // so first request will be 00001 next is 00002 and so on.
+      // if the current request is 00002 and already one previous request(00001) is present
then get
+      // children will give both nodes.
+      // after the sort we are checking if the lock path is first or not .if it is first
then lock
+      // has been acquired.
+
+      if (lockPath.endsWith(nodes.get(0))) {
+        return true;
+      } else {
+        // if locking failed then deleting the created lock as next time again new lock file
will be
+        // created.
+        zk.delete(lockPath, -1);
+        return false;
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * @return status where lock file is unlocked or not.
+   */
+  @Override public boolean unlock() {
+    try {
+      // exists will return null if the path doesn't exists.
+      if (null != zk.exists(lockPath, true)) {
+        zk.delete(lockPath, -1);
+        lockPath = null;
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 1c21fab..5c0625a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -32,7 +32,8 @@ import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class LoadMetadataUtil {
   private LoadMetadataUtil() {
@@ -47,7 +48,7 @@ public final class LoadMetadataUtil {
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneRow : details) {
         if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
-            || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
+            || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
             && oneRow.getVisibility().equalsIgnoreCase("true")) {
           return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 208f993..b8479ce 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.execution.command
 import java.util
 import java.util.UUID
 
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager
+import org.apache.carbondata.spark.load.FailureCauses
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
@@ -75,6 +78,8 @@ case class BucketFields(bucketColumns: Seq[String], numberOfBuckets: Int)
 
 case class DataLoadTableFileMapping(table: String, loadPath: String)
 
+case class ExecutionErrors(var failureCauses: FailureCauses, var errorMsg: String )
+
 case class CarbonMergerMapping(storeLocation: String,
     storePath: String,
     metadataFilePath: String,
@@ -85,6 +90,7 @@ case class CarbonMergerMapping(storeLocation: String,
     factTableName: String,
     validSegments: Array[String],
     tableId: String,
+    campactionType: CompactionType,
     // maxSegmentColCardinality is Cardinality of last segment of compaction
     var maxSegmentColCardinality: Array[Int],
     // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
@@ -92,8 +98,16 @@ case class CarbonMergerMapping(storeLocation: String,
 
 case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
-case class AlterTableModel(dbName: Option[String], tableName: String,
-    compactionType: String, alterSql: String)
+case class AlterTableModel(dbName: Option[String],
+                           tableName: String,
+                           segmentUpdateStatusManager: Option[SegmentUpdateStatusManager],
+                           compactionType: String,
+                           factTimeStamp: Option[Long],
+                           alterSql: String)
+
+case class UpdateTableModel(isUpdate: Boolean,
+                            updatedTimeStamp: Long,
+                            var executorErrors: ExecutionErrors)
 
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 62ea24d..0904bbb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,6 +40,7 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
@@ -54,7 +55,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 object Checker {
@@ -198,6 +199,8 @@ private[sql] case class DeleteLoadsById(
     databaseNameOp: Option[String],
     tableName: String) extends RunnableCommand {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
     CarbonStore.deleteLoadById(
@@ -206,6 +209,16 @@ private[sql] case class DeleteLoadsById(
       tableName
     )
     Seq.empty
+
+  }
+
+  // validates load ids
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+
+    }
   }
 }
 
@@ -215,6 +228,8 @@ private[sql] case class DeleteLoadsByLoadDate(
     dateField: String,
     loadDate: String) extends RunnableCommand {
 
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
+
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
     CarbonStore.deleteLoadByDate(
@@ -223,6 +238,45 @@ private[sql] case class DeleteLoadsByLoadDate(
       tableName
     )
     Seq.empty
+
+  }
+
+}
+
+object LoadTable {
+
+  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+    // read TableInfo
+    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId)))
{
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+
+    // write TableInfo
+    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+    // update Metadata
+    val catalog = CarbonEnv.get.carbonMetastore
+    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }
 
 }
@@ -310,6 +364,12 @@ case class LoadTable(
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
       carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      if (dimFilesPath.isEmpty) {
+        carbonLoadModel.setDimFolderPath(null)
+      } else {
+        val x = dimFilesPath.map(f => f.table + ":" + CarbonUtil.checkAndAppendHDFSUrl(f.loadPath))
+        carbonLoadModel.setDimFolderPath(x.mkString(","))
+      }
 
       val table = relation.tableMeta.carbonTable
       carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
@@ -642,6 +702,7 @@ private[sql] case class ShowLoads(
     limit: Option[String],
     override val output: Seq[Attribute]) extends RunnableCommand {
 
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
     CarbonStore.showSegments(
@@ -650,6 +711,7 @@ private[sql] case class ShowLoads(
       limit
     )
   }
+
 }
 
 private[sql] case class DescribeCommandFormatted(
@@ -730,10 +792,61 @@ private[sql] case class DescribeCommandFormatted(
   }
 }
 
+private[sql] case class DeleteLoadByDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    dateValue: String
+) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
+    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
+    var level: String = ""
+    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+      .getInstance().getCarbonTable(dbName + '_' + tableName)
+    if (relation == null) {
+      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not
exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
+      filter => filter.name.equalsIgnoreCase(dateField) &&
+                filter.dataType.isInstanceOf[TimestampType]).toList
+    if (matches.isEmpty) {
+      LOGGER.audit("The delete load by date is failed. " +
+                   s"Table $dbName.$tableName does not contain date field: $dateField")
+      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
+    } else {
+      level = matches.asJava.get(0).name
+    }
+    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
+      .getColName
+    DataManagementFunc.deleteLoadByDate(
+      sqlContext,
+      new CarbonDataLoadSchema(carbonTable),
+      dbName,
+      tableName,
+      CarbonEnv.get.carbonMetastore.storePath,
+      level,
+      actualColName,
+      dateValue)
+    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
+    Seq.empty
+  }
+
+}
+
 private[sql] case class CleanFiles(
     databaseNameOp: Option[String],
     tableName: String) extends RunnableCommand {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
     CarbonStore.cleanFiles(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 6d3cdec..d44c076 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -22,6 +22,8 @@ package org.apache.carbondata.spark.testsuite.dataretention
 import java.io.File
 import java.text.SimpleDateFormat
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory, ICarbonLock}
 import org.apache.commons.lang3.time.DateUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.CarbonHiveContext._
@@ -33,8 +35,6 @@ import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableId
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
@@ -114,7 +114,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
   private def getSegmentStartTime(segments: Array[LoadMetadataDetails],
       segmentId: Integer): String = {
     val segmentLoadTimeString = segments(segmentId).getLoadStartTime()
-    var loadTime = carbonDateFormat.parse(segmentLoadTimeString)
+    var loadTime = carbonDateFormat.parse(String.valueOf(segmentLoadTimeString))
     // add one min to execute delete before load start time command
     loadTime = DateUtils.addMinutes(loadTime, 1)
     defaultDateFormat.format(loadTime)


Mime
View raw message