carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From indhumuthumurug...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4078] Add external segment and query with index server fails
Date Thu, 17 Dec 2020 15:06:06 GMT
This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fc8225  [CARBONDATA-4078] Add external segment and query with index server fails
8fc8225 is described below

commit 8fc822559e80db3db38953bafe72e0ff8a7286a0
Author: ShreelekhyaG <shreelu_gampa@yahoo.com>
AuthorDate: Mon Dec 7 19:08:01 2020 +0530

    [CARBONDATA-4078] Add external segment and query with index server fails
    
    Why is this PR needed?
    After an external segment is added to a carbon table, a query tries to get splits from the
    index server, which throws an exception because it cannot read the external (ORC/Parquet) file
    format. When fallback mode is disabled, this exception is not recovered from and the query fails.
    
    What changes were proposed in this PR?
    To avoid the exception, filter the segment list so that only valid carbon segments are sent to the index server for caching; external (non-carbon) segments are excluded.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No; tested manually on a cluster.
    
    This closes #4047
---
 .../main/java/org/apache/carbondata/core/index/IndexInputFormat.java | 4 ++++
 .../src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala | 5 +++++
 .../scala/org/apache/spark/sql/listeners/PrePrimingListener.scala    | 4 +++-
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
index a6e02527..dbb5b4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
@@ -418,6 +418,10 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     return validSegments;
   }
 
+  public void setValidSegments(List<Segment> validSegments) {
+    this.validSegments = validSegments;
+  }
+
   public void createIndexChooser() throws IOException {
     if (null != filterResolverIntf) {
       this.indexChooser = new IndexChooser(table);
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
index a81c202..ce344a6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
@@ -47,6 +47,11 @@ class DistributedIndexJob extends AbstractIndexJob {
 
   override def execute(indexFormat: IndexInputFormat,
       configuration: Configuration): util.List[ExtendedBlocklet] = {
+    // get only carbon segments.
+    if (indexFormat.getValidSegments != null) {
+      indexFormat.setValidSegments(indexFormat.getValidSegments.asScala
+        .filter(segment => segment.isCarbonSegment).toList.asJava)
+    }
     if (LOGGER.isDebugEnabled) {
       val messageSize = SizeEstimator.estimate(indexFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/listeners/PrePrimingListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/listeners/PrePrimingListener.scala
index 492f244..e6cf40d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/listeners/PrePrimingListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/listeners/PrePrimingListener.scala
@@ -33,9 +33,11 @@ object PrePrimingEventListener extends OperationEventListener {
       operationContext: OperationContext): Unit = {
     val prePrimingEvent = event.asInstanceOf[IndexServerLoadEvent]
     val carbonTable = prePrimingEvent.carbonTable
+    // get only carbon segments.
+    val validSegments = prePrimingEvent.segment.filter(segment => segment.isCarbonSegment).asJava
     val indexInputFormat = new IndexInputFormat(carbonTable,
       null,
-      prePrimingEvent.segment.asJava,
+      validSegments,
       prePrimingEvent.invalidSegment.asJava,
       null,
       false,


Mime
View raw message