carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [1/2] incubator-carbondata git commit: add Flink example
Date Mon, 09 Jan 2017 03:22:53 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f7f40c09f -> bc5a061e9


add Flink example

fix comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/33cd11e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/33cd11e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/33cd11e1

Branch: refs/heads/master
Commit: 33cd11e17f667bb8477986f4ac81ab0223a5bc8d
Parents: f7f40c0
Author: jackylk <jacky.likun@huawei.com>
Authored: Sun Jan 8 00:13:06 2017 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Mon Jan 9 11:21:02 2017 +0800

----------------------------------------------------------------------
 examples/flink/pom.xml                          | 77 ++++++++++++++++++++
 .../carbondata/examples/FlinkExample.scala      | 62 ++++++++++++++++
 .../examples/DataFrameAPIExample.scala          |  3 +-
 .../carbondata/examples/DatasourceExample.scala |  3 +
 .../carbondata/examples/DirectSQLExample.scala  |  3 +
 .../carbondata/examples/HadoopFileExample.scala | 20 ++++-
 .../carbondata/examples/util/ExampleUtils.scala | 20 +++--
 pom.xml                                         | 10 +++
 8 files changed, 184 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/flink/pom.xml
----------------------------------------------------------------------
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
new file mode 100644
index 0000000..a365521
--- /dev/null
+++ b/examples/flink/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.0.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-examples-flink</artifactId>
+  <name>Apache CarbonData :: Flink Examples</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-examples-spark</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/main/scala</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
----------------------------------------------------------------------
diff --git a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
new file mode 100644
index 0000000..9ce95ae
--- /dev/null
+++ b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+
+// Write carbondata file by spark and read it by flink
+// scalastyle:off println
+object FlinkExample {
+
+  def main(args: Array[String]): Unit = {
+    // write carbondata file by spark
+    val cc = ExampleUtils.createCarbonContext("FlinkExample")
+    val path = ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
+
+    // read two columns by flink
+    val projection = new CarbonProjection
+    projection.addColumn("c1")  // column c1
+    projection.addColumn("c3")  // column c3
+    val conf = new Configuration()
+    CarbonInputFormat.setColumnProjection(conf, projection)
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.readHadoopFile(
+      new CarbonInputFormat[Array[Object]],
+      classOf[Void],
+      classOf[Array[Object]],
+      path,
+      new Job(conf)
+    )
+
+    // print result
+    val result = ds.collect()
+    for (i <- 0 until result.size()) {
+      println(result.get(i).f1.mkString(","))
+    }
+
+    // delete carbondata file
+    ExampleUtils.cleanSampleCarbonFile(cc, "carbon1")
+  }
+}
+// scalastyle:on println
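
Note: readHadoopFile yields a DataSet of Flink Tuple2 pairs, with the Void key in f0 and the projected carbon row (Array[Object]) in f1, which is why the loop above prints result.get(i).f1. A minimal sketch of the same print step using the standard scala.collection.JavaConverters (not part of the commit, just an equivalent form):

    import scala.collection.JavaConverters._
    ds.collect().asScala.foreach(row => println(row.f1.mkString(",")))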

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 97fa152..db5def9 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -44,7 +44,8 @@ object DataFrameAPIExample {
     // use SQL to read
     cc.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show
 
-    cc.sql("DROP TABLE IF EXISTS carbon1")
+    // delete carbondata file
+    ExampleUtils.cleanSampleCarbonFile(cc, "carbon1")
   }
 }
 // scalastyle:on println

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/spark/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
index 791a126..a3af2c3 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
@@ -37,5 +37,8 @@ object DatasourceExample {
         | OPTIONS (path '${cc.storePath}/default/table1')
       """.stripMargin)
     sqlContext.sql("SELECT c1, c2, count(*) FROM source WHERE c3 > 100 GROUP BY c1, c2").show
+
+    // delete carbondata file
+    ExampleUtils.cleanSampleCarbonFile(cc, "table1")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/spark/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index 5e448fe..6a66b93 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -40,5 +40,8 @@ object DirectSQLExample {
         | WHERE c3 > 100
         | GROUP BY c1, c2
       """.stripMargin).show
+
+    // delete carbondata file
+    ExampleUtils.cleanSampleCarbonFile(cc, "table1")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/spark/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
index 292a3b5..5fec332 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
@@ -17,8 +17,13 @@
 
 package org.apache.carbondata.examples
 
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+import org.apache.carbondata.scan.expression.{ColumnExpression, LiteralExpression}
+import org.apache.carbondata.scan.expression.conditional.GreaterThanExpression
 
 // scalastyle:off println
 object HadoopFileExample {
@@ -27,13 +32,24 @@ object HadoopFileExample {
     val cc = ExampleUtils.createCarbonContext("HadoopFileExample")
     ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
 
+    // read two columns
+    val projection = new CarbonProjection
+    projection.addColumn("c1")  // column c1
+    projection.addColumn("c3")  // column c3
+    val conf = new Configuration()
+    CarbonInputFormat.setColumnProjection(conf, projection)
+
     val sc = cc.sparkContext
     val input = sc.newAPIHadoopFile(s"${cc.storePath}/default/carbon1",
       classOf[CarbonInputFormat[Array[Object]]],
       classOf[Void],
-      classOf[Array[Object]])
+      classOf[Array[Object]],
+      conf)
     val result = input.map(x => x._2.toList).collect
     result.foreach(x => println(x.mkString(", ")))
+
+    // delete carbondata file
+    ExampleUtils.cleanSampleCarbonFile(cc, "carbon1")
   }
 }
 // scalastyle:on println
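
Note: the newly imported ColumnExpression, LiteralExpression and GreaterThanExpression types point at filter pushdown for the scan. A minimal sketch of building a "c3 > 100" predicate, assuming the constructor shapes noted in the comments (they are not confirmed by this diff) and that the expression is then registered on the same Configuration through CarbonInputFormat's filter setter:

    // assumed shapes: ColumnExpression(name, dataType), LiteralExpression(value, dataType),
    // GreaterThanExpression(left, right); not part of this commit
    val column = new ColumnExpression("c3", DataType.INT)
    val literal = new LiteralExpression(100, DataType.INT)
    val greaterThan = new GreaterThanExpression(column, literal)
    // greaterThan would then be set on `conf` before creating the RDD above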

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
index 7649ac3..54d9139 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
@@ -56,17 +56,21 @@ object ExampleUtils {
   /**
    * This func will write a sample CarbonData file containing following schema:
    * c1: String, c2: String, c3: Double
+   * Returns table path
    */
-  def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = {
+  def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
     cc.sql(s"DROP TABLE IF EXISTS $tableName")
     writeDataframe(cc, tableName, numRows, SaveMode.Overwrite)
+    s"$storeLocation/default/$tableName"
   }
 
   /**
    * This func will append data to the CarbonData file
+   * Returns table path
    */
-  def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): Unit = {
+  def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
     writeDataframe(cc, tableName, numRows, SaveMode.Append)
+    s"$storeLocation/default/$tableName"
   }
 
   /**
@@ -81,15 +85,6 @@ object ExampleUtils {
         .map(x => ("a", "b", x))
         .toDF("c1", "c2", "c3")
 
-    // save dataframe to carbon file:(df->csv->carbon file)
-    df.write
-        .format("carbondata")
-        .option("tableName", tableName)
-        .option("compress", "true")
-        .option("useKettle", "false")
-        .mode(mode)
-        .save()
-
     // save dataframe directly to carbon file without tempCSV
     df.write
       .format("carbondata")
@@ -99,7 +94,10 @@ object ExampleUtils {
       .option("tempCSV", "false")
       .mode(mode)
       .save()
+  }
 
+  def cleanSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = {
+    cc.sql(s"DROP TABLE IF EXISTS $tableName")
   }
 }
 // scalastyle:on println
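
With this change, writeSampleCarbonFile and appendSampleCarbonFile return the table path and cleanup is centralised in cleanSampleCarbonFile. A minimal usage sketch of the updated helpers (the app and table names here are placeholders):

    val cc = ExampleUtils.createCarbonContext("MyExample")
    val path = ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
    // read `path` with Spark SQL, CarbonInputFormat or Flink, as in the examples above
    ExampleUtils.cleanSampleCarbonFile(cc, "carbon1")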

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/33cd11e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb23ef7..f68b24b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -348,6 +348,16 @@
       </modules>
     </profile>
     <profile>
+      <id>flink</id>
+      <properties>
+        <flink.version>1.1.4</flink.version>
+        <scala.binary.version>2.10</scala.binary.version>
+      </properties>
+      <modules>
+        <module>examples/flink</module>
+      </modules>
+    </profile>
+    <profile>
       <id>findbugs</id>
       <build>
         <plugins>
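
Note: the new flink profile has no activation section, so the examples/flink module is built only when the profile is enabled explicitly, for example with mvn clean package -Pflink from the project root; the flink.version and scala.binary.version properties defined in the profile are then picked up by examples/flink/pom.xml.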

