drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [26/27] git commit: DRILL-1153: skip brackets, braces & respect escape chars in literal strings while splitting JSON records
Date Sun, 27 Jul 2014 18:47:12 GMT
DRILL-1153: skip brackets, braces & respect escape chars in literal strings while splitting JSON records


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1fe9c21a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1fe9c21a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1fe9c21a

Branch: refs/heads/master
Commit: 1fe9c21a83e7642e31827bc6c582bf6ea0e22bf3
Parents: 846e291
Author: Hanifi Gunes <hgunes@maprtech.com>
Authored: Fri Jul 25 18:55:03 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sat Jul 26 10:43:31 2014 -0700

----------------------------------------------------------------------
 .../complex/fn/JsonRecordSplitterBase.java      | 117 +++++++++++++++++
 .../complex/fn/ReaderJSONRecordSplitter.java    | 112 +++--------------
 .../complex/fn/UTF8JsonRecordSplitter.java      | 125 ++++++-------------
 3 files changed, 178 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fe9c21a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
new file mode 100644
index 0000000..c73fef1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.io.Reader;
+
+public abstract class JsonRecordSplitterBase implements JsonRecordSplitter {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
+  public final static int MAX_RECORD_SIZE = 128*1024;
+
+  private static final int OPEN_CBRACKET = '{';
+  private static final int OPEN_BRACKET = '[';
+  private static final int CLOSE_CBRACKET = '}';
+  private static final int CLOSE_BRACKET = ']';
+  private static final int ESCAPE = '\\';
+  private static final int LITERAL = '"';
+
+  private static final int SPACE = ' ';
+  private static final int TAB = '\t';
+  private static final int NEW_LINE = '\n';
+  private static final int FORM_FEED = '\f';
+  private static final int CR = '\r';
+
+  private long start = 0;
+
+
+  protected void preScan() throws IOException { }
+
+  protected void postScan() throws IOException { }
+
+  protected abstract int readNext() throws IOException;
+
+  protected abstract Reader createReader(long maxBytes);
+
+  @Override
+  public Reader getNextReader() throws IOException {
+    preScan();
+
+    boolean isEscaped = false;
+    boolean inLiteral = false;
+    boolean inCandidate = false;
+    boolean found = false;
+
+    long endOffset = start;
+    int cur;
+    outside: while(true) {
+      cur = readNext();
+      endOffset++;
+
+      if(cur == -1) {
+        if(inCandidate){
+          found = true;
+        }
+        break;
+      }
+
+      switch(cur) {
+        case ESCAPE:
+          isEscaped = !isEscaped;
+          break;
+        case LITERAL:
+          if (!isEscaped) {
+            inLiteral = !inLiteral;
+          }
+          isEscaped = false;
+          break;
+        case CLOSE_BRACKET:
+        case CLOSE_CBRACKET:
+          inCandidate = !inLiteral;
+          break;
+        case OPEN_BRACKET:
+        case OPEN_CBRACKET:
+          if(inCandidate){
+            found = true;
+            break outside;
+          }
+          break;
+
+        case SPACE:
+        case TAB:
+        case NEW_LINE:
+        case CR:
+        case FORM_FEED:
+          break;
+
+        default:
+          inCandidate = false;
+          isEscaped = false;
+      }
+    }
+
+    if(!found) {
+      return null;
+    }
+
+    postScan();
+    long maxBytes = endOffset - 1 - start;
+    start = endOffset;
+    return createReader(maxBytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fe9c21a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
index 0cdbf85..aef8b75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
@@ -21,91 +21,37 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
 
-import com.google.common.io.CharStreams;
+public class ReaderJSONRecordSplitter extends JsonRecordSplitterBase {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
 
-public class ReaderJSONRecordSplitter implements JsonRecordSplitter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
-
-  private static final int OPEN_CBRACKET = '{';
-  private static final int OPEN_BRACKET = '[';
-  private static final int CLOSE_CBRACKET = '}';
-  private static final int CLOSE_BRACKET = ']';
-
-  private static final int SPACE = ' ';
-  private static final int TAB = '\t';
-  private static final int NEW_LINE = '\n';
-  private static final int FORM_FEED = '\f';
-  private static final int CR = '\r';
-
-  private long start = 0;
   private Reader reader;
 
+  public ReaderJSONRecordSplitter(String str){
+    this(new StringReader(str));
+  }
+
   public ReaderJSONRecordSplitter(Reader reader){
     this.reader = reader;
   }
 
-  public ReaderJSONRecordSplitter(String str){
-    this.reader = new StringReader(str);
+  @Override
+  protected void preScan() throws IOException {
+    reader.mark(MAX_RECORD_SIZE);
   }
 
   @Override
-  public Reader getNextReader() throws IOException{
-
-    boolean inCandidate = false;
-    boolean found = false;
-
-    reader.mark(128*1024);
-    long endOffset = start;
-    outside: while(true){
-      int c = reader.read();
-//      System.out.println(b);
-      endOffset++;
-
-      if(c == -1){
-        if(inCandidate){
-          found = true;
-        }
-        break;
-      }
-
-      switch(c){
-      case CLOSE_BRACKET:
-      case CLOSE_CBRACKET:
-//        System.out.print("c");
-        inCandidate = true;
-        break;
-      case OPEN_BRACKET:
-      case OPEN_CBRACKET:
-//        System.out.print("o");
-        if(inCandidate){
-          found = true;
-          break outside;
-        }
-        break;
-
-      case SPACE:
-      case TAB:
-      case NEW_LINE:
-      case CR:
-      case FORM_FEED:
-//        System.out.print(' ');
-        break;
-
-      default:
-//        System.out.print('-');
-        inCandidate = false;
-      }
-    }
+  protected void postScan() throws IOException {
+    reader.reset();
+  }
 
-    if(found){
-      long maxBytes = endOffset - 1 - start;
-      start = endOffset;
-      reader.reset();
-      return new LimitedReader(reader, (int) maxBytes);
-    }else{
-      return null;
-    }
+  @Override
+  protected int readNext() throws IOException {
+    return reader.read();
+  }
 
+  @Override
+  protected Reader createReader(long maxBytes) {
+    return new LimitedReader(reader, (int)maxBytes);
   }
 
   private class LimitedReader extends Reader {
@@ -128,11 +74,8 @@ public class ReaderJSONRecordSplitter implements JsonRecordSplitter {
         bytes++;
         return incoming.read();
       }
-
-
     }
 
-
     @Override
     public void mark(int readAheadLimit) throws IOException {
       incoming.mark(readAheadLimit);
@@ -158,21 +101,6 @@ public class ReaderJSONRecordSplitter implements JsonRecordSplitter {
     }
 
     @Override
-    public void close() throws IOException {
-    }
-
-  }
-
-  public static void main(String[] args) throws Exception{
-    String str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ";
-    JsonRecordSplitter splitter = new ReaderJSONRecordSplitter(new StringReader(str));
-    Reader obj = null;
-    System.out.println();
-
-    while( (obj = splitter.getNextReader()) != null){
-      System.out.println();
-      System.out.println(CharStreams.toString(obj));
-      System.out.println("===end obj===");
-    }
+    public void close() throws IOException { }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1fe9c21a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
index e46e1bd..ee94015 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
@@ -19,94 +19,45 @@ package org.apache.drill.exec.vector.complex.fn;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Charsets;
-import com.google.common.io.CharStreams;
 
-public class UTF8JsonRecordSplitter implements JsonRecordSplitter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class);
+public class UTF8JsonRecordSplitter extends JsonRecordSplitterBase {
+  private final static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class);
 
-  private static final int OPEN_CBRACKET = '{';
-  private static final int OPEN_BRACKET = '[';
-  private static final int CLOSE_CBRACKET = '}';
-  private static final int CLOSE_BRACKET = ']';
-
-  private static final int SPACE = ' ';
-  private static final int TAB = '\t';
-  private static final int NEW_LINE = '\n';
-  private static final int FORM_FEED = '\f';
-  private static final int CR = '\r';
-
-  private long start = 0;
-  private InputStream incoming;
+  private final InputStream incoming;
 
   public UTF8JsonRecordSplitter(InputStream incoming){
     this.incoming = new BufferedInputStream(incoming);
   }
 
   @Override
-  public Reader getNextReader() throws IOException{
-
-    boolean inCandidate = false;
-    boolean found = false;
-
-    incoming.mark(128*1024);
-    long endOffset = start;
-    outside: while(true){
-      int b = incoming.read();
-//      System.out.println(b);
-      endOffset++;
-
-      if(b == -1){
-        if(inCandidate){
-          found = true;
-        }
-        break;
-      }
-
-      switch(b){
-      case CLOSE_BRACKET:
-      case CLOSE_CBRACKET:
-//        System.out.print("c");
-        inCandidate = true;
-        break;
-      case OPEN_BRACKET:
-      case OPEN_CBRACKET:
-//        System.out.print("o");
-        if(inCandidate){
-          found = true;
-          break outside;
-        }
-        break;
-
-      case SPACE:
-      case TAB:
-      case NEW_LINE:
-      case CR:
-      case FORM_FEED:
-//        System.out.print(' ');
-        break;
+  protected void preScan() throws IOException {
+    incoming.mark(MAX_RECORD_SIZE);
+  }
 
-      default:
-//        System.out.print('-');
-        inCandidate = false;
-      }
-    }
+  @Override
+  protected void postScan() throws IOException {
+    incoming.reset();
+  }
 
-    if(found){
-      long maxBytes = endOffset - 1 - start;
-      start = endOffset;
-      incoming.reset();
-      return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8));
-    }else{
-      return null;
-    }
+  @Override
+  protected int readNext() throws IOException {
+    return incoming.read();
+  }
 
+  @Override
+  protected Reader createReader(long maxBytes) {
+    return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8));
   }
 
   private class DelInputStream extends InputStream {
@@ -128,23 +79,29 @@ public class UTF8JsonRecordSplitter implements JsonRecordSplitter {
         bytes++;
         return incoming.read();
       }
-
-
     }
-
   }
 
-  public static void main(String[] args) throws Exception{
-    byte[] str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ".getBytes(Charsets.UTF_8);
-    InputStream s = new ByteArrayInputStream(str);
+  public static void main(String[] args) throws Exception {
+    String path = "/Users/hgunes/workspaces/mapr/incubator-drill/yelp_academic_dataset_review.json";
+    InputStream s = new FileInputStream(new File(path));
     JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(s);
-    Reader obj = null;
-    System.out.println();
-
-    while( (obj = splitter.getNextReader()) != null){
-      System.out.println();
-      System.out.println(CharStreams.toString(obj));
-      System.out.println("===end obj===");
+    int recordCount = 0;
+    Reader record = null;
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      while ((record = splitter.getNextReader()) != null) {
+        recordCount++;
+        JsonNode node = mapper.readTree(record);
+        out(node.toString());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
     }
+    out("last record: " + recordCount);
+  }
+
+  static void out(Object thing) {
+    System.out.println(thing);
   }
 }


Mime
View raw message