helix-commits mailing list archives

From l...@apache.org
Subject [1/3] helix git commit: Move all command tool classes into a separate sub-folder.
Date Tue, 24 Apr 2018 22:50:12 GMT
Repository: helix
Updated Branches:
  refs/heads/master 5f9fadc72 -> 676207530


http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
new file mode 100644
index 0000000..c171b73
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
@@ -0,0 +1,232 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * Dumps the ZooKeeper file structure onto local disk
+ */
+@SuppressWarnings("static-access")
+public class ZKDumper {
+  private ZkClient client;
+  private FilenameFilter filter;
+  static Options options;
+  private String suffix = "";
+  // disabled by default; enabled via the removeSuffix command-line option
+  private boolean removeSuffix = false;
+
+  public String getSuffix() {
+    return suffix;
+  }
+
+  public void setSuffix(String suffix) {
+    this.suffix = suffix;
+  }
+
+  public boolean isRemoveSuffix() {
+    return removeSuffix;
+  }
+
+  public void setRemoveSuffix(boolean removeSuffix) {
+    this.removeSuffix = removeSuffix;
+  }
+
+  static {
+    options = new Options();
+    OptionGroup optionGroup = new OptionGroup();
+
+    Option d =
+        OptionBuilder.withLongOpt("download").withDescription("Download from ZK to File System")
+            .create();
+    d.setArgs(0);
+    Option dSuffix =
+        OptionBuilder.withLongOpt("addSuffix")
+            .withDescription("add suffix to every file downloaded from ZK").create();
+    dSuffix.setArgs(1);
+    dSuffix.setRequired(false);
+
+    Option u =
+        OptionBuilder.withLongOpt("upload").withDescription("Upload from File System to ZK")
+            .create();
+    u.setArgs(0);
+    Option uSuffix =
+        OptionBuilder.withLongOpt("removeSuffix")
+            .withDescription("remove suffix from every file uploaded to ZK").create();
+    uSuffix.setArgs(0);
+    uSuffix.setRequired(false);
+
+    Option del =
+        OptionBuilder.withLongOpt("delete").withDescription("Delete given path from ZK").create();
+
+    optionGroup.setRequired(true);
+    optionGroup.addOption(del);
+    optionGroup.addOption(u);
+    optionGroup.addOption(d);
+    options.addOptionGroup(optionGroup);
+    options.addOption("zkSvr", true, "Zookeeper address");
+    options.addOption("zkpath", true, "Zookeeper path");
+    options.addOption("fspath", true, "Path on local Filesystem to dump");
+    options.addOption("h", "help", false, "Print this usage information");
+    options.addOption("v", "verbose", false, "Print out VERBOSE information");
+    options.addOption(dSuffix);
+    options.addOption(uSuffix);
+  }
+
+  public ZKDumper(String zkAddress) {
+    client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+
+    ZkSerializer zkSerializer = new ByteArraySerializer();
+    client.setZkSerializer(zkSerializer);
+    filter = new FilenameFilter() {
+
+      @Override
+      public boolean accept(File dir, String name) {
+        return !name.startsWith(".");
+      }
+    };
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args == null || args.length == 0) {
+      HelpFormatter helpFormatter = new HelpFormatter();
+      helpFormatter.printHelp("java " + ZKDumper.class.getName(), options);
+      System.exit(1);
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (!cmd.hasOption("zkSvr")) {
+      System.err.println("zkSvr is required");
+      System.exit(1);
+    }
+    boolean download = cmd.hasOption("download");
+    boolean upload = cmd.hasOption("upload");
+    boolean del = cmd.hasOption("delete");
+    String zkAddress = cmd.getOptionValue("zkSvr");
+    String zkPath = cmd.getOptionValue("zkpath");
+    String fsPath = cmd.getOptionValue("fspath");
+
+    ZKDumper zkDump = new ZKDumper(zkAddress);
+    if (download) {
+      if (cmd.hasOption("addSuffix")) {
+        zkDump.suffix = cmd.getOptionValue("addSuffix");
+      }
+      zkDump.download(zkPath, fsPath + zkPath);
+    }
+    if (upload) {
+      if (cmd.hasOption("removeSuffix")) {
+        zkDump.removeSuffix = true;
+      }
+      zkDump.upload(zkPath, fsPath);
+    }
+    if (del) {
+      zkDump.delete(zkPath);
+    }
+  }
+
+  private void delete(String zkPath) {
+    client.deleteRecursively(zkPath);
+  }
+
+  public void upload(String zkPath, String fsPath) throws Exception {
+    File file = new File(fsPath);
+    System.out.println("Uploading " + file.getCanonicalPath() + " to " + zkPath);
+    zkPath = zkPath.replaceAll("[/]+", "/");
+    int index = -1;
+    if (removeSuffix && (index = file.getName().indexOf(".")) > -1) {
+      zkPath = zkPath.replaceAll(file.getName().substring(index), "");
+    }
+    if (file.isDirectory()) {
+      File[] children = file.listFiles(filter);
+      client.createPersistent(zkPath, true);
+      if (children != null && children.length > 0) {
+        for (File child : children) {
+          upload(zkPath + "/" + child.getName(), fsPath + "/" + child.getName());
+        }
+      }
+    } else {
+      byte[] result = new byte[(int) file.length()];
+      InputStream input = null;
+      try {
+        int totalBytesRead = 0;
+        input = new BufferedInputStream(new FileInputStream(file));
+        while (totalBytesRead < result.length) {
+          int bytesRemaining = result.length - totalBytesRead;
+          // input.read() returns -1 at end of stream, otherwise the number of bytes read
+          int bytesRead = input.read(result, totalBytesRead, bytesRemaining);
+          if (bytesRead < 0) {
+            // stop on unexpected end of stream instead of looping forever
+            break;
+          }
+          totalBytesRead = totalBytesRead + bytesRead;
+        }
+        /*
+         * the above style is a bit tricky: it places bytes into the 'result'
+         * array; 'result' is an output parameter; the while loop usually has a
+         * single iteration only.
+         */
+
+        client.createPersistent(zkPath, result);
+      } finally {
+        if (input != null) {
+          input.close();
+        }
+      }
+
+    }
+  }
+
+  public void download(String zkPath, String fsPath) throws Exception {
+
+    List<String> children = client.getChildren(zkPath);
+    if (children != null && children.size() > 0) {
+      new File(fsPath).mkdirs();
+      for (String child : children) {
+        String childPath = zkPath.equals("/") ? "/" + child : zkPath + "/" + child;
+        download(childPath, fsPath + "/" + child);
+      }
+    } else {
+      System.out
+          .println("Saving " + zkPath + " to " + new File(fsPath + suffix).getCanonicalPath());
+      OutputStream out = new FileOutputStream(fsPath + suffix);
+      Object readData = client.readData(zkPath);
+      if (readData != null) {
+        out.write((byte[]) readData);
+      }
+      out.close();
+    }
+  }
+}
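
For reference, a minimal sketch of driving ZKDumper programmatically through the public API added above; the ZooKeeper address, ZK path, and file-system path are placeholders, and the calls would sit in a method that declares throws Exception:

    // connect to a hypothetical ZK ensemble and mirror a subtree to local disk
    ZKDumper zkDump = new ZKDumper("localhost:2181");
    zkDump.setSuffix(".dump");                       // optional suffix for downloaded files
    zkDump.download("/myCluster", "/tmp/zk-backup/myCluster");
    // later, push the files back, stripping the suffix
    zkDump.setRemoveSuffix(true);
    zkDump.upload("/myCluster", "/tmp/zk-backup/myCluster");

The same flow is available from the command line through the --download/--upload/--delete option group defined in the static initializer.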

http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKLogFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKLogFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKLogFormatter.java
new file mode 100644
index 0000000..0ed2db1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKLogFormatter.java
@@ -0,0 +1,341 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.persistence.FileHeader;
+import org.apache.zookeeper.server.persistence.FileSnap;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+public class ZKLogFormatter {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKLogFormatter.class);
+  private static DateFormat dateTimeInstance = DateFormat.getDateTimeInstance(DateFormat.SHORT,
+      DateFormat.LONG);
+  private static HexBinaryAdapter adapter = new HexBinaryAdapter();
+  private static String fieldDelim = ":";
+  private static String fieldSep = " ";
+
+  static BufferedWriter bw = null;
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2 && args.length != 3) {
+      System.err.println("USAGE: LogFormatter <log|snapshot> log_file [output_file]");
+      System.exit(2);
+    }
+
+    if (args.length == 3) {
+      bw = new BufferedWriter(new FileWriter(new File(args[2])));
+    }
+
+    if (args[0].equals("log")) {
+      readTransactionLog(args[1]);
+    } else if (args[0].equals("snapshot")) {
+      readSnapshotLog(args[1]);
+    }
+
+    if (bw != null) {
+      bw.close();
+    }
+  }
+
+  private static void readSnapshotLog(String snapshotPath) throws Exception {
+    FileInputStream fis = new FileInputStream(snapshotPath);
+    BinaryInputArchive ia = BinaryInputArchive.getArchive(fis);
+    Map<Long, Integer> sessions = new HashMap<Long, Integer>();
+    DataTree dt = new DataTree();
+    FileHeader header = new FileHeader();
+    header.deserialize(ia, "fileheader");
+    if (header.getMagic() != FileSnap.SNAP_MAGIC) {
+      throw new IOException("mismatching magic headers " + header.getMagic() + " !=  "
+          + FileSnap.SNAP_MAGIC);
+    }
+    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
+
+    if (bw != null) {
+      bw.write(sessions.toString());
+      bw.newLine();
+    } else {
+      System.out.println(sessions);
+    }
+    traverse(dt, 1, "/");
+
+  }
+
+  /*
+   * Level order traversal
+   */
+  private static void traverse(DataTree dt, int startId, String startPath) throws Exception {
+    LinkedList<Pair> queue = new LinkedList<Pair>();
+    queue.add(new Pair(startPath, startId));
+    while (!queue.isEmpty()) {
+      Pair pair = queue.removeFirst();
+      String path = pair._path;
+      DataNode head = dt.getNode(path);
+      Stat stat = new Stat();
+      byte[] data = null;
+      try {
+        data = dt.getData(path, stat, null);
+      } catch (NoNodeException e) {
+        e.printStackTrace();
+      }
+      // print the node
+      format(startId, pair, head, data);
+      Set<String> children = head.getChildren();
+      if (children != null) {
+        for (String child : children) {
+          String childPath;
+          if (path.endsWith("/")) {
+            childPath = path + child;
+          } else {
+            childPath = path + "/" + child;
+          }
+          queue.add(new Pair(childPath, startId));
+        }
+      }
+      startId = startId + 1;
+    }
+
+  }
+
+  static class Pair {
+
+    private final String _path;
+    private final int _parentId;
+
+    public Pair(String path, int parentId) {
+      _path = path;
+      _parentId = parentId;
+    }
+
+  }
+
+  private static void format(int id, Pair pair, DataNode head, byte[] data) throws Exception {
+    String dataStr = "";
+    if (data != null) {
+      dataStr = new String(data).replaceAll("[\\s]+", "");
+    }
+    StringBuffer sb = new StringBuffer();
+    // @formatter:off
+    sb.append("id").append(fieldDelim).append(id).append(fieldSep);
+    sb.append("parent").append(fieldDelim).append(pair._parentId).append(fieldSep);
+    sb.append("path").append(fieldDelim).append(pair._path).append(fieldSep);
+    sb.append("session").append(fieldDelim)
+        .append("0x" + Long.toHexString(head.stat.getEphemeralOwner())).append(fieldSep);
+    sb.append("czxid").append(fieldDelim).append("0x" + Long.toHexString(head.stat.getCzxid()))
+        .append(fieldSep);
+    sb.append("ctime").append(fieldDelim).append(head.stat.getCtime()).append(fieldSep);
+    sb.append("mtime").append(fieldDelim).append(head.stat.getMtime()).append(fieldSep);
+    sb.append("cmzxid").append(fieldDelim).append("0x" + Long.toHexString(head.stat.getMzxid()))
+        .append(fieldSep);
+    sb.append("pzxid").append(fieldDelim).append("0x" + Long.toHexString(head.stat.getPzxid()))
+        .append(fieldSep);
+    sb.append("aversion").append(fieldDelim).append(head.stat.getAversion()).append(fieldSep);
+    sb.append("cversion").append(fieldDelim).append(head.stat.getCversion()).append(fieldSep);
+    sb.append("version").append(fieldDelim).append(head.stat.getVersion()).append(fieldSep);
+    sb.append("data").append(fieldDelim).append(dataStr).append(fieldSep);
+    // @formatter:on
+
+    if (bw != null) {
+      bw.write(sb.toString());
+      bw.newLine();
+    } else {
+      System.out.println(sb);
+    }
+
+  }
+
+  private static void readTransactionLog(String logfilepath) throws FileNotFoundException,
+      IOException, EOFException {
+    FileInputStream fis = new FileInputStream(logfilepath);
+    BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
+    FileHeader fhdr = new FileHeader();
+    fhdr.deserialize(logStream, "fileheader");
+
+    if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
+      System.err.println("Invalid magic number for " + logfilepath);
+      System.exit(2);
+    }
+
+    if (bw != null) {
+      bw.write("ZooKeeper Transactional Log File with dbid " + fhdr.getDbid()
+          + " txnlog format version " + fhdr.getVersion());
+      bw.newLine();
+    } else {
+      System.out.println("ZooKeeper Transactional Log File with dbid " + fhdr.getDbid()
+          + " txnlog format version " + fhdr.getVersion());
+    }
+
+    int count = 0;
+    while (true) {
+      long crcValue;
+      byte[] bytes;
+      try {
+        crcValue = logStream.readLong("crcvalue");
+
+        bytes = logStream.readBuffer("txnEntry");
+      } catch (EOFException e) {
+        if (bw != null) {
+          bw.write("EOF reached after " + count + " txns.");
+          bw.newLine();
+        } else {
+          System.out.println("EOF reached after " + count + " txns.");
+        }
+
+        break;
+      }
+      if (bytes.length == 0) {
+        // Since we preallocate, we define EOF to be an
+        // empty transaction
+        if (bw != null) {
+          bw.write("EOF reached after " + count + " txns.");
+          bw.newLine();
+        } else {
+          System.out.println("EOF reached after " + count + " txns.");
+        }
+
+        return;
+      }
+      Checksum crc = new Adler32();
+      crc.update(bytes, 0, bytes.length);
+      if (crcValue != crc.getValue()) {
+        throw new IOException("CRC doesn't match " + crcValue + " vs " + crc.getValue());
+      }
+      TxnHeader hdr = new TxnHeader();
+      Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
+      if (bw != null) {
+        bw.write(formatTransaction(hdr, txn));
+        bw.newLine();
+      } else {
+        System.out.println(formatTransaction(hdr, txn));
+      }
+
+      if (logStream.readByte("EOR") != 'B') {
+        LOG.error("Last transaction was partial.");
+        throw new EOFException("Last transaction was partial.");
+      }
+      count++;
+    }
+  }
+
+  static String op2String(int op) {
+    switch (op) {
+    case OpCode.notification:
+      return "notification";
+    case OpCode.create:
+      return "create";
+    case OpCode.delete:
+      return "delete";
+    case OpCode.exists:
+      return "exists";
+    case OpCode.getData:
+      return "getData";
+    case OpCode.setData:
+      return "setData";
+    case OpCode.getACL:
+      return "getACL";
+    case OpCode.setACL:
+      return "setACL";
+    case OpCode.getChildren:
+      return "getChildren";
+    case OpCode.getChildren2:
+      return "getChildren2";
+    case OpCode.ping:
+      return "ping";
+    case OpCode.createSession:
+      return "createSession";
+    case OpCode.closeSession:
+      return "closeSession";
+    case OpCode.error:
+      return "error";
+    default:
+      return "unknown " + op;
+    }
+  }
+
+  private static String formatTransaction(TxnHeader header, Record txn) {
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("time").append(fieldDelim).append(header.getTime());
+    sb.append(fieldSep).append("session").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getClientId()));
+    sb.append(fieldSep).append("cxid").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getCxid()));
+    sb.append(fieldSep).append("zxid").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getZxid()));
+    sb.append(fieldSep).append("type").append(fieldDelim).append(op2String(header.getType()));
+    if (txn != null) {
+      try {
+        byte[] data = null;
+        for (PropertyDescriptor pd : Introspector.getBeanInfo(txn.getClass())
+            .getPropertyDescriptors()) {
+          if (pd.getName().equalsIgnoreCase("data")) {
+            data = (byte[]) pd.getReadMethod().invoke(txn);
+            continue;
+          }
+          if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
+            sb.append(fieldSep).append(pd.getDisplayName()).append(fieldDelim)
+                .append(pd.getReadMethod().invoke(txn).toString().replaceAll("[\\s]+", ""));
+          }
+        }
+        if (data != null) {
+          sb.append(fieldSep).append("data").append(fieldDelim)
+              .append(new String(data).replaceAll("[\\s]+", ""));
+        }
+      } catch (Exception e) {
+        LOG.error("Error while retrieving bean property values for " + txn.getClass(), e);
+      }
+    }
+
+    return sb.toString();
+  }
+
+}
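
As a rough usage sketch, ZKLogFormatter is driven entirely through main(); the file paths below are placeholders:

    // print a parsed transaction log to stdout
    ZKLogFormatter.main(new String[] { "log", "/var/zookeeper/version-2/log.100000001" });
    // parse a snapshot and write the result to a file instead
    ZKLogFormatter.main(new String[] {
        "snapshot", "/var/zookeeper/version-2/snapshot.100000000", "/tmp/snapshot.parsed" });

The optional third argument redirects output to a file, which is how ZkGrep (below) invokes it when pre-parsing logs and snapshots.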

http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
new file mode 100644
index 0000000..5bd955a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
@@ -0,0 +1,253 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Tool for copying a zk/file path to another zk/file path
+ */
+public class ZkCopy {
+  enum ZkCopyScheme {
+    zk
+  }
+
+  private static Logger logger = LoggerFactory.getLogger(ZkCopy.class);
+  private static final String src = "src";
+  private static final String dst = "dst";
+
+  @SuppressWarnings("static-access")
+  private static Options constructCmdLineOpt() {
+    Option srcOpt =
+        OptionBuilder.withLongOpt(src).hasArgs(1).isRequired(true)
+            .withArgName("source-URI (e.g. zk://localhost:2181/src-path)")
+            .withDescription("Provide source URI").create();
+
+    Option dstOpt =
+        OptionBuilder.withLongOpt(dst).hasArgs(1).isRequired(true)
+            .withArgName("destination-URI (e.g. zk://localhost:2181/dst-path)")
+            .withDescription("Provide destination URI").create();
+
+    Options options = new Options();
+    options.addOption(srcOpt);
+    options.addOption(dstOpt);
+    return options;
+  }
+
+  private static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ZkCopy.class.getName(), cliOptions);
+  }
+
+  private static String concatenate(String path, String suffix) {
+    if (suffix == null || suffix.isEmpty()) {
+      return path;
+    }
+
+    if (path.endsWith("/") || suffix.startsWith("/")) {
+      return path + suffix;
+    } else {
+      return path + "/" + suffix;
+    }
+  }
+
+  /**
+   * Copy a list of paths from src to dst
+   * @param srcClient
+   * @param srcRootPath
+   * @param dstClient
+   * @param dstRootPath
+   * @param paths
+   */
+  private static void copy(ZkClient srcClient, String srcRootPath, ZkClient dstClient,
+      String dstRootPath, List<String> paths) {
+    BaseDataAccessor<Object> srcAccessor = new ZkBaseDataAccessor<Object>(srcClient);
+    List<String> readPaths = new ArrayList<String>();
+    for (String path : paths) {
+      readPaths.add(concatenate(srcRootPath, path));
+    }
+    List<Stat> stats = new ArrayList<Stat>();
+    List<Object> readData = srcAccessor.get(readPaths, stats, 0);
+
+    List<String> writePaths = new ArrayList<String>();
+    List<Object> writeData = new ArrayList<Object>();
+    for (int i = 0; i < paths.size(); i++) {
+      if (stats.get(i).getEphemeralOwner() != 0) {
+        logger.warn("Skip copying ephemeral znode: " + readPaths.get(i));
+        continue;
+      }
+
+      writePaths.add(concatenate(dstRootPath, paths.get(i)));
+      writeData.add(readData.get(i));
+    }
+
+    if (writePaths.size() > 0) {
+      BaseDataAccessor<Object> dstAccessor = new ZkBaseDataAccessor<Object>(dstClient);
+      boolean[] success =
+          dstAccessor.createChildren(writePaths, writeData, AccessOption.PERSISTENT);
+      List<String> successPaths = new ArrayList<String>();
+      List<String> failPaths = new ArrayList<String>();
+      for (int i = 0; i < success.length; i++) {
+        if (success[i]) {
+          successPaths.add(writePaths.get(i));
+        } else {
+          failPaths.add(writePaths.get(i));
+        }
+      }
+
+      // Print
+      if (!successPaths.isEmpty()) {
+        System.out.println("Copy " + successPaths);
+      }
+
+      if (!failPaths.isEmpty()) {
+        System.out.println("Skip " + failPaths);
+      }
+    }
+  }
+
+  private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient dstClient, String dstRootPath) {
+    // Strip off tailing "/"
+    if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) {
+      srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1);
+    }
+
+    if (!dstRootPath.equals("/") && dstRootPath.endsWith("/")) {
+      dstRootPath = dstRootPath.substring(0, dstRootPath.length() - 1);
+    }
+
+    // Validate paths
+    PathUtils.validatePath(srcRootPath);
+    PathUtils.validatePath(dstRootPath);
+
+    if (srcRootPath.equals(dstRootPath)) {
+      logger.info("srcPath == dstPath. Skip copying");
+      return;
+    }
+
+    if (srcRootPath.startsWith(dstRootPath) || dstRootPath.startsWith(srcRootPath)) {
+      throw new IllegalArgumentException(
+          "srcPath/dstPath can't be prefix of dstPath/srcPath, was srcPath: " + srcRootPath
+              + ", dstPath: " + dstRootPath);
+    }
+
+    // Recursive copy using BFS
+    List<String> queue = new LinkedList<String>();
+    String root = "";
+    copy(srcClient, srcRootPath, dstClient, dstRootPath, Arrays.asList(root));
+
+    queue.add(root);
+    while (!queue.isEmpty()) {
+      String path = queue.remove(0);
+      String fromPath = concatenate(srcRootPath, path);
+
+      List<String> children = srcClient.getChildren(fromPath);
+      List<String> paths = new ArrayList<String>();
+      if (children != null && children.size() > 0) {
+        for (String child : children) {
+          String childPath = concatenate(path, child);
+          paths.add(childPath);
+        }
+        copy(srcClient, srcRootPath, dstClient, dstRootPath, paths);
+        queue.addAll(paths);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCmdLineOpt();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, args);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    URI srcUri = new URI(cmd.getOptionValue(src));
+    URI dstUri = new URI(cmd.getOptionValue(dst));
+
+    ZkCopyScheme srcScheme = ZkCopyScheme.valueOf(srcUri.getScheme());
+    ZkCopyScheme dstScheme = ZkCopyScheme.valueOf(dstUri.getScheme());
+
+    if (srcScheme == ZkCopyScheme.zk && dstScheme == ZkCopyScheme.zk) {
+      String srcZkAddr = srcUri.getAuthority();
+      String dstZkAddr = dstUri.getAuthority();
+
+      ZkClient srcClient = null;
+      ZkClient dstClient = null;
+      try {
+        if (srcZkAddr.equals(dstZkAddr)) {
+          srcClient =
+              dstClient =
+                  new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                      ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+        } else {
+          srcClient =
+              new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+          dstClient =
+              new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+        }
+        String srcPath = srcUri.getPath();
+        String dstPath = dstUri.getPath();
+        zkCopy(srcClient, srcPath, dstClient, dstPath);
+      } finally {
+        if (srcClient != null) {
+          srcClient.close();
+        }
+        if (dstClient != null) {
+          dstClient.close();
+        }
+      }
+    } else {
+      System.err.println("Unsupported scheme. srcScheme: " + srcScheme + ", dstScheme: " + dstScheme);
+      System.exit(1);
+    }
+  }
+}
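
A hedged example of invoking ZkCopy with its two required options; the hosts and paths are placeholders:

    // copy a ZK subtree between two ensembles; ephemeral znodes are skipped
    ZkCopy.main(new String[] {
        "--src", "zk://src-zk-host:2181/source-cluster",
        "--dst", "zk://dst-zk-host:2181/backup-cluster"
    });

Both URIs must use the zk scheme, and neither root path may be a prefix of the other.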

http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkGrep.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkGrep.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkGrep.java
new file mode 100644
index 0000000..b42bdd2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkGrep.java
@@ -0,0 +1,642 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for grepping ZK transaction/snapshot logs.
+ * - to grep a pattern by t1 use:
+ * zkgrep --zkCfg zkCfg --by t1 --pattern patterns...
+ * - to grep a pattern between t1 and t2 use:
+ * zkgrep --zkCfg zkCfg --between t1 t2 --pattern patterns...
+ * for example, to find fail-over latency between t1 and t2, use:
+ * 1) zkgrep --zkCfg zkCfg --by t1 --pattern "/{cluster}/LIVEINSTANCES/" | grep {fail-node}
+ * 2) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "closeSession" | grep {fail-node session-id}
+ * 3) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "/{cluster}" | grep "CURRENTSTATES" |
+ * grep "setData" | tail -1
+ * fail-over latency = timestamp difference between 2) and 3)
+ */
+public class ZkGrep {
+  private static Logger LOG = LoggerFactory.getLogger(ZkGrep.class);
+
+  private static final String zkCfg = "zkCfg";
+  private static final String pattern = "pattern";
+  private static final String by = "by";
+  private static final String between = "between";
+
+  public static final String log = "log";
+  public static final String snapshot = "snapshot";
+
+  private static final String gzSuffix = ".gz";
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions() {
+    Option zkCfgOption =
+        OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(zkCfg).withArgName("zoo.cfg")
+            .withDescription("provide zoo.cfg").create();
+
+    Option patternOption =
+        OptionBuilder.hasArgs().isRequired(true).withLongOpt(pattern)
+            .withArgName("grep-patterns...").withDescription("provide patterns (required)")
+            .create();
+
+    Option betweenOption =
+        OptionBuilder.hasArgs(2).isRequired(false).withLongOpt(between)
+            .withArgName("t1 t2 (timestamp in ms or yyMMdd_hhmmss_SSS)")
+            .withDescription("grep between t1 and t2").create();
+
+    Option byOption =
+        OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(by)
+            .withArgName("t (timestamp in ms or yyMMdd_hhmmss_SSS)").withDescription("grep by t")
+            .create();
+
+    OptionGroup group = new OptionGroup();
+    group.setRequired(true);
+    group.addOption(betweenOption);
+    group.addOption(byOption);
+
+    Options options = new Options();
+    options.addOption(zkCfgOption);
+    options.addOption(patternOption);
+    options.addOptionGroup(group);
+    return options;
+  }
+
+  /**
+   * get zk transaction log dir and zk snapshot log dir
+   * @param zkCfgFile
+   * @return String[0]: zk-transaction-log-dir, String[1]: zk-snapshot-dir
+   */
+  static String[] getZkDataDirs(String zkCfgFile) {
+    String[] zkDirs = new String[2];
+
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkCfgFile);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        String key = "dataDir=";
+        if (line.startsWith(key)) {
+          zkDirs[1] = zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+
+        key = "dataLogDir=";
+        if (line.startsWith(key)) {
+          zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in read file: " + zkCfgFile, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing file: " + zkCfgFile, e);
+      }
+    }
+
+    return zkDirs;
+  }
+
+  // debug
+  static void printFiles(File[] files) {
+    System.out.println("START print");
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      System.out.println(file.getName() + ", " + file.lastModified());
+    }
+    System.out.println("END print");
+  }
+
+  /**
+   * get files under dir in order of last modified time
+   * @param dirPath
+   * @param pattern
+   * @return
+   */
+  static File[] getSortedFiles(String dirPath, final String pattern) {
+    File dir = new File(dirPath);
+    File[] files = dir.listFiles(new FileFilter() {
+
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && (file.getName().indexOf(pattern) != -1);
+      }
+    });
+
+    Arrays.sort(files, new Comparator<File>() {
+
+      @Override
+      public int compare(File o1, File o2) {
+        int sign = (int) Math.signum(o1.lastModified() - o2.lastModified());
+        return sign;
+      }
+
+    });
+    return files;
+  }
+
+  /**
+   * get value for an attribute in a parsed zk log; e.g.
+   * "time:1384984016778 session:0x14257d1d17e0004 cxid:0x5 zxid:0x46899 type:error err:-101"
+   * given "time", returns "1384984016778"
+   * @param line
+   * @param attribute
+   * @return value
+   */
+  static String getAttributeValue(String line, String attribute) {
+    if (line == null) {
+      return null;
+    }
+
+    if (!attribute.endsWith(":")) {
+      attribute = attribute + ":";
+    }
+
+    String[] parts = line.split("\\s");
+    if (parts != null && parts.length > 0) {
+      for (int i = 0; i < parts.length; i++) {
+        if (parts[i].startsWith(attribute)) {
+          String val = parts[i].substring(attribute.length());
+          return val;
+        }
+      }
+    }
+    return null;
+  }
+
+  static long getTimestamp(String line) {
+    String timestamp = getAttributeValue(line, "time");
+    return Long.parseLong(timestamp);
+  }
+
+  /**
+   * parse a time string either in timestamp form or "yyMMdd_hhmmss_SSS" form
+   * @param time
+   * @return timestamp or -1 on error
+   */
+  static long parseTimeString(String time) {
+    try {
+      return Long.parseLong(time);
+    } catch (NumberFormatException e) {
+      try {
+        SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
+        Date date = formatter.parse(time);
+        return date.getTime();
+      } catch (java.text.ParseException ex) {
+        LOG.error("fail to parse time string: " + time, ex);
+      }
+    }
+    return -1;
+  }
+
+  public static void grepZkLog(File zkLog, long start, long end, String... patterns) {
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkLog);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        try {
+          long timestamp = getTimestamp(line);
+          if (timestamp > end) {
+            break;
+          }
+
+          if (timestamp < start) {
+            continue;
+          }
+
+          boolean match = true;
+          for (String pattern : patterns) {
+            if (line.indexOf(pattern) == -1) {
+              match = false;
+              break;
+            }
+          }
+
+          if (match) {
+            System.out.println(line);
+          }
+
+        } catch (NumberFormatException e) {
+          // ignore
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-log: " + zkLog, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-log: " + zkLog, e);
+      }
+    }
+  }
+
+  public static void grepZkLogDir(List<File> parsedZkLogs, long start, long end, String... patterns) {
+    for (File file : parsedZkLogs) {
+      grepZkLog(file, start, end, patterns);
+
+    }
+
+  }
+
+  public static void grepZkSnapshot(File zkSnapshot, String... patterns) {
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkSnapshot);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        try {
+          boolean match = true;
+          for (String pattern : patterns) {
+            if (line.indexOf(pattern) == -1) {
+              match = false;
+              break;
+            }
+          }
+
+          if (match) {
+            System.out.println(line);
+          }
+
+        } catch (NumberFormatException e) {
+          // ignore
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-snapshot: " + zkSnapshot, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-snapshot: " + zkSnapshot, e);
+      }
+    }
+  }
+
+  /**
+   * guess zoo.cfg dir
+   * @return absolute path to zoo.cfg
+   */
+  static String guessZkCfgDir() {
+    // TODO impl this
+    return null;
+  }
+
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ZkGrep.class.getName(), cliOptions);
+  }
+
+  /**
+   * parse zk-transaction-logs between start and end, if not already parsed
+   * @param zkLogDir
+   * @param start
+   * @param end
+   * @return list of parsed zklogs between start and end, in order of last modified timestamp
+   */
+  static List<File> parseZkLogs(String zkLogDir, long start, long end) {
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkLogs = getSortedFiles(zkLogDir, log);
+    // printFiles(zkDataFiles);
+    List<File> parsedZkLogs = new ArrayList<File>();
+
+    boolean stop = false;
+    for (File zkLog : zkLogs) {
+      if (stop) {
+        break;
+      }
+
+      if (zkLog.lastModified() < start) {
+        continue;
+      }
+
+      if (zkLog.lastModified() > end) {
+        stop = true;
+      }
+
+      try {
+        File parsedZkLog = new File(zkParsedDir, stripGzSuffix(zkLog.getName()) + ".parsed");
+        if (!parsedZkLog.exists() || parsedZkLog.lastModified() <= zkLog.lastModified()) {
+
+          if (zkLog.getName().endsWith(gzSuffix)) {
+            // copy and gunzip it
+            FileUtils.copyFileToDirectory(zkLog, zkParsedDir);
+            File zkLogGz = new File(zkParsedDir, zkLog.getName());
+            File tmpZkLog = gunzip(zkLogGz);
+
+            // parse gunzip file
+            ZKLogFormatter
+                .main(new String[] { log, tmpZkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+                });
+
+            // delete it
+            zkLogGz.delete();
+            tmpZkLog.delete();
+          } else {
+            // parse it directly
+            ZKLogFormatter.main(new String[] {
+                log, zkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+            });
+          }
+        }
+        parsedZkLogs.add(parsedZkLog);
+      } catch (Exception e) {
+        LOG.error("fail to parse zkLog: " + zkLog, e);
+      }
+    }
+
+    return parsedZkLogs;
+  }
+
+  /**
+   * Strip off a .gz suffix if any
+   * @param filename
+   * @return
+   */
+  static String stripGzSuffix(String filename) {
+    if (filename.endsWith(gzSuffix)) {
+      return filename.substring(0, filename.length() - gzSuffix.length());
+    }
+    return filename;
+  }
+
+  /**
+   * Gunzip a file
+   * @param zipFile
+   * @return
+   */
+  static File gunzip(File zipFile) {
+    File outputFile = new File(stripGzSuffix(zipFile.getAbsolutePath()));
+
+    byte[] buffer = new byte[1024];
+
+    try {
+
+      GZIPInputStream gzis = new GZIPInputStream(new FileInputStream(zipFile));
+      FileOutputStream out = new FileOutputStream(outputFile);
+
+      int len;
+      while ((len = gzis.read(buffer)) > 0) {
+        out.write(buffer, 0, len);
+      }
+
+      gzis.close();
+      out.close();
+
+      return outputFile;
+    } catch (IOException e) {
+      LOG.error("fail to gunzip file: " + zipFile, e);
+    }
+
+    return null;
+  }
+
+  /**
+   * parse the last zk-snapshot taken before by-time, if not already parsed
+   * @param zkSnapshotDir
+   * @param byTime
+   * @return File array whose first element is the last zk-snapshot before by-time and whose
+   *         second element is its parsed file
+   */
+  static File[] parseZkSnapshot(String zkSnapshotDir, long byTime) {
+    File[] retFiles = new File[2];
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkSnapshots = getSortedFiles(zkSnapshotDir, snapshot);
+    // printFiles(zkDataFiles);
+    File lastZkSnapshot = null;
+    for (int i = 0; i < zkSnapshots.length; i++) {
+      File zkSnapshot = zkSnapshots[i];
+      if (zkSnapshot.lastModified() >= byTime) {
+        break;
+      }
+      lastZkSnapshot = zkSnapshot;
+      retFiles[0] = lastZkSnapshot;
+    }
+
+    try {
+      File parsedZkSnapshot =
+          new File(zkParsedDir, stripGzSuffix(lastZkSnapshot.getName()) + ".parsed");
+      if (!parsedZkSnapshot.exists()
+          || parsedZkSnapshot.lastModified() <= lastZkSnapshot.lastModified()) {
+
+        if (lastZkSnapshot.getName().endsWith(gzSuffix)) {
+          // copy and gunzip it
+          FileUtils.copyFileToDirectory(lastZkSnapshot, zkParsedDir);
+          File lastZkSnapshotGz = new File(zkParsedDir, lastZkSnapshot.getName());
+          File tmpLastZkSnapshot = gunzip(lastZkSnapshotGz);
+
+          // parse gunzip file
+          ZKLogFormatter.main(new String[] {
+              snapshot, tmpLastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+          });
+
+          // delete it
+          lastZkSnapshotGz.delete();
+          tmpLastZkSnapshot.delete();
+        } else {
+          // parse it directly
+          ZKLogFormatter.main(new String[] {
+              snapshot, lastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+          });
+        }
+
+      }
+      retFiles[1] = parsedZkSnapshot;
+      return retFiles;
+    } catch (Exception e) {
+      LOG.error("fail to parse zkSnapshot: " + lastZkSnapshot, e);
+    }
+
+    return null;
+  }
+
+  public static void processCommandLineArgs(String[] cliArgs) {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String zkCfgDirValue = null;
+    String zkCfgFile = null;
+
+    if (cmd.hasOption(zkCfg)) {
+      zkCfgDirValue = cmd.getOptionValue(zkCfg);
+    }
+
+    if (zkCfgDirValue == null) {
+      zkCfgDirValue = guessZkCfgDir();
+    }
+
+    if (zkCfgDirValue == null) {
+      LOG.error("couldn't figure out path to zkCfg file");
+      System.exit(1);
+    }
+
+    // get zoo.cfg path from cfg-dir
+    zkCfgFile = zkCfgDirValue;
+    if (!zkCfgFile.endsWith(".cfg")) {
+      // append with default zoo.cfg
+      zkCfgFile = zkCfgFile + "/zoo.cfg";
+    }
+
+    if (!new File(zkCfgFile).exists()) {
+      LOG.error("zoo.cfg file doesn't exist: " + zkCfgFile);
+      System.exit(1);
+    }
+
+    String[] patterns = cmd.getOptionValues(pattern);
+
+    String[] zkDataDirs = getZkDataDirs(zkCfgFile);
+
+    // parse zk data files
+    if (zkDataDirs == null || zkDataDirs[0] == null || zkDataDirs[1] == null) {
+      LOG.error("invalid zkCfgDir: " + zkCfgDirValue);
+      System.exit(1);
+    }
+
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    if (!zkParsedDir.exists()) {
+      LOG.info("creating zklog-parsed dir: " + zkParsedDir.getAbsolutePath());
+      zkParsedDir.mkdir();
+    }
+
+    if (cmd.hasOption(between)) {
+      String[] timeStrings = cmd.getOptionValues(between);
+
+      long startTime = parseTimeString(timeStrings[0]);
+      if (startTime == -1) {
+        LOG.error("invalid start time string: " + timeStrings[0]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      long endTime = parseTimeString(timeStrings[1]);
+      if (endTime == -1) {
+        LOG.error("invalid end time string: " + timeStrings[1]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      if (startTime > endTime) {
+        LOG.warn("empty window: " + startTime + " - " + endTime);
+        System.exit(1);
+      }
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, endTime);
+      grepZkLogDir(parsedZkLogs, startTime, endTime, patterns);
+
+    } else if (cmd.hasOption(by)) {
+      String timeString = cmd.getOptionValue(by);
+
+      long byTime = parseTimeString(timeString);
+      if (byTime == -1) {
+        LOG.error("invalid by time string: " + timeString
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      // zkDataDirs[1] is the snapshot dir
+      File[] lastZkSnapshot = parseZkSnapshot(zkDataDirs[1], byTime);
+
+      // lastZkSnapshot[1] is the parsed last snapshot by byTime
+      grepZkSnapshot(lastZkSnapshot[1], patterns);
+
+      // need to grep transaction logs between last-modified-time of snapshot and byTime also
+      // lastZkSnapshot[0] is the last snapshot by byTime
+      long startTime = lastZkSnapshot[0].lastModified();
+
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, byTime);
+      grepZkLogDir(parsedZkLogs, startTime, byTime, patterns);
+    }
+  }
+
+  public static void main(String[] args) {
+    processCommandLineArgs(args);
+  }
+
+}
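
As a rough illustration of the command-line flow described in the class comment, ZkGrep can be invoked as follows; the zoo.cfg path, time window, and pattern are placeholders:

    // grep all closeSession transactions in a time window; times may also be epoch millis
    ZkGrep.main(new String[] {
        "--zkCfg", "/opt/zookeeper/conf/zoo.cfg",
        "--between", "180424_100000_000", "180424_103000_000",
        "--pattern", "closeSession"
    });

Parsed logs and snapshots are cached under ~/zklog-parsed, so repeated greps over the same window avoid re-parsing.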

http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkLogCSVFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkLogCSVFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkLogCSVFormatter.java
new file mode 100644
index 0000000..d9d4f98
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkLogCSVFormatter.java
@@ -0,0 +1,320 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.util.HelixUtil;
+
+public class ZkLogCSVFormatter {
+  private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
+  private static String _fieldDelim = ",";
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
+      System.exit(2);
+    }
+    File outputDir = new File(args[1]);
+    if (!outputDir.exists() || !outputDir.isDirectory()) {
+      System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
+      System.exit(2);
+    }
+    format(args[0], args[1]);
+  }
+
+  private static void formatter(BufferedWriter bw, String... args) {
+    StringBuffer sb = new StringBuffer();
+
+    if (args.length == 0) {
+      return;
+    } else {
+      sb.append(args[0]);
+      for (int i = 1; i < args.length; i++) {
+        sb.append(_fieldDelim).append(args[i]);
+      }
+    }
+
+    try {
+      bw.write(sb.toString());
+      bw.newLine();
+      // System.out.println(sb.toString());
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  private static String getAttributeValue(String line, String attribute) {
+    String[] parts = line.split("\\s");
+    if (parts != null && parts.length > 0) {
+      for (int i = 0; i < parts.length; i++) {
+        if (parts[i].startsWith(attribute)) {
+          String val = parts[i].substring(attribute.length());
+          return val;
+        }
+      }
+    }
+    return null;
+  }
+
+  private static void format(String logfilepath, String outputDir) throws FileNotFoundException {
+    try {
+      // input file
+      FileInputStream fis = new FileInputStream(logfilepath);
+      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+
+      // output files
+      FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
+      BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
+
+      FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
+      BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
+
+      FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
+      BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
+
+      FileOutputStream smdCntFos =
+          new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
+      BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
+
+      FileOutputStream smdNextFos =
+          new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
+      BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
+
+      FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
+      BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
+
+      FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
+      BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
+
+      FileOutputStream hrPerfFos =
+          new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
+      BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
+
+      FileOutputStream liFos = new FileOutputStream(outputDir + "/" + "liveInstances.csv");
+      BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
+
+      formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
+      formatter(isBw, "timestamp", "resourceName", "partitionNumber", "mode", "partition",
+          "instanceName", "priority");
+      formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
+      formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
+      formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
+      formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
+      formatter(csBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
+          "state");
+      formatter(msgBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
+          "from", "to", "messageType", "messageState");
+      formatter(hrPerfBw, "timestamp", "instanceName", "availableCPUs", "averageSystemLoad",
+          "freeJvmMemory", "freePhysicalMemory", "totalJvmMemory");
+
+      Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
+
+      int pos;
+      String inputLine;
+      while ((inputLine = br.readLine()) != null) {
+        if (inputLine.indexOf("CONFIGS") != -1) {
+          pos = inputLine.indexOf("CONFIGS");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+
+            formatter(cfgBw, timestamp, record.getId(), record.getSimpleField("HOST"),
+                record.getSimpleField("PORT"), record.getSimpleField("ENABLED"));
+
+          }
+        } else if (inputLine.indexOf("IDEALSTATES") != -1) {
+          pos = inputLine.indexOf("IDEALSTATES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getListFields().keySet()) {
+              List<String> preferenceList = record.getListFields().get(partition);
+              for (int i = 0; i < preferenceList.size(); i++) {
+                String instance = preferenceList.get(i);
+                formatter(isBw, timestamp, record.getId(),
+                    record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
+                    record.getSimpleField(IdealStateProperty.REBALANCE_MODE.toString()), partition,
+                    instance, Integer.toString(i));
+              }
+            }
+          }
+        } else if (inputLine.indexOf("LIVEINSTANCES") != -1) {
+          pos = inputLine.indexOf("LIVEINSTANCES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+            formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
+            String zkSessionId = getAttributeValue(inputLine, "session:");
+            if (zkSessionId == null) {
+              System.err.println("no zk session id associated with the adding of live instance: "
+                  + inputLine);
+            } else {
+              liveInstanceSessionMap.put(zkSessionId, record);
+            }
+          }
+
+        } else if (inputLine.indexOf("EXTERNALVIEW") != -1) {
+          pos = inputLine.indexOf("EXTERNALVIEW");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getMapFields().keySet()) {
+              Map<String, String> stateMap = record.getMapFields().get(partition);
+              for (String instance : stateMap.keySet()) {
+                String state = stateMap.get(instance);
+                formatter(evBw, timestamp, record.getId(), partition, instance, state);
+              }
+            }
+          }
+        } else if (inputLine.indexOf("STATEMODELDEFS") != -1) {
+          pos = inputLine.indexOf("STATEMODELDEFS");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+
+            for (String stateInfo : record.getMapFields().keySet()) {
+              if (stateInfo.endsWith(".meta")) {
+                Map<String, String> metaMap = record.getMapFields().get(stateInfo);
+                formatter(smdCntBw, timestamp, record.getId(),
+                    stateInfo.substring(0, stateInfo.indexOf('.')), metaMap.get("count"));
+              } else if (stateInfo.endsWith(".next")) {
+                Map<String, String> nextMap = record.getMapFields().get(stateInfo);
+                for (String destState : nextMap.keySet()) {
+                  formatter(smdNextBw, timestamp, record.getId(),
+                      stateInfo.substring(0, stateInfo.indexOf('.')), destState,
+                      nextMap.get(destState));
+                }
+              }
+            }
+          }
+        } else if (inputLine.indexOf("CURRENTSTATES") != -1) {
+          pos = inputLine.indexOf("CURRENTSTATES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getMapFields().keySet()) {
+              Map<String, String> stateMap = record.getMapFields().get(partition);
+              String path = getAttributeValue(inputLine, "path:");
+              if (path != null) {
+                String instance = HelixUtil.getInstanceNameFromPath(path);
+                formatter(csBw, timestamp, record.getId(), partition, instance,
+                    record.getSimpleField("SESSION_ID"), stateMap.get("CURRENT_STATE"));
+              }
+            }
+          }
+        } else if (inputLine.indexOf("MESSAGES") != -1) {
+          pos = inputLine.indexOf("MESSAGES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+
+            formatter(msgBw, timestamp, record.getSimpleField("RESOURCE_NAME"),
+                record.getSimpleField("PARTITION_NAME"), record.getSimpleField("TGT_NAME"),
+                record.getSimpleField("TGT_SESSION_ID"), record.getSimpleField("FROM_STATE"),
+                record.getSimpleField("TO_STATE"), record.getSimpleField("MSG_TYPE"),
+                record.getSimpleField("MSG_STATE"));
+          }
+
+        } else if (inputLine.indexOf("closeSession") != -1) {
+          String zkSessionId = getAttributeValue(inputLine, "session:");
+          if (zkSessionId == null) {
+            System.err.println("no zk session id associated with the closing of zk session: "
+                + inputLine);
+          } else {
+            ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
+            // System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
+            if (record != null) {
+              String timestamp = getAttributeValue(inputLine, "time:");
+              formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"),
+                  "DELETE");
+            }
+          }
+        } else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1) {
+          pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1) {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
+
+            String path = getAttributeValue(inputLine, "path:");
+            if (path != null) {
+              String instance = HelixUtil.getInstanceNameFromPath(path);
+              formatter(hrPerfBw, timestamp, instance, record.getSimpleField("availableCPUs"),
+                  record.getSimpleField("averageSystemLoad"),
+                  record.getSimpleField("freeJvmMemory"),
+                  record.getSimpleField("freePhysicalMemory"),
+                  record.getSimpleField("totalJvmMemory"));
+            }
+          }
+        }
+      }
+
+      br.close();
+      isBw.close();
+      cfgBw.close();
+      evBw.close();
+      smdCntBw.close();
+      smdNextBw.close();
+      csBw.close();
+      msgBw.close();
+      liBw.close();
+      hrPerfBw.close();
+    } catch (Exception e) {
+      System.err.println("Error: " + e.getMessage());
+    }
+  }
+}
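
For readers skimming the diff: format() above repeatedly applies one pattern, namely pick a "key:value" attribute (time:, session:, path:) out of a whitespace-delimited dump line, take the JSON payload after "data:", deserialize it into a ZNRecord, and append a delimited row to the matching CSV writer. The standalone sketch below illustrates only that pattern; the sample line, field names, and output file name are assumptions for illustration and are not taken from this commit, and the try-with-resources block shows one way the writers could be closed even when a write fails (the code above closes them only on the success path).

    // Standalone sketch of the parsing pattern used by format() above.
    // The sample line, field names, and output file are illustrative assumptions.
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ZkLogLineSketch {
      // Same idea as getAttributeValue(): scan whitespace-separated "key:value" tokens.
      static String attr(String line, String key) {
        for (String part : line.split("\\s")) {
          if (part.startsWith(key)) {
            return part.substring(key.length());
          }
        }
        return null;
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical dump line; real input comes from the ZK log/snapshot dump.
        String line = "time:1524610212000 session:0x1a2b path:/MYCLUSTER/CONFIGS/PARTICIPANT/host_1234"
            + " data:{\"id\":\"host_1234\",\"simpleFields\":{\"HOST\":\"host\",\"PORT\":\"1234\"}}";

        String timestamp = attr(line, "time:");
        int pos = line.indexOf("data:{");
        // format() hands this JSON payload to the ZNRecord deserializer; here it is written raw.
        String payload = line.substring(pos + "data:".length());

        // try-with-resources closes the writer even if a write fails, unlike the
        // success-path-only close() calls in format().
        try (BufferedWriter bw = Files.newBufferedWriter(Paths.get("config-sketch.csv"))) {
          bw.write("timestamp,payload");
          bw.newLine();
          bw.write(timestamp + "," + payload);
          bw.newLine();
        }
      }
    }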

http://git-wip-us.apache.org/repos/asf/helix/blob/67620753/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java b/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
index ee3d2bd..94111eb 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
@@ -26,6 +26,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.tools.commandtools.ZkCopy;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -50,7 +51,8 @@ public class TestZkCopy extends ZkUnitTestBase {
 
     // Copy
     String toPath = "/" + clusterName + "/to";
-    ZkCopy.main(new String[]{"--src", "zk://" + ZK_ADDR + fromPath, "--dst", "zk://" + ZK_ADDR + toPath});
+    ZkCopy.main(
+        new String[] { "--src", "zk://" + ZK_ADDR + fromPath, "--dst", "zk://" + ZK_ADDR + toPath });
 
     // Verify
     Assert.assertTrue(_gZkClient.exists(toPath));
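
The only change to this test is the import and the reformatted ZkCopy.main() call, reflecting the move to org.apache.helix.tools.commandtools. As a quick reference, the same copy can be driven directly with the --src/--dst flags the test exercises; in the minimal sketch below, the ZooKeeper address and paths are placeholders, not values from the test:

    import org.apache.helix.tools.commandtools.ZkCopy;

    public class ZkCopyUsageSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder address and paths; --src and --dst take zk://host:port/path URIs,
        // mirroring the arguments exercised by TestZkCopy.
        ZkCopy.main(new String[] {
            "--src", "zk://localhost:2181/MYCLUSTER/from",
            "--dst", "zk://localhost:2181/MYCLUSTER/to"
        });
      }
    }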

