carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [40/50] [abbrv] carbondata git commit: Fix concurrent testcase random failure
Date Sun, 28 Jan 2018 06:46:09 GMT
Fix concurrent testcase random failure

Fix IUDConcurrentTest so that it runs SQL statements concurrently in the correct order

This closes #1800


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e77f2b9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e77f2b9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e77f2b9

Branch: refs/heads/fgdatamap
Commit: 6e77f2b9d96ed32898111c3fc7551a9dde3f8cf4
Parents: ff10bbb
Author: Jacky Li <jacky.likun@qq.com>
Authored: Mon Jan 15 09:49:38 2018 +0800
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Thu Jan 18 10:06:30 2018 +0800

----------------------------------------------------------------------
 .../hadoop/ft/CarbonOutputMapperTest.java       | 115 ----------
 .../hadoop/ft/CarbonTableInputFormatTest.java   | 167 +++++++++++++-
 .../hadoop/ft/CarbonTableInputMapperTest.java   | 219 -------------------
 .../hadoop/ft/CarbonTableOutputFormatTest.java  | 120 ++++++++++
 .../hadoop/test/util/StoreCreator.java          |  18 +-
 .../spark/testsuite/iud/IUDConcurrentTest.scala | 114 ++++++++--
 .../store/writer/AbstractFactDataWriter.java    |   3 +-
 7 files changed, 388 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
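
The gist of the IUDConcurrentTest change below is to serialize the start order of the two conflicting SQL statements: the insert-overwrite is submitted asynchronously, a DataMap hook flips a shared flag once loading has actually started, and the test polls that flag before issuing the second statement. The following Scala sketch illustrates that ordering pattern in isolation; the names used here (ConcurrentOrderingSketch, runFirstStatement, runAsyncAndWaitUntilRunning) are illustrative only and are not part of this patch, which implements the same idea with WaitingDataMap and a Global flag, as shown in the IUDConcurrentTest.scala diff further down.

import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ConcurrentOrderingSketch {
  // flag flipped by a hook inside the first statement once it is actually running
  private val firstStatementRunning = new AtomicBoolean(false)
  private val executor = Executors.newFixedThreadPool(2)

  // stand-in for the first SQL statement; in the real test a WaitingDataMap's
  // onBlockStart callback sets the flag and sleeps to keep the load in progress
  private def runFirstStatement(): String = {
    firstStatementRunning.set(true)
    Thread.sleep(1000)
    "PASS"
  }

  // submit the first statement asynchronously and poll (bounded) until the hook
  // reports it is running, so the second statement is guaranteed to start later
  def runAsyncAndWaitUntilRunning(): Future[String] = {
    val future = executor.submit(new Callable[String] {
      override def call(): String = runFirstStatement()
    })
    var polls = 0
    while (!firstStatementRunning.get() && polls < 1000) {
      Thread.sleep(10) // bounded wait avoids a dead loop if the hook never fires
      polls += 1
    }
    future
  }

  def main(args: Array[String]): Unit = {
    val future = runAsyncAndWaitUntilRunning()
    // at this point the second, conflicting statement would be issued and expected
    // to fail, because the first statement is still in progress
    println("first statement result: " + future.get(5, TimeUnit.SECONDS))
    executor.shutdown()
  }
}
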


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
deleted file mode 100644
index 22a6e53..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.ft;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-
-public class CarbonOutputMapperTest extends TestCase {
-
-  CarbonLoadModel carbonLoadModel;
-
-  // changed setUp to static init block to avoid un wanted multiple time store creation
-  static {
-    CarbonProperties.getInstance().
-        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
-  }
-
-
-  @Test public void testOutputFormat() throws Exception {
-    runJob("");
-    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
-    File file = new File(segmentPath);
-    assert (file.exists());
-    File[] listFiles = file.listFiles(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
-        return name.endsWith(".carbondata") ||
-            name.endsWith(".carbonindex") ||
-            name.endsWith(".carbonindexmerge");
-      }
-    });
-
-    assert (listFiles.length == 2);
-  }
-
-
-  @Override public void tearDown() throws Exception {
-    super.tearDown();
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
-  }
-
-  @Override public void setUp() throws Exception {
-    super.setUp();
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
-    carbonLoadModel = StoreCreator.getCarbonLoadModel();
-  }
-
- public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
-
-   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
-       throws IOException, InterruptedException {
-     context.write(key, value);
-   }
- }
-
-  private void runJob(String outPath) throws Exception {
-    Configuration configuration = new Configuration();
-    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
-    Job job = Job.getInstance(configuration);
-    job.setJarByClass(CarbonOutputMapperTest.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(StringArrayWritable.class);
-    job.setMapperClass(Map.class);
-    job.setNumReduceTasks(0);
-
-    FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath()));
-    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel);
-    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
-    CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true);
-    job.setInputFormatClass(CSVInputFormat.class);
-    job.setOutputFormatClass(CarbonTableOutputFormat.class);
-    CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1"));
-    FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1"));
-    job.getConfiguration().set("outpath", outPath);
-    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
-    job.waitForCompletion(true);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 1df8a1a..2f029ab 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -17,28 +17,41 @@
 
 package org.apache.carbondata.hadoop.ft;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
 import junit.framework.TestCase;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.test.util.StoreCreator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,7 +62,11 @@ public class CarbonTableInputFormatTest {
   static {
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
-    StoreCreator.createCarbonStore();
+    try {
+      StoreCreator.createCarbonStore();
+    } catch (Exception e) {
+      Assert.fail("create table failed: " + e.getMessage());
+    }
   }
 
   @Test public void testGetFilteredSplits() throws Exception {
@@ -99,4 +116,152 @@ public class CarbonTableInputFormatTest {
     Assert.assertTrue(splits != null && splits.size() == 1);
   }
 
+  @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {
+    try {
+      String outPath = "target/output";
+      CarbonProjection carbonProjection = new CarbonProjection();
+      carbonProjection.addColumn("ID");
+      carbonProjection.addColumn("date");
+      carbonProjection.addColumn("country");
+      carbonProjection.addColumn("name");
+      carbonProjection.addColumn("phonetype");
+      carbonProjection.addColumn("serialname");
+      carbonProjection.addColumn("salary");
+      runJob(outPath, carbonProjection, null);
+      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue("failed", false);
+      throw e;
+    } finally {
+      StoreCreator.clearDataMaps();
+    }
+  }
+
+  @Test public void testInputFormatMapperReadAllRowsAndFewColumns() throws Exception {
+    try {
+      String outPath = "target/output2";
+      CarbonProjection carbonProjection = new CarbonProjection();
+      carbonProjection.addColumn("ID");
+      carbonProjection.addColumn("country");
+      carbonProjection.addColumn("salary");
+      runJob(outPath, carbonProjection, null);
+
+      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue("failed", false);
+    } finally {
+      StoreCreator.clearDataMaps();
+    }
+  }
+
+  @Test public void testInputFormatMapperReadAllRowsAndFewColumnsWithFilter() throws Exception {
+    try {
+      String outPath = "target/output3";
+      CarbonProjection carbonProjection = new CarbonProjection();
+      carbonProjection.addColumn("ID");
+      carbonProjection.addColumn("country");
+      carbonProjection.addColumn("salary");
+      Expression expression =
+          new EqualToExpression(new ColumnExpression("country", DataTypes.STRING),
+              new LiteralExpression("france", DataTypes.STRING));
+      runJob(outPath, carbonProjection, expression);
+      Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
+    } catch (Exception e) {
+      Assert.assertTrue("failed", false);
+    } finally {
+      StoreCreator.clearDataMaps();
+    }
+  }
+
+
+  private int countTheLines(String outPath) throws Exception {
+    File file = new File(outPath);
+    if (file.exists()) {
+      BufferedReader reader = new BufferedReader(new FileReader(file));
+      int i = 0;
+      while (reader.readLine() != null) {
+        i++;
+      }
+      reader.close();
+      return i;
+    }
+    return 0;
+  }
+
+  private int countTheColumns(String outPath) throws Exception {
+    File file = new File(outPath);
+    if (file.exists()) {
+      BufferedReader reader = new BufferedReader(new FileReader(file));
+      String[] split = reader.readLine().split(",");
+      reader.close();
+      return split.length;
+    }
+    return 0;
+  }
+
+  public static class Map extends Mapper<Void, Object[], Text, Text> {
+
+    private BufferedWriter fileWriter;
+
+    public void setup(Context context) throws IOException, InterruptedException {
+      String outPath = context.getConfiguration().get("outpath");
+      File outFile = new File(outPath);
+      try {
+        fileWriter = new BufferedWriter(new FileWriter(outFile));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void map(Void key, Object[] value, Context context) throws IOException {
+      StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < value.length; i++) {
+        builder.append(value[i]).append(",");
+      }
+      fileWriter.write(builder.toString().substring(0, builder.toString().length() - 1));
+      fileWriter.newLine();
+    }
+
+    @Override public void cleanup(Context context) throws IOException, InterruptedException {
+      super.cleanup(context);
+      fileWriter.close();
+      context.write(new Text(), new Text());
+    }
+  }
+
+  private void runJob(String outPath, CarbonProjection projection, Expression filter)
+      throws Exception {
+
+    Configuration configuration = new Configuration();
+    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
+    Job job = Job.getInstance(configuration);
+    job.setJarByClass(CarbonTableInputFormatTest.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setMapperClass(Map.class);
+    job.setInputFormatClass(CarbonTableInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
+    if (projection != null) {
+      CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection);
+    }
+    if (filter != null) {
+      CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+    }
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
+        abs.getCarbonTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat.setTableName(job.getConfiguration(),
+        abs.getCarbonTableIdentifier().getTableName());
+    FileInputFormat.addInputPath(job, new Path(abs.getTablePath()));
+    CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1"));
+    FileOutputFormat.setOutputPath(job, new Path(outPath + "1"));
+    job.getConfiguration().set("outpath", outPath);
+    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
+    boolean status = job.waitForCompletion(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java
deleted file mode 100644
index bb37959..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.ft;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.core.scan.expression.ColumnExpression;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CarbonTableInputMapperTest extends TestCase {
-
-  // changed setUp to static init block to avoid un wanted multiple time store creation
-  static {
-    CarbonProperties.getInstance().
-        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
-    StoreCreator.createCarbonStore();
-  }
-
-  @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {
-    try {
-      String outPath = "target/output";
-      CarbonProjection carbonProjection = new CarbonProjection();
-      carbonProjection.addColumn("ID");
-      carbonProjection.addColumn("date");
-      carbonProjection.addColumn("country");
-      carbonProjection.addColumn("name");
-      carbonProjection.addColumn("phonetype");
-      carbonProjection.addColumn("serialname");
-      carbonProjection.addColumn("salary");
-      runJob(outPath, carbonProjection, null);
-      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
-      Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue("failed", false);
-      throw e;
-    } finally {
-      StoreCreator.clearDataMaps();
-    }
-  }
-
-  @Test public void testInputFormatMapperReadAllRowsAndFewColumns() throws Exception {
-    try {
-      String outPath = "target/output2";
-      CarbonProjection carbonProjection = new CarbonProjection();
-      carbonProjection.addColumn("ID");
-      carbonProjection.addColumn("country");
-      carbonProjection.addColumn("salary");
-      runJob(outPath, carbonProjection, null);
-
-      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
-      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue("failed", false);
-    } finally {
-      StoreCreator.clearDataMaps();
-    }
-  }
-
-  @Test public void testInputFormatMapperReadAllRowsAndFewColumnsWithFilter() throws Exception {
-    try {
-      String outPath = "target/output3";
-      CarbonProjection carbonProjection = new CarbonProjection();
-      carbonProjection.addColumn("ID");
-      carbonProjection.addColumn("country");
-      carbonProjection.addColumn("salary");
-      Expression expression =
-          new EqualToExpression(new ColumnExpression("country", DataTypes.STRING),
-              new LiteralExpression("france", DataTypes.STRING));
-      runJob(outPath, carbonProjection, expression);
-      Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath));
-      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
-    } catch (Exception e) {
-      Assert.assertTrue("failed", false);
-    } finally {
-      StoreCreator.clearDataMaps();
-    }
-  }
-
-  private int countTheLines(String outPath) throws Exception {
-    File file = new File(outPath);
-    if (file.exists()) {
-      BufferedReader reader = new BufferedReader(new FileReader(file));
-      int i = 0;
-      while (reader.readLine() != null) {
-        i++;
-      }
-      reader.close();
-      return i;
-    }
-    return 0;
-  }
-
-  private int countTheColumns(String outPath) throws Exception {
-    File file = new File(outPath);
-    if (file.exists()) {
-      BufferedReader reader = new BufferedReader(new FileReader(file));
-      String[] split = reader.readLine().split(",");
-      reader.close();
-      return split.length;
-    }
-    return 0;
-  }
-
-  @Override public void tearDown() throws Exception {
-    super.tearDown();
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
-  }
-
- public static class Map extends Mapper<Void, Object[], Text, Text> {
-
-    private BufferedWriter fileWriter;
-
-    public void setup(Context context) throws IOException, InterruptedException {
-      String outPath = context.getConfiguration().get("outpath");
-      File outFile = new File(outPath);
-      try {
-        fileWriter = new BufferedWriter(new FileWriter(outFile));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void map(Void key, Object[] value, Context context) throws IOException {
-      StringBuilder builder = new StringBuilder();
-      for (int i = 0; i < value.length; i++) {
-        builder.append(value[i]).append(",");
-      }
-      fileWriter.write(builder.toString().substring(0, builder.toString().length() - 1));
-      fileWriter.newLine();
-    }
-
-    @Override public void cleanup(Context context) throws IOException, InterruptedException {
-      super.cleanup(context);
-      fileWriter.close();
-      context.write(new Text(), new Text());
-    }
-  }
-
-  private void runJob(String outPath, CarbonProjection projection, Expression filter)
-      throws Exception {
-
-    Configuration configuration = new Configuration();
-    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
-    Job job = Job.getInstance(configuration);
-    job.setJarByClass(CarbonTableInputMapperTest.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    job.setMapperClass(Map.class);
-    job.setInputFormatClass(CarbonTableInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-    AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
-    if (projection != null) {
-      CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection);
-    }
-    if (filter != null) {
-      CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);
-    }
-    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
-        abs.getCarbonTableIdentifier().getDatabaseName());
-    CarbonTableInputFormat.setTableName(job.getConfiguration(),
-        abs.getCarbonTableIdentifier().getTableName());
-    FileInputFormat.addInputPath(job, new Path(abs.getTablePath()));
-    CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1"));
-    FileOutputFormat.setOutputPath(job, new Path(outPath + "1"));
-    job.getConfiguration().set("outpath", outPath);
-    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
-    boolean status = job.waitForCompletion(true);
-  }
-
-  public static void main(String[] args) throws Exception {
-    new CarbonTableInputMapperTest().runJob("target/output", null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
new file mode 100644
index 0000000..9bb2f53
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.ft;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CarbonTableOutputFormatTest {
+
+  static CarbonLoadModel carbonLoadModel;
+
+  // changed setUp to static init block to avoid un wanted multiple time store creation
+  static {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
+    try {
+      carbonLoadModel = StoreCreator.createTableAndLoadModel();
+    } catch (Exception e) {
+      Assert.fail("create table failed: " + e.getMessage());
+    }
+  }
+
+
+  @Test public void testOutputFormat() throws Exception {
+    runJob("");
+    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
+    File file = new File(segmentPath);
+    assert (file.exists());
+    File[] listFiles = file.listFiles(new FilenameFilter() {
+      @Override public boolean accept(File dir, String name) {
+        return name.endsWith(".carbondata") ||
+            name.endsWith(".carbonindex") ||
+            name.endsWith(".carbonindexmerge");
+      }
+    });
+
+    assert (listFiles.length == 2);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
+  }
+
+ public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
+
+   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
+       throws IOException, InterruptedException {
+     context.write(key, value);
+   }
+ }
+
+  private void runJob(String outPath) throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
+    Job job = Job.getInstance(configuration);
+    job.setJarByClass(CarbonTableOutputFormatTest.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(StringArrayWritable.class);
+    job.setMapperClass(Map.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath()));
+    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel);
+    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
+    CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true);
+    job.setInputFormatClass(CSVInputFormat.class);
+    job.setOutputFormatClass(CarbonTableOutputFormat.class);
+    CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1"));
+    FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1"));
+    job.getConfiguration().set("outpath", outPath);
+    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
+    job.waitForCompletion(true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index fc54238..fbf33d6 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -163,15 +163,9 @@ public class StoreCreator {
   /**
    * Create store without any restructure
    */
-  public static void createCarbonStore() {
-    try {
-      CarbonLoadModel loadModel = getCarbonLoadModel();
-
-      executeGraph(loadModel, storePath);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+  public static void createCarbonStore() throws Exception {
+    CarbonLoadModel loadModel = createTableAndLoadModel();
+    loadData(loadModel, storePath);
   }
 
   /**
@@ -181,7 +175,7 @@ public class StoreCreator {
     DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
   }
 
-  public static CarbonLoadModel getCarbonLoadModel() throws Exception {
+  public static CarbonLoadModel createTableAndLoadModel() throws Exception {
     String factFilePath =
         new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
     File storeDir = new File(storePath);
@@ -387,7 +381,7 @@ public class StoreCreator {
    * @param storeLocation
    * @throws Exception
    */
-  public static void executeGraph(CarbonLoadModel loadModel, String storeLocation)
+  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
       throws Exception {
     new File(storeLocation).mkdirs();
     String outPutLoc = storeLocation + "/etl";
@@ -519,7 +513,7 @@ public class StoreCreator {
     return date;
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     StoreCreator.createCarbonStore();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
index dbe7445..bb1d26f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
@@ -25,18 +25,31 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
-class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll {
+class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
   var df: DataFrame = _
 
   override def beforeAll {
     dropTable()
     buildTestData()
+
+    // register hook to the table to sleep, thus the other command will be executed
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
+      classOf[WaitingDataMap].getName,
+      "test")
   }
 
   private def buildTestData(): Unit = {
@@ -48,7 +61,7 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll {
     import sqlContext.implicits._
 
     val sdf = new SimpleDateFormat("yyyy-MM-dd")
-    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000)
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
       .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
         "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
         "ordersTable" + value))
@@ -62,8 +75,7 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll {
     df.write
       .format("carbondata")
       .option("tableName", tableName)
-      .option("tempCSV", "true")
-      .option("compress", "true")
+      .option("tempCSV", "false")
       .mode(SaveMode.Overwrite)
       .save()
   }
@@ -73,34 +85,53 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll {
     dropTable()
   }
 
+  override def beforeEach(): Unit = {
+    Global.overwriteRunning = false
+  }
+
   private def dropTable() = {
     sql("DROP TABLE IF EXISTS orders")
     sql("DROP TABLE IF EXISTS orders_overwrite")
   }
 
-  test("Concurrency test for Insert-Overwrite and compact") {
-    val tasks = new java.util.ArrayList[Callable[String]]()
-    tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite"))
-    tasks.add(new QueryTask("alter table orders compact 'MINOR'"))
-    val futures: util.List[Future[String]] = executorService.invokeAll(tasks)
-    val results = futures.asScala.map(_.get)
-    assert(results.contains("PASS"))
+  // run the input SQL and block until it is running
+  private def runSqlAsync(sql: String): Future[String] = {
+    assert(!Global.overwriteRunning)
+    var count = 0
+    val future = executorService.submit(
+      new QueryTask(sql)
+    )
+    while (!Global.overwriteRunning && count < 1000) {
+      Thread.sleep(10)
+      // to avoid dead loop in case WaitingDataMap is not invoked
+      count += 1
+    }
+    future
+  }
+
+  test("compaction should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception]{
+      sql("alter table orders compact 'MINOR'")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Cannot run data loading and compaction on same table concurrently"))
   }
 
-  test("Concurrency test for Insert-Overwrite and update") {
-    val tasks = new java.util.ArrayList[Callable[String]]()
-    tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite"))
-    tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'"))
-    val futures: util.List[Future[String]] = executorService.invokeAll(tasks)
-    val results = futures.asScala.map(_.get)
-    assert("PASS".equals(results.head) && "FAIL".equals(results(1)))
+  test("update should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception] {
+      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Cannot run data loading and update on same table concurrently"))
   }
 
   class QueryTask(query: String) extends Callable[String] {
     override def call(): String = {
       var result = "PASS"
       try {
-        sql(query).show()
+        sql(query).collect()
       } catch {
         case exception: Exception => LOGGER.error(exception.getMessage)
           result = "FAIL"
@@ -109,4 +140,47 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+}
+
+object Global {
+  var overwriteRunning = false
+}
+
+class WaitingDataMap() extends DataMapFactory {
+
+  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segmentId: String): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+
+  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+  override def createWriter(segmentId: String): DataMapWriter = {
+    new DataMapWriter {
+      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
+
+      override def onBlockletEnd(blockletId: Int): Unit = { }
+
+      override def onBlockEnd(blockId: String): Unit = { }
+
+      override def onBlockletStart(blockletId: Int): Unit = { }
+
+      override def onBlockStart(blockId: String): Unit = {
+        // trigger the second SQL to execute
+        Global.overwriteRunning = true
+
+        // wait for 1 second to let second SQL to finish
+        Thread.sleep(1000)
+      }
+    }
+  }
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
+
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 9d55d30..4cb9fdd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -482,7 +482,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
     File curFile = new File(this.carbonDataFileTempPath);
     if (!curFile.renameTo(origFile)) {
-      throw new CarbonDataWriterException("Problem while renaming the file");
+      throw new CarbonDataWriterException("Problem while renaming the file (" + curFile +
+          "), to file (" + origFile + ")");
     }
   }
 

