fluo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] keith-turner closed pull request #1018: Scan command now can produce results as csv and json. Related to #984
Date Wed, 04 Apr 2018 14:31:56 GMT
keith-turner closed pull request #1018: Scan command now can produce results as csv and json.
Related to #984
URL: https://github.com/apache/fluo/pull/1018
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request on merge, the
diff is supplied below for the sake of provenance:

diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 9de17f44..3ec090f5 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.cluster.runner;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
@@ -72,10 +73,15 @@ public void scan(FluoConfiguration config, String[] args) {
       System.exit(0);
     }
 
-    if (options.scanAccumuloTable) {
-      ScanUtil.scanAccumulo(options.getScanOpts(), config);
-    } else {
-      ScanUtil.scanFluo(options.getScanOpts(), config);
+    try {
+      if (options.scanAccumuloTable) {
+        ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
+      } else {
+        ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
+      }
+    } catch (IOException e) {
+      System.err.println(e.getMessage());
+      System.exit(-1);
     }
   }
 
@@ -228,7 +234,7 @@ public String getRowPrefix() {
 
     public ScanUtil.ScanOpts getScanOpts() {
       return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
-          hexEncNonAscii, scanAccumuloTable);
+          hexEncNonAscii, scanAccumuloTable, false);
     }
   }
 }
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
index 4d56194d..bcd03c07 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.command;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -54,6 +55,10 @@
             + "internal schema, making it easier to comprehend.")
     public boolean scanAccumuloTable = false;
 
+    @Parameter(names = "--json", help = true,
+        description = "Export key/values stored in Accumulo as JSON file.")
+    public boolean exportAsJson = false;
+
     public String getStartRow() {
       return startRow;
     }
@@ -77,9 +82,19 @@ public String getRowPrefix() {
       return columns;
     }
 
+    /**
+     * Check if the parameters informed can be used together.
+     */
+    private void checkScanOptions() {
+      if (this.scanAccumuloTable && this.exportAsJson) {
+        throw new IllegalArgumentException(
+            "Both \"--raw\" and \"--json\" can not be set together.");
+      }
+    }
+
     public ScanUtil.ScanOpts getScanOpts() {
       return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
-          hexEncNonAscii, scanAccumuloTable);
+          hexEncNonAscii, scanAccumuloTable, exportAsJson);
     }
 
     public static ScanOptions parse(String[] args) {
@@ -95,16 +110,24 @@ public static void main(String[] args) {
     Logger.getLogger("org.apache.fluo").setLevel(Level.ERROR);
 
     ScanOptions options = ScanOptions.parse(args);
+    options.checkScanOptions();
     FluoConfiguration config = CommandUtil.resolveFluoConfig();
     config.setApplicationName(options.getApplicationName());
     options.overrideFluoConfig(config);
     CommandUtil.verifyAppRunning(config);
 
-    if (options.scanAccumuloTable) {
-      config = FluoAdminImpl.mergeZookeeperConfig(config);
-      ScanUtil.scanAccumulo(options.getScanOpts(), config);
-    } else {
-      ScanUtil.scanFluo(options.getScanOpts(), config);
+    try {
+      options.overrideFluoConfig(config);
+      if (options.scanAccumuloTable) {
+        config = FluoAdminImpl.mergeZookeeperConfig(config);
+        ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
+      } else {
+        ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
+      }
+    } catch (RuntimeException | IOException e) {
+      System.err.println("Scan failed - " + e.getMessage());
+      System.exit(-1);
     }
   }
+
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 46aacca8..4690fd5a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -15,12 +15,17 @@
 
 package org.apache.fluo.core.util;
 
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.DateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 
-import com.google.common.collect.Iterables;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.security.Authorizations;
@@ -34,9 +39,19 @@
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.exceptions.FluoException;
+
+import com.google.common.collect.Iterables;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonIOException;
 
 public class ScanUtil {
+  public static final String FLUO_VALUE = "value";
+  public static final String FLUO_COLUMN_VISIBILITY = "visibility";
+  public static final String FLUO_COLUMN_QUALIFIER = "qualifier";
+  public static final String FLUO_COLUMN_FAMILY = "family";
+  public static final String FLUO_ROW = "row";
 
   public static Span getSpan(ScanOpts options) {
     Span span = new Span();
@@ -89,68 +104,83 @@ public static Span getSpan(ScanOpts options) {
     return columns;
   }
 
-  public static void scanFluo(ScanOpts options, FluoConfiguration sConfig) {
+
+  private static Function<Bytes, String> getEncoder(ScanOpts options) {
+    if (options.hexEncNonAscii) {
+      return Hex::encNonAscii;
+    } else {
+      return Bytes::toString;
+    }
+  }
+
+  public static void scanFluo(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
+      throws IOException {
 
     try (FluoClient client = FluoFactory.newClient(sConfig)) {
       try (Snapshot s = client.newSnapshot()) {
 
-        Span span = null;
-        Collection<Column> columns = null;
-        try {
-          span = getSpan(options);
-          columns = getColumns(options);
-        } catch (IllegalArgumentException e) {
-          System.err.println(e.getMessage());
-          System.exit(-1);
-        }
-
+        Span span = getSpan(options);
+        Collection<Column> columns = getColumns(options);
         CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
+        Function<Bytes, String> encoder = getEncoder(options);
 
-        StringBuilder sb = new StringBuilder();
-        for (RowColumnValue rcv : cellScanner) {
-          if (options.hexEncNonAscii) {
-            sb.setLength(0);
-            Hex.encNonAscii(sb, rcv.getRow());
-            sb.append(" ");
-            Hex.encNonAscii(sb, rcv.getColumn(), " ");
-            sb.append("\t");
-            Hex.encNonAscii(sb, rcv.getValue());
-            System.out.println(sb.toString());
-          } else {
-            sb.setLength(0);
-            sb.append(rcv.getsRow());
-            sb.append(" ");
-            sb.append(rcv.getColumn());
-            sb.append("\t");
-            sb.append(rcv.getsValue());
-            System.out.println(sb.toString());
-          }
-
-          if (System.out.checkError()) {
-            break;
+        if (options.exportAsJson) {
+          generateJson(cellScanner, encoder, out);
+        } else {
+          for (RowColumnValue rcv : cellScanner) {
+            out.print(encoder.apply(rcv.getRow()));
+            out.print(' ');
+            out.print(encoder.apply(rcv.getColumn().getFamily()));
+            out.print(' ');
+            out.print(encoder.apply(rcv.getColumn().getQualifier()));
+            out.print(' ');
+            out.print(encoder.apply(rcv.getColumn().getVisibility()));
+            out.print("\t");
+            out.print(encoder.apply(rcv.getValue()));
+            out.println();
+            if (out.checkError()) {
+              break;
+            }
           }
         }
+      }
+    }
+  }
 
-      } catch (FluoException e) {
-        System.err.println("Scan failed - " + e.getMessage());
-        System.exit(-1);
+  /**
+   * Generate JSON format as result of the scan.
+   *
+   * @since 1.2
+   */
+  private static void generateJson(CellScanner cellScanner, Function<Bytes, String> encoder,
+      PrintStream out) throws JsonIOException {
+    Gson gson = new GsonBuilder().serializeNulls().setDateFormat(DateFormat.LONG)
+        .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).setVersion(1.0)
+        .create();
+
+    Map<String, String> json = new LinkedHashMap<>();
+    for (RowColumnValue rcv : cellScanner) {
+      json.put(FLUO_ROW, encoder.apply(rcv.getRow()));
+      json.put(FLUO_COLUMN_FAMILY, encoder.apply(rcv.getColumn().getFamily()));
+      json.put(FLUO_COLUMN_QUALIFIER, encoder.apply(rcv.getColumn().getQualifier()));
+      json.put(FLUO_COLUMN_VISIBILITY, encoder.apply(rcv.getColumn().getVisibility()));
+      json.put(FLUO_VALUE, encoder.apply(rcv.getValue()));
+      gson.toJson(json, out);
+      out.append("\n");
+
+      if (out.checkError()) {
+        break;
       }
     }
+    out.flush();
   }
 
-  public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig) {
+  public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, PrintStream out) {
 
     Connector conn = AccumuloUtil.getConnector(sConfig);
 
-    Span span = null;
-    Collection<Column> columns = null;
-    try {
-      span = getSpan(options);
-      columns = getColumns(options);
-    } catch (IllegalArgumentException e) {
-      System.err.println(e.getMessage());
-      System.exit(-1);
-    }
+    Span span = getSpan(options);
+    Collection<Column> columns = getColumns(options);
 
     try {
       Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
@@ -165,11 +195,11 @@ public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig) {
       }
 
       for (String entry : Iterables.transform(scanner, FluoFormatter::toString)) {
-        System.out.println(entry);
+        out.println(entry);
       }
+      out.flush();
     } catch (Exception e) {
-      System.err.println("Scan failed - " + e.getMessage());
-      System.exit(-1);
+      throw new RuntimeException(e);
     }
   }
 
@@ -183,9 +213,11 @@ public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig) {
     public boolean help;
     public boolean hexEncNonAscii = true;
     public boolean scanAccumuloTable = false;
+    public boolean exportAsJson = false;
 
     public ScanOpts(String startRow, String endRow, List<String> columns, String exactRow,
-        String rowPrefix, boolean help, boolean hexEncNonAscii, boolean scanAccumuloTable) {
+        String rowPrefix, boolean help, boolean hexEncNonAscii, boolean scanAccumuloTable,
+        boolean exportAsJson) {
       this.startRow = startRow;
       this.endRow = endRow;
       this.columns = columns;
@@ -194,6 +226,7 @@ public ScanOpts(String startRow, String endRow, List<String> columns, String exa
       this.help = help;
       this.hexEncNonAscii = hexEncNonAscii;
       this.scanAccumuloTable = scanAccumuloTable;
+      this.exportAsJson = exportAsJson;
     }
 
     public String getStartRow() {
diff --git a/modules/distribution/src/main/lib/fetch.sh b/modules/distribution/src/main/lib/fetch.sh
index d945d834..2dd523c0 100755
--- a/modules/distribution/src/main/lib/fetch.sh
+++ b/modules/distribution/src/main/lib/fetch.sh
@@ -17,7 +17,7 @@ lib_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 maven_prefix=https://repo1.maven.org/maven2
 
 function download {
-  IFS=':' read -ra DEP <<< "$1" 
+  IFS=':' read -ra DEP <<< "$1"
   dir=$lib_dir/
   if [ -n "$2" ]; then
     dir=$lib_dir/$2
@@ -35,7 +35,7 @@ function download {
 
   if [ -f $dir/$fn ]; then
     echo "SUCCESS: Dependency exists - $dir/$fn"
-  else 
+  else
     wget -q $download_url -P $dir
     if [ $? == 0 ]; then
       echo "SUCCESS: Dependency downloaded from $download_url"
@@ -56,7 +56,7 @@ extra)
   echo "Fetching extra Fluo dependencies"
   download aopalliance:aopalliance:jar:1.0
   download com.beust:jcommander:jar:1.32
-  download com.google.code.gson:gson:jar:2.2.4
+  download com.google.code.gson:gson:jar:2.8.0
   download com.google.guava:guava:jar:13.0.1
   download com.google.inject:guice:jar:4.0
   download commons-collections:commons-collections:jar:3.2.1
@@ -117,4 +117,3 @@ extra)
   echo -e "However, you can override them using the command below:\n"
   echo "./fetch.sh ahz -Daccumulo.version=1.7.2 -Dhadoop.version=2.7.2 -Dzookeeper.version=3.4.8"
 esac
-
diff --git a/pom.xml b/pom.xml
index ccd069e2..c4f8f3e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,9 +5,9 @@
   copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance with the License. You may obtain a
   copy of the License at
-   
+
      http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software distributed under the License
   is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
   or implied. See the License for the specific language governing permissions and limitations under


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message