carbondata-commits mailing list archives

From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4073] Added FT for missing scenarios and removed dead code in Presto integration
Date Wed, 30 Dec 2020 08:07:42 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8831af4  [CARBONDATA-4073] Added FT for missing scenarios and removed dead code in Presto integration
8831af4 is described below

commit 8831af4884d2670ea35c140cd39f19f6f329c46a
Author: akkio-97 <akshay.nuthala@gmail.com>
AuthorDate: Mon Nov 30 00:54:12 2020 +0530

    [CARBONDATA-4073] Added FT for missing scenarios and removed dead code in Presto integration
    
    Why is this PR needed?
    Functional tests (FTs) have been added for the following cases. In each case the store is created by Spark and then read by Presto; a minimal sketch of this flow follows the list below.
    
    update without local dictionary
    delete operations on table
    minor, major, and custom compaction
    add and delete segments
    update with inverted index
    read with partition columns
    filter on partition columns
    Bloom index
    range columns
    read streaming data
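    
    As a minimal illustrative sketch (not part of the commit itself), the cross-engine pattern exercised by these tests is: create and load a table through Spark SQL, copy the resulting store under Presto's catalog directory, then query it through the embedded Presto server and assert on the result. The helpers sql, copyStoreContents and prestoServer come from the new test classes added below; the table name demo_table is hypothetical.
    
        // Spark side (SparkStoreCreatorForPresto): write rows into the Carbon store
        sql("create table demo_table(name string, id int) stored as carbondata")
        sql("insert into demo_table select 'John', 1")
    
        // Presto side (PrestoTestUsingSparkStore): declare a matching CARBON table, copy the
        // Spark-created store into the Presto catalog path, then verify the data
        prestoServer.execute(
          "create table presto_spark_db.demo_table(name varchar, id int) with(format='CARBON')")
        copyStoreContents("demo_table")
        val result = prestoServer.executeQuery("SELECT * FROM presto_spark_db.demo_table")
        assert(result.size == 1)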
    
    Does this PR introduce any user interface change?
    No
    
    Is any new test case added?
    Yes
    
    This closes #4031
---
 .../presto/CarbonColumnVectorWrapper.java          | 365 -------------------
 .../presto/PrestoCarbonVectorizedRecordReader.java |   7 +-
 .../presto/CarbondataColumnConstraint.java         |  93 -----
 .../presto/CarbondataColumnConstraint.java         |  93 -----
 .../apache/carbondata/presto/PrestoFilterUtil.java |  74 ----
 .../carbondata/presto/impl/CarbonTableReader.java  |  23 --
 .../PrestoTestUsingSparkStore.scala                | 328 ++++++++++++++++++
 .../dataload/SparkStoreCreatorForPresto.scala      | 385 +++++++++++++++++++++
 8 files changed, 715 insertions(+), 653 deletions(-)

diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
deleted file mode 100644
index b3ffb01..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
-import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
-
-public class CarbonColumnVectorWrapper implements CarbonColumnVector {
-
-  private CarbonColumnVectorImpl columnVector;
-  private CarbonColumnVectorWrapper dictionaryVectorWrapper;
-
-  private boolean[] filteredRows;
-
-  private int counter;
-
-  private boolean filteredRowsExist;
-
-  private DataType blockDataType;
-
-  public CarbonColumnVectorWrapper(CarbonColumnVectorImpl columnVector, boolean[] filteredRows) {
-    this.columnVector = columnVector;
-    this.filteredRows = filteredRows;
-
-    if (columnVector.getDictionaryVector() != null) {
-      dictionaryVectorWrapper = new CarbonColumnVectorWrapper(
-              (CarbonColumnVectorImpl)columnVector.getDictionaryVector(), filteredRows);
-    }
-  }
-
-  public CarbonColumnVector getColumnVector() {
-    return columnVector;
-  }
-
-  @Override
-  public void putBoolean(int rowId, boolean value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putBoolean(counter++, value);
-    }
-  }
-
-  @Override
-  public void putFloat(int rowId, float value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putFloat(counter++, value);
-    }
-  }
-
-  @Override
-  public void putShort(int rowId, short value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putShort(counter++, value);
-    }
-  }
-
-  @Override
-  public void putShorts(int rowId, int count, short value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putShort(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putShorts(rowId, count, value);
-    }
-  }
-
-  @Override
-  public void putInt(int rowId, int value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putInt(counter++, value);
-    }
-  }
-
-  @Override
-  public void putInts(int rowId, int count, int value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putInt(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putInts(rowId, count, value);
-    }
-  }
-
-  @Override
-  public void putLong(int rowId, long value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putLong(counter++, value);
-    }
-  }
-
-  @Override
-  public void putLongs(int rowId, int count, long value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putLong(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putLongs(rowId, count, value);
-    }
-  }
-
-  @Override
-  public void putDecimal(int rowId, BigDecimal value, int precision) {
-    if (!filteredRows[rowId]) {
-      columnVector.putDecimal(counter++, value, precision);
-    }
-  }
-
-  @Override
-  public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
-    for (int i = 0; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putDecimal(counter++, value, precision);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putDouble(int rowId, double value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putDouble(counter++, value);
-    }
-  }
-
-  @Override
-  public void putDoubles(int rowId, int count, double value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putDouble(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putDoubles(rowId, count, value);
-    }
-  }
-
-  @Override
-  public void putByte(int rowId, byte value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putByte(counter++, value);
-    }
-  }
-
-  @Override
-  public void putByteArray(int rowId, byte[] value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, value);
-    }
-  }
-
-  @Override
-  public void putByteArray(int rowId, int count, byte[] value) {
-    for (int i = 0; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putByteArray(counter++, value);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putByteArray(int rowId, int offset, int length, byte[] value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, offset, length, value);
-    }
-  }
-
-  @Override
-  public void putNull(int rowId) {
-    if (!filteredRows[rowId]) {
-      columnVector.putNull(counter++);
-    }
-  }
-
-  @Override
-  public void putNulls(int rowId, int count) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putNull(counter++);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putNulls(rowId, count);
-    }
-  }
-
-  @Override
-  public void putNotNull(int rowId) {
-
-  }
-
-  @Override
-  public void putNotNull(int rowId, int count) {
-
-  }
-
-  @Override
-  public boolean isNull(int rowId) {
-    return columnVector.isNullAt(rowId);
-  }
-
-  @Override
-  public void putObject(int rowId, Object obj) {
-    //TODO handle complex types
-  }
-
-  @Override
-  public Object getData(int rowId) {
-    //TODO handle complex types
-    return null;
-  }
-
-  @Override
-  public void reset() {
-    counter = 0;
-    filteredRowsExist = false;
-  }
-
-  @Override
-  public DataType getType() {
-    return columnVector.getType();
-  }
-
-  @Override
-  public DataType getBlockDataType() {
-    return blockDataType;
-  }
-
-  @Override
-  public void setBlockDataType(DataType blockDataType) {
-    this.blockDataType = blockDataType;
-  }
-
-  @Override
-  public void setFilteredRowsExist(boolean filteredRowsExist) {
-    this.filteredRowsExist = filteredRowsExist;
-  }
-
-  @Override
-  public void setDictionary(CarbonDictionary dictionary) {
-    this.columnVector.setDictionary(dictionary);
-  }
-
-  @Override
-  public boolean hasDictionary() {
-    return this.columnVector.hasDictionary();
-  }
-
-  @Override
-  public CarbonColumnVector getDictionaryVector() {
-    return this.dictionaryVectorWrapper;
-  }
-
-  @Override
-  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putFloat(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putShort(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putInts(int rowId, int count, int[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putInt(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putLong(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putDouble(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-    for (int i = srcIndex; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putByte(counter++, src[i]);
-      }
-      rowId++;
-    }
-  }
-
-  @Override
-  public void setLazyPage(LazyPageLoader lazyPage) {
-    lazyPage.loadPage();
-  }
-
-  @Override
-  public void putArray(int rowId, int offset, int length) {
-    if (!filteredRows[rowId]) {
-      columnVector.putArray(counter++, offset, length);
-    }
-  }
-
-  @Override
-  public void putAllByteArray(byte[] data, int offset, int length) {
-    columnVector.putAllByteArray(data, offset, length);
-  }
-}
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 6a3c5ed..66b02a9 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -221,11 +221,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
     boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
-      if (queryModel.isDirectVectorFill()) {
-        vectors[i] = new ColumnarVectorWrapperDirect(columnarBatch.column(i));
-      } else {
-        vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
-      }
+      // TODO: Prepare ColumnarVectorWrapper if row level filtering is needed
+      vectors[i] = new ColumnarVectorWrapperDirect(columnarBatch.column(i));
     }
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataColumnConstraint.java
deleted file mode 100755
index 30d5cf6..0000000
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataColumnConstraint.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.predicate.Domain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSetter;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Encapsulating presto Tuple-domain
- */
-public class CarbondataColumnConstraint {
-  private final String name;
-  private final boolean invertedIndexed;
-  private Optional<Domain> domain;
-
-  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
-      @JsonProperty("domain") Optional<Domain> domain,
-      @JsonProperty("invertedIndexed") boolean invertedIndexed) {
-    this.name = requireNonNull(name, "name is null");
-    this.invertedIndexed = requireNonNull(invertedIndexed, "invertedIndexed is null");
-    this.domain = requireNonNull(domain, "domain is null");
-  }
-
-  @JsonProperty
-  public boolean isInvertedIndexed() {
-    return invertedIndexed;
-  }
-
-  @JsonProperty
-  public String getName() {
-    return name;
-  }
-
-  @JsonProperty
-  public Optional<Domain> getDomain() {
-    return domain;
-  }
-
-  @JsonSetter
-  public void setDomain(Optional<Domain> domain) {
-    this.domain = domain;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(name, domain, invertedIndexed);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
-    return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
-        && Objects.equals(this.invertedIndexed, other.invertedIndexed);
-  }
-
-  @Override
-  public String toString() {
-    return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedIndexed)
-        .add("domain", this.domain).toString();
-  }
-}
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataColumnConstraint.java
deleted file mode 100755
index 0fbc4f5..0000000
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataColumnConstraint.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import static java.util.Objects.requireNonNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSetter;
-import io.prestosql.spi.predicate.Domain;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Encapsulating presto Tuple-domain
- */
-public class CarbondataColumnConstraint {
-  private final String name;
-  private final boolean invertedIndexed;
-  private Optional<Domain> domain;
-
-  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
-      @JsonProperty("domain") Optional<Domain> domain,
-      @JsonProperty("invertedIndexed") boolean invertedIndexed) {
-    this.name = requireNonNull(name, "name is null");
-    this.invertedIndexed = requireNonNull(invertedIndexed, "invertedIndexed is null");
-    this.domain = requireNonNull(domain, "domain is null");
-  }
-
-  @JsonProperty
-  public boolean isInvertedIndexed() {
-    return invertedIndexed;
-  }
-
-  @JsonProperty
-  public String getName() {
-    return name;
-  }
-
-  @JsonProperty
-  public Optional<Domain> getDomain() {
-    return domain;
-  }
-
-  @JsonSetter
-  public void setDomain(Optional<Domain> domain) {
-    this.domain = domain;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(name, domain, invertedIndexed);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
-    return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
-        && Objects.equals(this.invertedIndexed, other.invertedIndexed);
-  }
-
-  @Override
-  public String toString() {
-    return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedIndexed)
-        .add("domain", this.domain).toString();
-  }
-}
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/PrestoFilterUtil.java
index 310b214..45e7158 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -107,80 +107,6 @@ public class PrestoFilterUtil {
   }
 
   /**
-   * Return partition filters using domain constraints
-   * @param carbonTable
-   * @param originalConstraint
-   * @return
-   */
-  public static List<String> getPartitionFilters(CarbonTable carbonTable,
-      TupleDomain<HiveColumnHandle> originalConstraint) {
-    List<ColumnSchema> columnSchemas = carbonTable.getPartitionInfo().getColumnSchemaList();
-    List<String> filter = new ArrayList<>();
-    for (HiveColumnHandle columnHandle : originalConstraint.getDomains().get().keySet()) {
-      List<ColumnSchema> partitionedColumnSchema = columnSchemas.stream().filter(
-          columnSchema -> columnHandle.getName()
-              .equals(columnSchema.getColumnName())).collect(toList());
-      if (partitionedColumnSchema.size() != 0) {
-        filter.addAll(createPartitionFilters(originalConstraint, columnHandle));
-      }
-    }
-    return filter;
-  }
-
-  /** Returns list of partition key and values using domain constraints
-   * @param originalConstraint
-   * @param columnHandle
-   */
-  private static List<String> createPartitionFilters(
-      TupleDomain<HiveColumnHandle> originalConstraint, HiveColumnHandle columnHandle) {
-    List<String> filter = new ArrayList<>();
-    if (!originalConstraint.getDomains().isPresent()) {
-      return filter;
-    }
-    Domain domain = originalConstraint.getDomains().get().get(columnHandle);
-    if (domain != null && domain.isNullableSingleValue()) {
-      Object value = domain.getNullableSingleValue();
-      Type type = domain.getType();
-      if (value == null) {
-        filter.add(columnHandle.getName() + "=" + HIVE_DEFAULT_DYNAMIC_PARTITION);
-      } else if (columnHandle.getHiveType().getTypeInfo() instanceof DecimalTypeInfo) {
-        int scale = ((DecimalTypeInfo) columnHandle.getHiveType().getTypeInfo()).getScale();
-        if (value instanceof Long) {
-          //create decimal value from Long
-          BigDecimal decimalValue = new BigDecimal(new BigInteger(String.valueOf(value)), scale);
-          filter.add(columnHandle.getName() + "=" + decimalValue.toString());
-        } else if (value instanceof Slice) {
-          //create decimal value from Slice
-          BigDecimal decimalValue =
-              new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), scale);
-          filter.add(columnHandle.getName() + "=" + decimalValue.toString());
-        }
-      } else if (value instanceof Slice) {
-        filter.add(columnHandle.getName() + "=" + ((Slice) value).toStringUtf8());
-      } else if (value instanceof Long && columnHandle.getHiveType()
-          .equals(HiveType.HIVE_DATE)) {
-        Calendar c = Calendar.getInstance();
-        c.setTime(new java.sql.Date(0));
-        c.add(Calendar.DAY_OF_YEAR, ((Long) value).intValue());
-        java.sql.Date date = new java.sql.Date(c.getTime().getTime());
-        filter.add(columnHandle.getName() + "=" + date.toString());
-      } else if (value instanceof Long && columnHandle.getHiveType()
-          .equals(HiveType.HIVE_TIMESTAMP)) {
-        String timeStamp = new Timestamp((Long) value).toString();
-        filter.add(columnHandle.getName() + "=" + timeStamp
-            .substring(0, timeStamp.indexOf('.')));
-      } else if ((value instanceof Boolean) || (value instanceof Double)
-          || (value instanceof Long)) {
-        filter.add(columnHandle.getName() + "=" + value.toString());
-      } else {
-        throw new PrestoException(NOT_SUPPORTED,
-            format("Unsupported partition key type: %s", type.getDisplayName()));
-      }
-    }
-    return filter;
-  }
-
-  /**
    * Convert presto-TupleDomain predication into Carbon scan express condition
    *
    * @param originalConstraint presto-TupleDomain
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 45c82f3..5a6bcc5 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -309,29 +309,6 @@ public class CarbonTableReader {
     return multiBlockSplitList;
   }
 
-  /**
-   * Returns list of partition specs to query based on the domain constraints
-   *
-   * @param constraints presto filter
-   * @param carbonTable carbon table
-   * @throws IOException
-   */
-  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
-      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
-    Set<PartitionSpec> partitionSpecs = new HashSet<>();
-    for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
-      SegmentFileStore segmentFileStore = null;
-      segmentFileStore =
-          new SegmentFileStore(carbonTable.getTablePath(), loadMetadataDetail.getSegmentFile());
-      partitionSpecs.addAll(segmentFileStore.getPartitionSpecs());
-    }
-    List<String> partitionValuesFromExpression =
-        PrestoFilterUtil.getPartitionFilters(carbonTable, constraints);
-    return partitionSpecs.stream().filter(partitionSpec -> CollectionUtils
-        .isSubCollection(partitionValuesFromExpression, partitionSpec.getPartitions()))
-        .collect(Collectors.toList());
-  }
-
   private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
       AbsoluteTableIdentifier identifier, IndexFilter indexFilter,
       List<PartitionSpec> filteredPartitions) {
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestUsingSparkStore.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestUsingSparkStore.scala
new file mode 100644
index 0000000..345a855
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestUsingSparkStore.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.{File}
+import java.util
+
+import org.apache.commons.io.FileUtils
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.presto.server.{PrestoServer}
+
+class PrestoTestUsingSparkStore
+  extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoTestNonTransactionalTableFiles].getCanonicalName)
+
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val storePath = s"$rootPath/integration/presto/target/store"
+  private val writerPath = storePath + "/presto_spark_db/files"
+  private val sparkStorePath = s"$rootPath/integration/spark/target/spark_store"
+  private val prestoServer = new PrestoServer
+
+  override def beforeAll: Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+    prestoServer.startServer("presto_spark_db", map)
+    prestoServer.execute("drop schema if exists presto_spark_db")
+    prestoServer.execute("create schema presto_spark_db")
+  }
+
+  override def afterAll(): Unit = {
+    prestoServer.stopServer()
+    CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
+     CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile
+     (s"$sparkStorePath"))
+  }
+
+  def copyStoreContents(tableName: String): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    import java.io.IOException
+    val source = s"$sparkStorePath/$tableName/"
+    val srcDir = new File(source)
+
+    val destination = s"$storePath/presto_spark_db" +
+                      s"/$tableName/"
+    val destDir = new File(destination)
+    try {
+      // Move spark store to presto store path
+      FileUtils.copyDirectory(srcDir, destDir)
+    }
+    catch {
+      case e: IOException =>
+        throw e
+    }
+  }
+
+  test("Test update operations without local dictionary") {
+    prestoServer.execute("drop table if exists presto_spark_db.update_table")
+    prestoServer.execute("drop table if exists presto_spark_db.actual_update_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.update_table(smallintColumn smallint, intColumn int, " +
+        "bigintColumn bigint, doubleColumn double, decimalColumn decimal(10,3), " +
+        "timestampColumn timestamp, dateColumn date, " +
+        "stringColumn varchar, booleanColumn boolean ) with(format='CARBON') ")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.actual_update_table(smallintColumn smallint, intColumn int," +
+        " " +
+        "bigintColumn bigint, doubleColumn double, decimalColumn decimal(10,3), " +
+        "timestampColumn timestamp, dateColumn date, " +
+        "stringColumn varchar, booleanColumn boolean ) with(format='CARBON') ")
+    copyStoreContents("update_table")
+    copyStoreContents("actual_update_table")
+    assert(prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.update_table").equals(prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.actual_update_table")))
+  }
+
+  test("Test delete operations") {
+    prestoServer.execute("drop table if exists presto_spark_db.iud_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.iud_table(smallintColumn smallint, intColumn int, " +
+        "bigintColumn bigint, doubleColumn double, decimalColumn decimal(10,3), " +
+        "timestampColumn timestamp, dateColumn date, " +
+        "stringColumn varchar, booleanColumn boolean ) with(format='CARBON') ")
+    copyStoreContents("iud_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.iud_table")
+    assert(result.size == 2)
+  }
+
+  test("Test major compaction") {
+    prestoServer.execute("drop table if exists presto_spark_db.testmajor")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.testmajor(country varchar, arrayInt array(int) ) with" +
+        "(format='CARBON') ")
+    copyStoreContents("testmajor")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.testmajor")
+    assert(result.size == 4)
+    for (i <- 0 to 3) {
+      val value = result(i)("country")
+      assert(value.equals("India") || value.equals("Egypt") || value.equals("Iceland") ||
+             value.equals("China"))
+    }
+  }
+
+  test("Test minor compaction") {
+    prestoServer.execute("drop table if exists presto_spark_db.minor_compaction")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.minor_compaction(empno int, empname varchar, arrayInt array" +
+        "(int)) with(format='CARBON') ")
+    copyStoreContents("minor_compaction")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.minor_compaction")
+    assert(result.size == 2)
+    for (i <- 0 to 1) {
+      val value = result(i)("empno")
+      assert(value.equals(11) || value.equals(12))
+    }
+  }
+
+  test("Test custom compaction") {
+    prestoServer.execute("drop table if exists presto_spark_db.custom_compaction_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.custom_compaction_table(ID Int, date Date, country varchar," +
+        " name varchar, phonetype varchar, serialname varchar, salary Int, floatField real) with" +
+        "(format='CARBON') ")
+    copyStoreContents("custom_compaction_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT ID FROM presto_spark_db.custom_compaction_table where ID = 5")
+    assert(result.size == 4)
+  }
+
+  test("test with add segment") {
+    prestoServer.execute("drop table if exists presto_spark_db.segment_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.segment_table(a varchar, b int, arrayInt array<int>) with" +
+        "(format='CARBON') ")
+    copyStoreContents("segment_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.segment_table")
+    assert(result.size == 3)
+    for (i <- 0 to 2) {
+      val value = result(i)("b")
+      assert(value.equals(1) || value.equals(2) || value.equals(3))
+    }
+  }
+
+  test("test with delete segment") {
+    prestoServer.execute("drop table if exists presto_spark_db.delete_segment_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.delete_segment_table(a varchar, b int, arrayInt array<int>)" +
+        " with(format='CARBON') ")
+    copyStoreContents("delete_segment_table")
+    val result = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.delete_segment_table")
+    assert(result.size == 2)
+    for (i <- 0 to 1) {
+      val value = result(i)("b")
+      assert(value.equals(1) || value.equals(3))
+    }
+  }
+
+  test("test inverted index with update operation") {
+    prestoServer.execute("drop table if exists presto_spark_db.inv_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.inv_table(name varchar, c_code int, arrayInt array<int>) " +
+        "with(format='CARBON') ")
+    copyStoreContents("inv_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.inv_table")
+    assert(result.size == 4)
+    for (i <- 0 to 3) {
+      val id = result(i)("c_code")
+      val name = result(i)("name")
+      assert((id.equals(1) && name.equals("Alex")) || (id.equals(2) && name.equals("John")) ||
+             (id.equals(3) && name.equals("Neil")) ||
+             (id.equals(4) && name.equals("Neil")))
+    }
+
+  }
+
+  test("Test partition columns") {
+    prestoServer.execute("drop table if exists presto_spark_db.partition_table")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.partition_table(name varchar, id int, department varchar) " +
+        "with (partitioned_by = ARRAY['department'], format='CARBON') ")
+    copyStoreContents("partition_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.partition_table")
+    assert(result.size == 4)
+    for (i <- 0 to 3) {
+      val id = result(i)("id")
+      val name = result(i)("name")
+      val department = result(i)("department")
+      assert(id.equals(1) && name.equals("John") && department.equals("dev") ||
+             (id.equals(2) && name.equals("Neil")) && department.equals("test") ||
+             (id.equals(4) && name.equals("Alex")) && department.equals("Carbon-dev"))
+    }
+  }
+
+  test("Test bloom index") {
+    prestoServer.execute("drop table if exists presto_spark_db.carbon_normal")
+    prestoServer.execute("drop table if exists presto_spark_db.carbon_bloom")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.carbon_normal(id INT, name varchar, city varchar, age INT, " +
+        "s1 " +
+        "varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 " +
+        "varchar) with" +
+        "(format='CARBON') ")
+    prestoServer
+      .execute(
+        "create table presto_spark_db.carbon_bloom(id INT, name varchar, city varchar, age INT, " +
+        "s1 " +
+        "varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 " +
+        "varchar) with" +
+        "(format='CARBON') ")
+    copyStoreContents("carbon_normal")
+    copyStoreContents("carbon_bloom")
+
+    assert(prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.carbon_normal where id = 1").equals(
+      prestoServer
+        .executeQuery("SELECT * FROM presto_spark_db.carbon_bloom where id = 1")))
+
+    assert(prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.carbon_normal  where id in (999)").equals(
+      prestoServer
+        .executeQuery("SELECT * FROM presto_spark_db.carbon_bloom  where id in (999)")))
+
+    assert(prestoServer
+      .executeQuery(
+        "SELECT * FROM presto_spark_db.carbon_normal where id in (999) and city in ('city_999')")
+      .equals(
+        prestoServer
+          .executeQuery(
+            "SELECT * FROM presto_spark_db.carbon_bloom where id in (999) and city in " +
+            "('city_999')")))
+
+    assert(prestoServer
+      .executeQuery(
+        "SELECT min(id), max(id), min(name), max(name), min(city), max(city) FROM presto_spark_db" +
+        ".carbon_normal where id = 1")
+      .equals(
+        prestoServer
+          .executeQuery(
+            "SELECT min(id), max(id), min(name), max(name), min(city), max(city) FROM " +
+            "presto_spark_db" +
+            ".carbon_bloom where id = 1")))
+
+  }
+
+  test("Test range columns") {
+    prestoServer
+      .execute(
+        "create table presto_spark_db.range_table(name varchar, id int) with" +
+        "(format='CARBON') ")
+    copyStoreContents("range_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.range_table")
+    assert(result.size == 4)
+    for (i <- 0 to 3) {
+      val id = result(i)("id")
+      val name = result(i)("name")
+      assert(id.equals(1000) && name.equals("John") || (id.equals(1001) && name.equals("Alex")) ||
+             (id.equals(5000) && name.equals("Neil")) || (id.equals(4999) && name.equals("Jack")))
+    }
+  }
+
+  test("Test streaming ") {
+    prestoServer
+      .execute(
+        "create table presto_spark_db.streaming_table(c1 varchar, c2 int, c3 varchar, c5 varchar)" +
+        " " +
+        "with" +
+        "(format='CARBON') ")
+    copyStoreContents("streaming_table")
+    val result: List[Map[String, Any]] = prestoServer
+      .executeQuery("SELECT * FROM presto_spark_db.streaming_table")
+    assert(result.size == 5)
+    for (i <- 0 to 4) {
+      val c2 = result(i)("c2")
+      val c5 = result(i)("c5")
+      assert(c2.equals(1) && c5.equals("aaa") || (c2.equals(2) && c5.equals("bbb")) ||
+             (c2.equals(3) && c5.equals("ccc")) ||
+             (c2.equals(4) && c5.equals("ddd") || (c2.equals(3) && c5.equals("ccc")) ||
+              (c2.equals(5) && c5.equals("eee"))))
+    }
+
+  }
+
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkStoreCreatorForPresto.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkStoreCreatorForPresto.scala
new file mode 100644
index 0000000..112218f
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkStoreCreatorForPresto.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.{CarbonSchemaReader, CarbonWriterBuilder}
+
+class SparkStoreCreatorForPresto extends QueryTest with BeforeAndAfterAll{
+
+  private val timestampFormat = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+  private val dateFormat = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT)
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val sparkStorePath = s"$rootPath/integration/spark/target/spark_store"
+
+  val storePath = storeLocation
+
+  override def beforeAll: Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("drop database if exists presto_spark_db cascade")
+    sql("create database presto_spark_db")
+    sql("use presto_spark_db")
+     CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile
+        (s"$sparkStorePath"))
+  }
+
+  override def afterAll: Unit = {
+    if (null != dateFormat) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, dateFormat)
+    }
+    if(null != timestampFormat) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
+    }
+    val source = s"$rootPath/integration/spark/target/warehouse/presto_spark_db.db/"
+    val srcDir = new File(source)
+
+    // Presto will later use this store path to query
+    val destination = s"$rootPath/integration/spark/target/spark_store/"
+    val destDir = new File(destination)
+    FileUtils.copyDirectory(srcDir, destDir)
+    FileUtils.deleteDirectory(srcDir)
+    sql("drop table if exists update_table")
+    sql("drop table if exists actual_update_table")
+    sql("drop table if exists iud_table")
+    sql("drop table if exists testmajor")
+    sql("drop table if exists minor_compaction")
+    sql("drop table if exists custom_compaction_table")
+    sql("drop table if exists segment_table")
+    sql("drop table if exists delete_segment_table")
+    sql("drop table if exists inv_table")
+    sql("drop table if exists partition_table")
+    sql("drop table if exists carbon_normal")
+    sql("drop table if exists carbon_bloom")
+    sql("drop table if exists range_table")
+    sql("drop table if exists streaming_table")
+    sql("use default ")
+  }
+
+  test("Test update operations without local dictionary") {
+    sql("drop table if exists update_table")
+    sql("drop table if exists actual_update_table")
+    sql(
+      "CREATE TABLE IF NOT EXISTS update_table (smallintColumn short, intColumn " +
+      "int, bigintColumn bigint, doubleColumn " +
+      "double, decimalColumn decimal(10,3)," +
+      "timestampColumn timestamp, dateColumn date, " +
+      "stringColumn string, booleanColumn boolean) STORED AS carbondata tblproperties" +
+      "('local_dictionary_enable'='false')"
+    )
+    sql(
+      "insert into update_table values(1, 2, 3333333, 4.1,5.1,'2017-01-01 12:00:00.0', " +
+      "'2017-09-08','abc',true)")
+    sql(
+      "CREATE TABLE IF NOT EXISTS actual_update_table (smallintColumn short, intColumn " +
+      "int, bigintColumn bigint, doubleColumn " +
+      "double, decimalColumn decimal(10,3)," +
+      "timestampColumn timestamp, dateColumn date, " +
+      "stringColumn string, booleanColumn boolean) STORED AS carbondata tblproperties" +
+      "('local_dictionary_enable'='false')"
+    )
+    sql(
+      "insert into actual_update_table values(11, 22, 39999, 4.4,5.5,'2020-01-11 12:00:45.0', " +
+      "'2020-01-11','defgh',false)")
+
+    sql("update update_table set (smallintColumn) = (11)")
+    sql("update update_table set (intColumn) = (22)")
+    sql("update update_table set (bigintColumn) = (39999)")
+    sql("update update_table set (doubleColumn) = (4.4)")
+    sql("update update_table set (decimalColumn) = (5.5)")
+    sql("update update_table set (timestampColumn) = ('2020-01-11 12:00:45.0')")
+    sql("update update_table set (dateColumn) = ('2020-01-11')")
+    sql("update update_table set (stringColumn) = ('defgh')")
+    sql("update update_table set (booleanColumn) = (false)")
+  }
+
+  test("Test delete operations") {
+    sql("drop table if exists iud_table")
+    sql(
+      "CREATE TABLE IF NOT EXISTS iud_table (smallintColumn short, intColumn " +
+      "int, bigintColumn bigint, doubleColumn " +
+      "double, decimalColumn decimal(10,3), " +
+      "timestampColumn timestamp, dateColumn date, " +
+      "stringColumn string, booleanColumn boolean) STORED AS carbondata"
+    )
+    sql(
+      "insert into iud_table values(1, 2, 3333333, 4.1,5.1,'2017-01-01 12:00:00.0', '2017-09-08'," +
+      "'row1',true)")
+    sql(
+      "insert into iud_table values(32, 33, 3555555, 4.1,5.1,'2017-01-01 12:00:00.0', " +
+      "'2017-05-05','row2',false)")
+    sql(
+      "insert into iud_table values(42, 43, 4555555, 4.15,5.15,'2017-01-01 12:00:00.0', " +
+      "'2017-05-05','row3',true)")
+    sql("DELETE FROM iud_table WHERE smallintColumn = 32").show()
+  }
+
+  test("Test major compaction") {
+    sql("drop table if exists testmajor")
+    sql(
+      "CREATE TABLE IF NOT EXISTS testmajor (country String, arrayInt array<int>) STORED AS " +
+      "carbondata"
+    )
+    sql("insert into testmajor select 'India', array(1,2,3) ")
+    sql("insert into testmajor select 'China', array(1,2) ")
+    // compaction will happen here.
+    sql("alter table testmajor compact 'major'")
+    sql("insert into testmajor select 'Iceland', array(4,5,6) ")
+    sql("insert into testmajor select 'Egypt', array(4,5) ")
+
+    sql("alter table testmajor compact 'major'")
+  }
+
+  test("Test minor compaction") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2")
+      sql("DROP TABLE IF EXISTS minor_compaction")
+      sql(
+        "CREATE table minor_compaction (empno int, empname String, arrayInt array<int>) STORED " +
+        "AS carbondata")
+      sql("insert into minor_compaction select 11,'arvind',array(1,2,3)")
+      sql("insert into minor_compaction select 12,'krithi',array(1,2)")
+      // perform compaction operation
+      sql("alter table minor_compaction compact 'minor'")
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+          CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+    }
+  }
+
+  test("Test custom compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+
+    sql("DROP TABLE IF EXISTS custom_compaction_table")
+
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS custom_compaction_table(
+         |   ID Int,
+         |   date Date,
+         |   country String,
+         |   name String,
+         |   phonetype String,
+         |   serialname String,
+         |   salary Int,
+         |   floatField float
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val path = s"$rootPath/examples/spark/src/main/resources/dataSample.csv"
+
+    // load 4 segments
+    // scalastyle:off
+    (1 to 4).foreach(_ => sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE custom_compaction_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin))
+    // scalastyle:on
+
+    sql("SHOW SEGMENTS FOR TABLE custom_compaction_table").show()
+
+    sql("ALTER TABLE custom_compaction_table COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2)")
+
+    sql("SHOW SEGMENTS FOR TABLE custom_compaction_table").show()
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
+  }
+
+  test("test with add segment") {
+    val newSegmentPath: String = storePath + "presto_spark_db/newsegment/"
+    FileFactory.getCarbonFile(newSegmentPath).delete()
+    sql("drop table if exists segment_table")
+    sql("create table segment_table(a string, b int, arrayInt array<int>) stored as carbondata")
+    sql("insert into segment_table select 'k', 1, array(1,2,3)")
+    sql("insert into segment_table select 'l', 2, array(1,2)")
+    val carbonTable = CarbonEnv.getCarbonTable(None, "segment_table")(sqlContext.sparkSession)
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
+    val schema = CarbonSchemaReader.readSchema(segmentPath).asOriginOrder()
+    val writer = new CarbonWriterBuilder()
+      .outputPath(newSegmentPath)
+      .withCsvInput(schema)
+      .writtenBy("SparkStoreCreatorForPresto")
+      .build()
+    writer.write(Array[String]("m", "3", "1" + "\001" + "5"))
+    writer.close()
+    sql(s"alter table segment_table add segment options('path'='${ newSegmentPath }', " +
+        s"'format'='carbon')")
+  }
+
+  test("test with delete segment") {
+    sql("drop table if exists delete_segment_table")
+    sql(
+      "create table delete_segment_table(a string, b int, arrayInt array<int>) stored as " +
+      "carbondata")
+    sql("insert into delete_segment_table select 'k',1,array(1,2,3)")
+    sql("insert into delete_segment_table select 'l',2,array(1,2)")
+    sql("insert into delete_segment_table select 'm',3,array(1)")
+    sql("delete from table delete_segment_table where segment.id in (1)")
+  }
+
+  test("Test inverted index with update operation") {
+    sql("drop table IF EXISTS inv_table")
+    sql(
+      "create table inv_table(name string, c_code int, arrayInt array<int>) STORED AS carbondata " +
+      "tblproperties('sort_columns'='name', 'inverted_index'='name','sort_scope'='local_sort')")
+    sql("insert into table inv_table select 'John',1,array(1)")
+    sql("insert into table inv_table select 'John',2,array(1,2)")
+    sql("insert into table inv_table select 'Neil',3,array(1,2,3)")
+    sql("insert into table inv_table select 'Neil',4,array(1,2,3,4)")
+
+    sql("update inv_table set (name) = ('Alex') where c_code = 1")
+  }
+
+  test("Test partition columns") {
+    sql("drop table IF EXISTS partition_table")
+    sql(
+      "create table partition_table(name string, id int) PARTITIONED by (department string) " +
+      "stored " +
+      "as carbondata")
+    sql("insert into table partition_table select 'John','1','dev'")
+    sql("insert into table partition_table select 'John','4','dev'")
+    sql("insert into table partition_table select 'Neil','2','test'")
+    sql("insert into table partition_table select 'Neil','2','test'")
+
+    // update
+    sql("update partition_table set (name) = ('Alex') where id = 4")
+    sql("update partition_table set (department) = ('Carbon-dev') where id = 4")
+  }
+
+  test("test create bloom index on table with existing data") {
+    val bigFile = s"$resourcesPath/bloom_index_input_big.csv"
+
+    val normalTable = "carbon_normal"
+    val bloomSampleTable = "carbon_bloom"
+    val indexName = "bloom_dm"
+    createFile(bigFile, line = 50000)
+
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomSampleTable")
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED AS carbondata TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED AS carbondata TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE INDEX $indexName
+         | ON $bloomSampleTable (city, id)
+         | AS 'bloomfilter'
+         | properties('BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    // load two segments
+    (1 to 2).foreach { i =>
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomSampleTable
+           | OPTIONS('header'='false')
+         """.stripMargin)
+    }
+
+  }
+
+  test("Test range columns") {
+    sql("drop table IF EXISTS range_table")
+    sql(
+      "create table range_table(name string, id int) stored " +
+      "as carbondata TBLPROPERTIES('RANGE_COLUMN' = 'name')")
+    sql("insert into table range_table select 'John','1000'")
+    sql("insert into table range_table select 'Alex','1001'")
+    sql("insert into table range_table select 'Neil','5000'")
+    sql("insert into table range_table select 'Jack','4999'")
+  }
+
+  test("Test streaming") {
+    sql("drop table IF EXISTS streaming_table")
+    sql(
+      """
+        | CREATE TABLE streaming_table(
+        |    c1 string,
+        |    c2 int,
+        |    c3 string,
+        |    c5 string
+        | ) STORED AS carbondata
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE streaming_table""")
+  }
+
+  private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+    if (!new File(fileName).exists()) {
+      val write = new PrintWriter(new File(fileName))
+      for (i <- start until (start + line)) {
+        // scalastyle:off println
+        write.println(
+          s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+        // scalastyle:on println
+      }
+      write.close()
+    }
+  }
+
+}

