http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
deleted file mode 100644
index 2fd95e9..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
+++ /dev/null
@@ -1,494 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-
-public class ExpressionParser {
- // Shared log4j logger for this class.
- private static Logger logger = Logger.getLogger(ExpressionParser.class);
-
- // Separates the aggregator/stat part of an expression from each trailing operator name.
- final static String opDelim = "|";
- // Regex-escaped form of opDelim, used with String.split.
- final static String opDelimForSplit = "\\|";
- // Separates aggregator arguments from each other, and stat names from each other.
- final static String argDelim = ",";
- // Separates the fields of a stat name.
- final public static String statFieldDelim = ".";
- // Wildcard marker that may appear in stat names.
- final static String wildcardChar = "*";
-
- // static Map<String, ExpressionOperatorType> operatorMap = new
- // HashMap<String, ExpressionOperatorType>();
-
- // Registered operators by label; populated by the static initializer below.
- static Map<String, Operator> operatorMap = new HashMap<String, Operator>();
- // Registered aggregators by upper-cased label; populated by the static initializer below.
- static Map<String, Aggregator> aggregatorMap = new HashMap<String, Aggregator>();
-
- // Registers the built-in operators and aggregators. This block must stay after the
- // logger and map declarations, because the registration helpers use all three.
- static {
-
- addOperatorEntry("EXPAND", new ExpandOperator());
- addOperatorEntry("DIVIDE", new DivideOperator());
- addOperatorEntry("SUM", new SumOperator());
- addOperatorEntry("SUMEACH", new SumEachOperator());
-
- addAggregatorEntry("ACCUMULATE", new AccumulateAggregator());
- addAggregatorEntry("DECAY", new DecayAggregator());
- addAggregatorEntry("WINDOW", new WindowAggregator());
- /*
- * addEntry("EACH", ExpressionOperatorType.EACH); addEntry("SUM",
- * ExpressionOperatorType.SUM); addEntry("DIVIDE",
- * ExpressionOperatorType.DIVIDE); addEntry("ACCUMULATE",
- * ExpressionOperatorType.ACCUMULATE);
- */
- }
-
- // static Pattern pattern = Pattern.compile("(\\{.+?\\})");
-
- /**
-  * Registers an operator under the given label unless that label is already taken.
-  * The label is upper-cased before it is stored, matching the upper-cased lookup done by
-  * getOperators and the way aggregator labels are stored.
-  *
-  * @param label operator name as it appears in expressions
-  * @param op operator implementation to register
-  */
- private static void addOperatorEntry(String label, Operator op) {
-   String key = label.toUpperCase();
-   if (operatorMap.containsKey(key)) {
-     // first registration wins; do not log an addition that did not happen
-     return;
-   }
-   operatorMap.put(key, op);
-   logger.info("Adding operator: " + op);
- }
-
- /**
-  * Registers an aggregator under the upper-cased label unless that label is already taken.
-  *
-  * @param label aggregator name as it appears in expressions (any case)
-  * @param agg aggregator implementation to register
-  */
- private static void addAggregatorEntry(String label, Aggregator agg) {
-   String key = label.toUpperCase();
-   if (aggregatorMap.containsKey(key)) {
-     // first registration wins; do not log an addition that did not happen
-     return;
-   }
-   aggregatorMap.put(key, agg);
-   logger.info("Adding aggregator: " + agg);
- }
-
- /*
- * private static void addEntry(String label, ExpressionOperatorType type) {
- * if (!operatorMap.containsKey(label)) { operatorMap.put(label, type); }
- * logger.info("Adding operator type: "+type); }
- */
-
- /**
-  * Tells whether the expression contains an opening parenthesis.
-  *
-  * @param expression expression text to inspect
-  * @return true if "(" occurs anywhere in the expression
-  */
- public static boolean isExpressionNested(String expression) {
-   final int firstParen = expression.indexOf("(");
-   return firstParen >= 0;
- }
-
- /*
- * public static Operator getOperatorType(String expression) throws Exception
- * { String op = expression.substring(0,expression.indexOf("(")); if
- * (!operatorMap.containsKey(op)) { throw new
- * Exception(op+" is not a valid op type"); } return operatorMap.get(op); }
- */
-
- /**
-  * Returns the text between the first "(" and the last ")" of the expression.
-  *
-  * @param expression expression containing at least one closing parenthesis
-  * @return the enclosed text, which may itself contain parentheses
-  */
- public static String getInnerExpression(String expression) {
-   final int start = expression.indexOf("(") + 1;
-   final int end = expression.lastIndexOf(")");
-   return expression.substring(start, end);
- }
-
- /*
- * public static String[] getBaseStats(ExpressionOperatorType type, String
- * expression) throws Exception { String[] items = null; if
- * (isExpressionNested(expression)) { ExpressionOperatorType nextType =
- * getOperatorType(expression); String innerExp =
- * getInnerExpression(expression); items = getBaseStats(nextType, innerExp); }
- * else { //base class, no nesting items = expression.split(","); }
- * if (type != null && type.isBaseOp()) { //surround items with type. for (int
- * i=0; i<items.length; i++) { items[i] = type + "(" + items[i] + ")"; //!!!!
- * NEED type to behave like string here
- * logger.debug("Forming item "+items[i]); } } return items; }
- * public static String[] getBaseStats(String expression) throws Exception {
- * expression = expression.replaceAll("\\s+", ""); return getBaseStats(null,
- * expression); }
- */
-
- /**
-  * Validates the shape of the aggregator part of an expression, i.e. text of the form
-  * agg(args)(stats) such as ()(x) or (2)(x,y).
-  * The following is checked, in this order:
-  * 1. a first parenthesized group (aggregator arguments, possibly empty) exists;
-  * 2. a second, non-empty parenthesized group (stat names) exists;
-  * 3. no third group and no stray parenthesis follows the second group;
-  * 4. within every field of every stat name, a wildcard occurs at most once and only as
-  *    the last character.
-  * The aggregator name and the number of arguments are not examined here.
-  *
-  * @param expression expression to validate
-  * @throws HelixException if any check fails
-  */
- public static void validateAggregatorFormat(String expression) throws HelixException {
-   logger.debug("validating aggregator for expression: " + expression);
-   // have 0 or more args, 1 or more stats...e.g. ()(x) or (2)(x,y)
-   // non-greedy, so each match is the shortest "(...)" group
-   Matcher matcher = Pattern.compile("\\(.*?\\)").matcher(expression);
-
-   if (!matcher.find()) {
-     throw new HelixException(expression + " has invalid aggregate component");
-   }
-   String aggComponent = matcher.group();
-   aggComponent = aggComponent.substring(1, aggComponent.length() - 1);
-   if (aggComponent.contains(")") || aggComponent.contains("(")) {
-     throw new HelixException(expression + " has invalid aggregate component");
-   }
-
-   if (!matcher.find()) {
-     throw new HelixException(expression + " has invalid stat component");
-   }
-   String statComponent = matcher.group();
-   statComponent = statComponent.substring(1, statComponent.length() - 1);
-   // statComponent must have at least 1 arg between paren
-   if (statComponent.contains(")") || statComponent.contains("(")
-       || statComponent.length() == 0) {
-     throw new HelixException(expression + " has invalid stat component");
-   }
-   // position one past the closing parenthesis of the stat group
-   int lastMatchEnd = matcher.end();
-
-   if (matcher.find()) {
-     throw new HelixException(expression + " has too many parenthesis components");
-   }
-
-   // substring(length) is the empty string, so no separate length guard is needed
-   String remainder = expression.substring(lastMatchEnd);
-   if (remainder.contains("(") || remainder.contains(")")) {
-     throw new HelixException(expression + " has extra parenthesis");
-   }
-
-   // Check wildcard locations. Tokenize on the stat separator as well as the field
-   // separator so that each field of each stat is checked on its own; splitting on the
-   // field separator alone would glue "b*,c" together for "a.b*,c.d" and reject it.
-   StringTokenizer fieldTok = new StringTokenizer(statComponent, statFieldDelim + argDelim);
-   while (fieldTok.hasMoreTokens()) {
-     String currTok = fieldTok.nextToken();
-     int wildcardLoc = currTok.indexOf(wildcardChar);
-     // a first occurrence at the last position is necessarily the only occurrence
-     if (wildcardLoc >= 0 && wildcardLoc != currTok.length() - 1) {
-       throw new HelixException(currTok
-           + " is illegal stat name. Single wildcard must appear at end.");
-     }
-   }
- }
-
- /**
-  * Tells whether the stat text contains the wildcard marker.
-  *
-  * @param stat stat name or stat expression to inspect
-  * @return true if the wildcard character occurs anywhere in stat
-  */
- public static boolean statContainsWildcards(String stat) {
-   final int wildcardLoc = stat.indexOf(wildcardChar);
-   return wildcardLoc != -1;
- }
-
- /**
-  * Exact stat comparison. Two modes:
-  * - extractStatFromAgg=false: currentStat is compared to incomingStat as is, e.g.
-  *   accumulate()(dbFoo.partition10.latency) against itself;
-  * - extractStatFromAgg=true: the single stat name is first extracted from currentStat,
-  *   e.g. accumulate()(dbFoo.partition10.latency) against dbFoo.partition10.latency.
-  *
-  * @param currentStat stat to compare against, optionally wrapped in an aggregator
-  * @param incomingStat stat being tested
-  * @param extractStatFromAgg whether to strip the aggregator from currentStat first
-  * @return true if incomingStat equals the (possibly extracted) current stat
-  */
- public static boolean isExactMatch(String currentStat, String incomingStat,
-     boolean extractStatFromAgg) {
-   final String nameToCompare =
-       extractStatFromAgg ? getSingleAggregatorStat(currentStat) : currentStat;
-   return incomingStat.equals(nameToCompare);
- }
-
- /**
-  * Wildcard stat comparison: returns true if currentStat contains at least one "*" and
-  * incomingStat matches it. Two modes:
-  * - statCompareOnly=false: the text before the first ")" (aggregator name and arguments)
-  *   must be equal on both sides, e.g. accumulate()(dbFoo.partition*.latency) against
-  *   accumulate()(dbFoo.partition10.latency);
-  * - statCompareOnly=true: only the stat names are compared, e.g.
-  *   accumulate()(dbFoo.partition*.latency) against dbFoo.partition10.latency.
-  * Each "*" stands for any run of characters, including the field delimiter, so a*.c*
-  * matches a5.c7 and also a5.b6.c7. All other characters are matched literally.
-  * A current stat without any wildcard never matches here, even if the names are equal.
-  *
-  * @param currentStat stat that may contain wildcards
-  * @param incomingStat concrete stat to test
-  * @param statCompareOnly true to skip the aggregator comparison
-  * @param bindings if non-null, receives the incoming stat name when the match succeeds
-  * @return true on a wildcard match
-  */
- public static boolean isWildcardMatch(String currentStat, String incomingStat,
-     boolean statCompareOnly, ArrayList<String> bindings) {
-   if (!statCompareOnly) { // need to check for match on agg type and stat
-     String currentStatAggType = (currentStat.split("\\)"))[0];
-     String incomingStatAggType = (incomingStat.split("\\)"))[0];
-     if (!currentStatAggType.equals(incomingStatAggType)) {
-       return false;
-     }
-   }
-   // now just get the stats
-   String currentStatName = getSingleAggregatorStat(currentStat);
-   String incomingStatName = getSingleAggregatorStat(incomingStat);
-
-   if (!currentStatName.contains(wildcardChar)) { // no wildcards in stat name
-     return false;
-   }
-
-   // Build the regex piece by piece: literal runs are quoted so that characters such as
-   // "(", "+" or "$" in a stat name cannot be interpreted as regex syntax, and every
-   // wildcard becomes ".*".
-   StringBuilder regex = new StringBuilder();
-   int from = 0;
-   int star = currentStatName.indexOf(wildcardChar, from);
-   while (star >= 0) {
-     regex.append(Pattern.quote(currentStatName.substring(from, star))).append(".*");
-     from = star + wildcardChar.length();
-     star = currentStatName.indexOf(wildcardChar, from);
-   }
-   regex.append(Pattern.quote(currentStatName.substring(from)));
-
-   boolean result = Pattern.matches(regex.toString(), incomingStatName);
-   if (result && bindings != null) {
-     bindings.add(incomingStatName);
-   }
-   return result;
- }
-
- /**
-  * Checks whether an incoming stat (no aggregator type) exactly matches a persisted stat
-  * (with aggregator type); the stat name is extracted from currentStat before comparing.
-  *
-  * @param currentStat persisted stat, including its aggregator
-  * @param incomingStat bare incoming stat name
-  * @return true on an exact match
-  */
- public static boolean isIncomingStatExactMatch(String currentStat, String incomingStat) {
-   final boolean extractStatFromAgg = true;
-   return isExactMatch(currentStat, incomingStat, extractStatFromAgg);
- }
-
- /**
-  * Checks whether an incoming stat (no aggregator type) wildcard-matches a persisted stat
-  * (with aggregator type). The persisted stat may contain wildcards. Only stat names are
-  * compared and no bindings are collected.
-  *
-  * @param currentStat persisted stat, possibly with wildcards
-  * @param incomingStat bare incoming stat name
-  * @return true on a wildcard match
-  */
- public static boolean isIncomingStatWildcardMatch(String currentStat, String incomingStat) {
-   final boolean statCompareOnly = true;
-   final ArrayList<String> noBindings = null;
-   return isWildcardMatch(currentStat, incomingStat, statCompareOnly, noBindings);
- }
-
- /**
-  * Checks whether a persisted stat exactly matches a stat defined in an alert. Both
-  * strings are compared as given, without extracting the stat name.
-  *
-  * @param alertStat stat as written in the alert
-  * @param currentStat persisted stat
-  * @return true if the two strings are equal
-  */
- public static boolean isAlertStatExactMatch(String alertStat, String currentStat) {
-   final boolean extractStatFromAgg = false;
-   return isExactMatch(alertStat, currentStat, extractStatFromAgg);
- }
-
- /**
-  * Checks whether a maintained stat wildcard-matches a stat defined in an alert. The
-  * alert may contain wildcards; the aggregator parts are compared as well.
-  *
-  * @param alertStat stat as written in the alert, possibly with wildcards
-  * @param currentStat maintained stat
-  * @param wildcardBindings if non-null, receives the matched stat name on success
-  * @return true on a wildcard match
-  */
- public static boolean isAlertStatWildcardMatch(String alertStat, String currentStat,
-     ArrayList<String> wildcardBindings) {
-   final boolean statCompareOnly = false;
-   return isWildcardMatch(alertStat, currentStat, statCompareOnly, wildcardBindings);
- }
-
- /**
-  * Looks up a registered aggregator by name, ignoring case.
-  *
-  * @param aggStr aggregator name
-  * @return the registered aggregator
-  * @throws HelixException if no aggregator is registered under that name
-  */
- public static Aggregator getAggregator(String aggStr) throws HelixException {
-   final String key = aggStr.toUpperCase();
-   final Aggregator found = aggregatorMap.get(key);
-   if (found != null) {
-     return found;
-   }
-   throw new HelixException("Unknown aggregator type " + key);
- }
-
- /**
-  * Extracts the aggregator name, i.e. the text before the first "(", and verifies that
-  * an aggregator is registered under it (case-insensitively).
-  *
-  * @param expression expression starting with an aggregator name
-  * @return the aggregator name exactly as written in the expression
-  * @throws HelixException if there is no "(" or the name is not registered
-  */
- public static String getAggregatorStr(String expression) throws HelixException {
-   final int parenLoc = expression.indexOf("(");
-   if (parenLoc < 0) {
-     throw new HelixException(expression
-         + " does not contain a valid aggregator. No parentheses found");
-   }
-   final String aggName = expression.substring(0, parenLoc);
-   if (aggregatorMap.containsKey(aggName.toUpperCase())) {
-     return aggName;
-   }
-   throw new HelixException("aggregator <" + aggName + "> is unknown type");
- }
-
- /**
-  * Extracts the aggregator arguments and verifies that their number equals what the
-  * aggregator requires.
-  *
-  * @param expression expression of the form agg(args)(stats)
-  * @return the argument text split on the argument delimiter; for an empty argument list
-  *         this is the raw split result, a one-element array holding the empty string
-  * @throws HelixException if the aggregator is unknown or the argument count is wrong
-  */
- public static String[] getAggregatorArgs(String expression) throws HelixException {
-   String aggregator = getAggregatorStr(expression);
-   String argsStr = getAggregatorArgsStr(expression);
-   String[] args = argsStr.split(argDelim);
-   logger.debug("args size: " + args.length);
-   // splitting the empty string yields one empty element, so count that case as 0
-   int numArgs = (argsStr.length() == 0) ? 0 : args.length;
-   // verify correct number of args
-   int requiredNumArgs = aggregatorMap.get(aggregator.toUpperCase()).getRequiredNumArgs();
-   if (numArgs != requiredNumArgs) {
-     // report the counted number, not args.length, which is 1 for an empty list
-     throw new HelixException(expression + " contains " + numArgs
-         + " arguments, but requires " + requiredNumArgs);
-   }
-   return args;
- }
-
- /*
- * public static String[] getAggregatorArgsList(String expression) { String
- * argsStr = getAggregatorArgsStr(expression); String[] args =
- * argsStr.split(argDelim); return args; }
- */
-
- /**
-  * Returns the raw text between the first "(" and the first ")" of the expression.
-  *
-  * @param expression expression of the form agg(args)(stats)
-  * @return the unsplit argument text, possibly empty
-  */
- public static String getAggregatorArgsStr(String expression) {
-   final int argsStart = expression.indexOf("(") + 1;
-   final int argsEnd = expression.indexOf(")");
-   return expression.substring(argsStart, argsEnd);
- }
-
- /**
-  * Returns the stat names of an expression. If the expression contains both "(" and ")",
-  * the text between the last "(" and the last ")" is used; otherwise the whole
-  * expression is. The text is then split on the argument delimiter.
-  *
-  * @param expression aggregator expression or bare stat list
-  * @return the stat names
-  * @throws HelixException if splitting yields no element at all
-  */
- public static String[] getAggregatorStats(String expression) throws HelixException {
-   final int lastOpen = expression.lastIndexOf("(");
-   final int lastClose = expression.lastIndexOf(")");
-   final String justStats;
-   if (lastOpen >= 0 && lastClose >= 0) {
-     justStats = expression.substring(lastOpen + 1, lastClose);
-   } else {
-     justStats = expression;
-   }
-   final String[] statList = justStats.split(argDelim);
-   if (statList.length >= 1) {
-     return statList;
-   }
-   throw new HelixException(expression + " does not contain any aggregator stats");
- }
-
- /**
-  * Returns the one stat name of an expression that must not list several stats.
-  *
-  * @param expression aggregator expression or bare stat name
-  * @return the single stat name
-  * @throws HelixException if the expression lists more than one stat, or none
-  */
- public static String getSingleAggregatorStat(String expression) throws HelixException {
-   final String[] stats = getAggregatorStats(expression);
-   if (stats.length <= 1) {
-     return stats[0];
-   }
-   throw new HelixException(expression + " contains more than 1 stat");
- }
-
- /**
-  * Replaces the stat part of a wildcard stat expression with a concrete stat name: the
-  * result is everything up to and including the last "(" of wildcardStat, followed by
-  * fixedStat and a closing ")". Whatever followed the last "(" in wildcardStat is dropped.
-  *
-  * @param wildcardStat expression whose last parenthesized group holds the wildcard stat
-  * @param fixedStat concrete stat name to substitute
-  * @return the expression with fixedStat in place of the wildcard stat
-  */
- public static String getWildcardStatSubstitution(String wildcardStat, String fixedStat) {
-   final int prefixEnd = wildcardStat.lastIndexOf("(") + 1;
-   final String subbedStat = new StringBuilder()
-       .append(wildcardStat.substring(0, prefixEnd))
-       .append(fixedStat)
-       .append(")")
-       .toString();
-   logger.debug("wildcardStat: " + wildcardStat);
-   logger.debug("fixedStat: " + fixedStat);
-   logger.debug("subbedStat: " + subbedStat);
-   return subbedStat;
- }
-
- // XXX: each op type should have number of inputs, number of outputs. do
- // validation.
- // (dbFoo.partition*.latency, dbFoo.partition*.count)|EACH|ACCUMULATE|DIVIDE
- /**
-  * Expands an expression of the form agg(args)(stat1,stat2,...) into one
-  * agg(args)(stat) string per stat. All whitespace is removed first, and the aggregator
-  * format, name and argument count are validated before anything is built.
-  *
-  * @param expression aggregator expression, whitespace allowed
-  * @return one single-stat aggregator expression per listed stat, in order
-  * @throws HelixException if the expression fails validation
-  */
- public static String[] getBaseStats(String expression) throws HelixException {
-   expression = expression.replaceAll("\\s+", "");
-   validateAggregatorFormat(expression);
-
-   final String aggName = getAggregatorStr(expression);
-   // called only for its argument-count validation; the split result is not needed
-   getAggregatorArgs(expression);
-   final String[] aggStats = getAggregatorStats(expression);
-   final String aggArgList = getAggregatorArgsStr(expression);
-
-   // identical for every stat: name plus parenthesized argument list
-   final String prefix = aggName + "(" + aggArgList + ")";
-   final String[] baseStats = new String[aggStats.length];
-   int i = 0;
-   for (String aggStat : aggStats) {
-     baseStats[i++] = prefix + "(" + aggStat + ")";
-   }
-   return baseStats;
- }
-
- /**
-  * Extracts the operator names that follow the first operator delimiter and validates
-  * the chain: every name must be registered (case-insensitively), every operator must
-  * accept the number of tuple lists it is handed, and the chain must end with exactly one
-  * tuple list. The chain starts with one tuple list per stat in the expression.
-  *
-  * @param expression full expression, e.g. agg(args)(stats)|OP1|OP2
-  * @return the operator names as written, or null if the expression has no operator
-  *         delimiter
-  * @throws HelixException if the stats cannot be extracted or the chain is invalid
-  */
- public static String[] getOperators(String expression) throws HelixException {
-   // evaluated before the delimiter check, so stat errors surface even without operators
-   final int numAggStats = getAggregatorStats(expression).length;
-   final int opDelimLoc = expression.indexOf(opDelim);
-   if (opDelimLoc < 0) {
-     return null;
-   }
-   final String opsStr = expression.substring(opDelimLoc + 1);
-   logger.debug("ops str: " + opsStr);
-   final String[] ops = opsStr.split(opDelimForSplit);
-
-   // walk the chain, tracking how many tuple lists flow from one operator to the next
-   int currNumTuples = numAggStats;
-   for (int i = 0; i < ops.length; i++) {
-     final String op = ops[i];
-     logger.debug("op: " + op);
-     final String key = op.toUpperCase();
-     if (!operatorMap.containsKey(key)) {
-       throw new HelixException("<" + op + "> is not a valid operator type");
-     }
-     final Operator currOpType = operatorMap.get(key);
-     final boolean tooFew = currNumTuples < currOpType.minInputTupleLists;
-     final boolean tooMany = currNumTuples > currOpType.maxInputTupleLists;
-     if (tooFew || tooMany) {
-       throw new HelixException("<" + op + "> cannot process " + currNumTuples + " input tuples");
-     }
-     // the count only changes for operators whose output count differs from their input
-     if (!currOpType.inputOutputTupleListsCountsEqual) {
-       currNumTuples = currOpType.numOutputTupleLists;
-     }
-   }
-   if (currNumTuples == 1) {
-     return ops;
-   }
-   throw new HelixException(expression + " does not terminate in a single tuple set");
- }
-
- /**
-  * Validates the operator chain of an expression by running the same checks as
-  * getOperators and discarding the result.
-  *
-  * @param expression full expression to validate
-  * @throws HelixException if the operator chain is invalid
-  */
- public static void validateOperators(String expression) throws HelixException {
-   // only the validation side of getOperators matters here
-   final String[] ignored = getOperators(expression);
- }
-
- /**
-  * Looks up a registered operator by name. The name is upper-cased before the lookup,
-  * so the lookup is case-insensitive like the ones in getOperators and getAggregator.
-  *
-  * @param opName operator name in any case
-  * @return the registered operator
-  * @throws HelixException if no operator is registered under that name
-  */
- public static Operator getOperator(String opName) throws HelixException {
-   Operator op = operatorMap.get(opName.toUpperCase());
-   if (op == null) {
-     throw new HelixException(opName + " is unknown op type");
-   }
-   return op;
- }
-
- /**
-  * Validates a complete expression in two steps: first the aggregator/stat part, then
-  * the operator chain. The first failing step aborts the validation.
-  *
-  * @param expression full expression to validate
-  * @throws HelixException if either part is invalid
-  */
- public static void validateExpression(String expression) throws HelixException {
-   // step 1: shape of agg(args)(stats) and wildcard placement
-   validateAggregatorFormat(expression);
-   // step 2: operators exist and their input/output counts chain correctly
-   validateOperators(expression);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
deleted file mode 100644
index 0e9c8f1..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class GreaterAlertComparator extends AlertComparator {
-
-  /**
-   * Returns true as soon as some element of the left tuple, parsed as a double, is
-   * strictly greater than some element of the right tuple, parsed as a double.
-   * Returns false if no such pair exists, including when either tuple is empty.
-   *
-   * @param leftTup tuple of numeric strings on the left side of the comparison
-   * @param rightTup tuple of numeric strings on the right side of the comparison
-   * @return true if any left element exceeds any right element
-   */
-  @Override
-  public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
-    for (Iterator<String> lefts = leftTup.iterator(); lefts.hasNext();) {
-      final double candidate = Double.parseDouble(lefts.next());
-      // the right tuple is rescanned from its start for every left element
-      for (Iterator<String> rights = rightTup.iterator(); rights.hasNext();) {
-        if (candidate > Double.parseDouble(rights.next())) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
deleted file mode 100644
index 74a4688..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Operator that multiplies the rows of all input tuple lists together, producing a
- * single output tuple list.
- */
-public class MultiplyOperator extends Operator {
-
-  /** Accepts one or more input tuple lists and always yields exactly one. */
-  public MultiplyOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = false;
-    numOutputTupleLists = 1;
-  }
-
-  /**
-   * Wraps one list of tuples as a one-element list holding that list's iterator.
-   *
-   * @param input tuples to expose
-   * @return a new list containing only input.iterator()
-   */
-  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
-    // typed explicitly instead of raw List/ArrayList to avoid unchecked conversions
-    List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
-    out.add(input.iterator());
-    return out;
-  }
-
-  /**
-   * Builds one output row per round by taking the next tuple from every input iterator
-   * and multiplying them together with multiplyTuples. Stops as soon as any iterator is
-   * exhausted; tuples already taken from other iterators in that unfinished round are
-   * discarded.
-   *
-   * @param input iterators over the input tuple lists; may be null or empty
-   * @return a one-element list holding the iterator over the product rows (an empty
-   *         iterator for null or empty input)
-   */
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-    if (input == null || input.size() == 0) {
-      return singleSetToIter(output);
-    }
-    while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
-                   // progress)
-      Tuple<String> rowProduct = null;
-      for (Iterator<Tuple<String>> it : input) {
-        if (!it.hasNext()) { // when any iterator runs out, we are done
-          return singleSetToIter(output);
-        }
-        // a null accumulator makes multiplyTuples return the first tuple unchanged
-        rowProduct = multiplyTuples(rowProduct, it.next());
-      }
-      output.add(rowProduct);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
deleted file mode 100644
index 0612cf3..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Base class for operators that transform lists of tuples. Holds the input/output
- * cardinality of the operator and element-wise tuple arithmetic shared by subclasses.
- */
-public abstract class Operator {
-
-  // Smallest and largest number of input tuple lists the operator accepts.
-  public int minInputTupleLists;
-  public int maxInputTupleLists;
-  // Number of tuple lists the operator produces; -1 unless a subclass sets it.
-  public int numOutputTupleLists = -1;
-  // True if the operator produces as many tuple lists as it receives.
-  public boolean inputOutputTupleListsCountsEqual = false;
-
-  public Operator() {
-
-  }
-
-  /**
-   * Multiplies two tuples element by element, aligned at their last elements.
-   * If the tuples differ in length, the leading elements of the longer one are copied
-   * unchanged, e.g. 1,2,3 * 4,5 = 1,8,15.
-   *
-   * @param tup1 first tuple of numeric strings, or null
-   * @param tup2 second tuple of numeric strings, or null
-   * @return the other tuple itself if one argument is null, otherwise a new tuple
-   */
-  public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    return combineTuples(tup1, tup2, true);
-  }
-
-  /**
-   * Adds two tuples element by element, aligned at their last elements.
-   * If the tuples differ in length, the leading elements of the longer one are copied
-   * unchanged, e.g. 1,2,3 + 4,5 = 1,6,8.
-   *
-   * @param tup1 first tuple of numeric strings, or null
-   * @param tup2 second tuple of numeric strings, or null
-   * @return the other tuple itself if one argument is null, otherwise a new tuple
-   */
-  public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    return combineTuples(tup1, tup2, false);
-  }
-
-  // Shared tail-aligned element-wise arithmetic behind multiplyTuples and sumTuples.
-  private static Tuple<String> combineTuples(Tuple<String> tup1, Tuple<String> tup2,
-      boolean multiply) {
-    if (tup1 == null) {
-      return tup2;
-    }
-    if (tup2 == null) {
-      return tup1;
-    }
-
-    // on equal sizes tup1 is treated as the larger one
-    Tuple<String> largerTup = tup1;
-    Tuple<String> smallerTup = tup2;
-    if (tup1.size() < tup2.size()) {
-      largerTup = tup2;
-      smallerTup = tup1;
-    }
-    // number of leading elements of the larger tuple that have no partner
-    int gap = largerTup.size() - smallerTup.size();
-
-    Tuple<String> outputTup = new Tuple<String>();
-    for (int i = 0; i < largerTup.size(); i++) {
-      if (i < gap) {
-        outputTup.add(largerTup.getElement(i));
-      } else {
-        double left = Double.parseDouble(largerTup.getElement(i));
-        double right = Double.parseDouble(smallerTup.getElement(i - gap));
-        double combined = multiply ? left * right : left + right;
-        outputTup.add(String.valueOf(combined));
-      }
-    }
-    return outputTup;
-  }
-
-  /**
-   * Applies the operator to the given tuple lists.
-   *
-   * @param input one iterator per input tuple list
-   * @return one iterator per output tuple list
-   */
-  public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
deleted file mode 100644
index 6895128..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Simple holder for a named stat together with its value tuple and timestamp tuple.
- */
-public class Stat {
-  String _name;
-  Tuple<String> _value;
-  Tuple<String> _timestamp;
-
-  /**
-   * Stores the three components as given; no copies are made.
-   *
-   * @param name stat name
-   * @param value value tuple
-   * @param timestamp timestamp tuple
-   */
-  public Stat(String name, Tuple<String> value, Tuple<String> timestamp) {
-    this._name = name;
-    this._value = value;
-    this._timestamp = timestamp;
-  }
-
-  /** @return the stat name passed to the constructor */
-  public String getName() {
-    return this._name;
-  }
-
-  /** @return the value tuple passed to the constructor */
-  public Tuple<String> getValue() {
-    return this._value;
-  }
-
-  /** @return the timestamp tuple passed to the constructor */
-  public Tuple<String> getTimestamp() {
-    return this._timestamp;
-  }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
deleted file mode 100644
index 1538eb8..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.stages.HealthDataCache;
-import org.apache.helix.model.PersistentStats;
-import org.apache.log4j.Logger;
-
-public class StatsHolder {
- enum MatchResult {
- WILDCARDMATCH,
- EXACTMATCH,
- NOMATCH
- };
-
- private static final Logger logger = Logger.getLogger(StatsHolder.class.getName());
-
- public static final String VALUE_NAME = "value";
- public static final String TIMESTAMP_NAME = "TimeStamp";
-
- HelixDataAccessor _accessor;
- HealthDataCache _cache;
-
- Map<String, Map<String, String>> _statMap;
- Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
- private Builder _keyBuilder;
-
- // PersistentStats _persistentStats;
-
- public StatsHolder(HelixManager manager, HealthDataCache cache) {
- _accessor = manager.getHelixDataAccessor();
- _cache = cache;
- _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
- updateCache(_cache);
- _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-
- }
-
- public void refreshStats() {
- logger.info("Refreshing cached stats");
- _cache.refresh(_accessor);
- updateCache(_cache);
- }
-
- public void persistStats() {
- // XXX: Am I using _accessor too directly here?
- // took around 35 ms from desktop to ESV4 machine
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
- if (stats == null) {
- stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
- // this record, if it
- // matters
- }
- stats.getRecord().setMapFields(_statMap);
- boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(), stats);
- }
-
- public void getStatsFromCache(boolean refresh) {
- long refreshStartTime = System.currentTimeMillis();
- if (refresh) {
- _cache.refresh(_accessor);
- }
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null) {
- _statMap = persistentStatRecord.getMapFields();
- } else {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- /*
- * if (_cache.getPersistentStats() != null) {
- * _statMap = _cache.getPersistentStats();
- * }
- */
- // TODO: confirm this a good place to init the _statMap when null
- /*
- * if (_statMap == null) {
- * _statMap = new HashMap<String, Map<String, String>>();
- * }
- */
- System.out.println("Refresh stats done: " + (System.currentTimeMillis() - refreshStartTime));
- }
-
- public Iterator<String> getAllStats() {
- return null;
- }
-
- /*
- * TODO: figure out pre-conditions here. I think not allowing anything to be
- * null on input
- */
- public Map<String, String> mergeStats(String statName, Map<String, String> existingStat,
- Map<String, String> incomingStat) throws HelixException {
- if (existingStat == null) {
- throw new HelixException("existing stat for merge is null");
- }
- if (incomingStat == null) {
- throw new HelixException("incoming stat for merge is null");
- }
- // get agg type and arguments, then get agg object
- String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
- String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
- Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
- // XXX: some of below lines might fail with null exceptions
-
- // get timestamps, values out of zk maps
- String existingTime = existingStat.get(TIMESTAMP_NAME);
- String existingVal = existingStat.get(VALUE_NAME);
- String incomingTime = incomingStat.get(TIMESTAMP_NAME);
- String incomingVal = incomingStat.get(VALUE_NAME);
- // parse values into tuples, if the values exist. else, tuples are null
- Tuple<String> existingTimeTuple =
- (existingTime != null) ? Tuple.fromString(existingTime) : null;
- Tuple<String> existingValueTuple = (existingVal != null) ? Tuple.fromString(existingVal) : null;
- Tuple<String> incomingTimeTuple =
- (incomingTime != null) ? Tuple.fromString(incomingTime) : null;
- Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple.fromString(incomingVal) : null;
-
- // dp merge
- agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple, incomingTimeTuple, aggArgs);
- // put merged tuples back in map
- Map<String, String> mergedMap = new HashMap<String, String>();
- if (existingTimeTuple.size() == 0) {
- throw new HelixException("merged time tuple has size zero");
- }
- if (existingValueTuple.size() == 0) {
- throw new HelixException("merged value tuple has size zero");
- }
-
- mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
- mergedMap.put(VALUE_NAME, existingValueTuple.toString());
- return mergedMap;
- }
-
- /*
- * Find all persisted stats this stat matches. Update those stats. An incoming
- * stat can match multiple stats exactly (if that stat has multiple agg types)
- * An incoming stat can match multiple wildcard stats
- */
-
- // need to do a time check here!
-
- public void applyStat(String incomingStatName, Map<String, String> statFields) {
- // TODO: consider locking stats here
- // refreshStats(); //will have refreshed by now during stage
-
- Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-
- if (!_statAlertMatchResult.containsKey(incomingStatName)) {
- _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
- }
- Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
- // traverse through all persistent stats
- for (String key : _statMap.keySet()) {
- if (resultMap.containsKey(key)) {
- MatchResult cachedMatchResult = resultMap.get(key);
- if (cachedMatchResult == MatchResult.EXACTMATCH) {
- processExactMatch(key, statFields);
- } else if (cachedMatchResult == MatchResult.WILDCARDMATCH) {
- processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
- }
- // don't care about NOMATCH
- continue;
- }
- // exact match on stat and stat portion of persisted stat, just update
- if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName)) {
- processExactMatch(key, statFields);
- resultMap.put(key, MatchResult.EXACTMATCH);
- }
- // wildcard match
- else if (ExpressionParser.isIncomingStatWildcardMatch(key, incomingStatName)) {
- processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
- resultMap.put(key, MatchResult.WILDCARDMATCH);
- } else {
- resultMap.put(key, MatchResult.NOMATCH);
- }
- }
- _statMap.putAll(pendingAdds);
- }
-
- void processExactMatch(String key, Map<String, String> statFields) {
- Map<String, String> mergedStat = mergeStats(key, _statMap.get(key), statFields);
- // update in place, no problem with hash map
- _statMap.put(key, mergedStat);
- }
-
- void processWildcardMatch(String incomingStatName, String key, Map<String, String> statFields,
- Map<String, Map<String, String>> pendingAdds) {
-
- // make sure incoming stat doesn't already exist, either in previous
- // round or this round
- // form new key (incomingStatName with agg type from the wildcarded
- // stat)
- String statToAdd = ExpressionParser.getWildcardStatSubstitution(key, incomingStatName);
- // if the stat already existed in _statMap, we have/will apply it as an
- // exact match
- // if the stat was added this round to pendingAdds, no need to recreate
- // (it would have same value)
- if (!_statMap.containsKey(statToAdd) && !pendingAdds.containsKey(statToAdd)) {
- // add this stat to persisted stats
- Map<String, String> mergedStat = mergeStats(statToAdd, getEmptyStat(), statFields);
- // add to pendingAdds so we don't mess up ongoing traversal of
- // _statMap
- pendingAdds.put(statToAdd, mergedStat);
- }
- }
-
- // add parsing of stat (or is that in expression holder?) at least add
- // validate
- public void addStat(String exp) throws HelixException {
- refreshStats(); // get current stats
-
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
- for (String stat : parsedStats) {
- if (_statMap.containsKey(stat)) {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- _statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- }
-
- public static Map<String, Map<String, String>> parseStat(String exp) throws HelixException {
- String[] parsedStats = ExpressionParser.getBaseStats(exp);
- Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
- for (String stat : parsedStats) {
- if (statMap.containsKey(stat)) {
- logger.debug("Stat " + stat + " already exists; not adding");
- continue;
- }
- statMap.put(stat, getEmptyStat()); // add new stat to map
- }
- return statMap;
- }
-
- public static Map<String, String> getEmptyStat() {
- Map<String, String> statFields = new HashMap<String, String>();
- statFields.put(TIMESTAMP_NAME, "");
- statFields.put(VALUE_NAME, "");
- return statFields;
- }
-
- public List<Stat> getStatsList() {
- List<Stat> stats = new LinkedList<Stat>();
- for (String stat : _statMap.keySet()) {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- Stat s = new Stat(stat, valTup, timeTup);
- stats.add(s);
- }
- return stats;
- }
-
- public Map<String, Tuple<String>> getStatsMap() {
- // refreshStats(); //don't refresh, stage will have refreshed by this time
- HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
- for (String stat : _statMap.keySet()) {
- Map<String, String> statFields = _statMap.get(stat);
- Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
- Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
- stats.put(stat, valTup);
- }
- return stats;
- }
-
- public void updateCache(HealthDataCache cache) {
- _cache = cache;
- PersistentStats persistentStatRecord = _cache.getPersistentStats();
- if (persistentStatRecord != null) {
- _statMap = persistentStatRecord.getMapFields();
- } else {
- _statMap = new HashMap<String, Map<String, String>>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 2cc733f..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
- public SumEachOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = true;
- numOutputTupleLists = -1;
- }
-
- // for each column, generate sum
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
- for (Iterator<Tuple<String>> currIt : input) {
- Tuple<String> currSum = null;
- while (currIt.hasNext()) {
- currSum = sumTuples(currSum, currIt.next());
- }
- ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
- currOutList.add(currSum);
- out.add(currOutList.iterator());
- }
- return out;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
deleted file mode 100644
index 90c9ab0..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
- public SumOperator() {
- minInputTupleLists = 1;
- maxInputTupleLists = Integer.MAX_VALUE;
- inputOutputTupleListsCountsEqual = false;
- numOutputTupleLists = 1;
- }
-
- public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
- List out = new ArrayList();
- out.add(input.iterator());
- return out;
- }
-
- @Override
- public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
- ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
- if (input == null || input.size() == 0) {
- return singleSetToIter(output);
- }
- while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
- // progress)
- Tuple<String> rowSum = null;
- for (Iterator<Tuple<String>> it : input) {
- if (!it.hasNext()) { // when any iterator runs out, we are done
- return singleSetToIter(output);
- }
- rowSum = sumTuples(rowSum, it.next());
- }
- output.add(rowSum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
deleted file mode 100644
index e57f088..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class Tuple<T> {
- List<T> elements;
-
- public Tuple() {
- elements = new ArrayList<T>();
- }
-
- public int size() {
- return elements.size();
- }
-
- public void add(T entry) {
- elements.add(entry);
- }
-
- public void addAll(Tuple<T> incoming) {
- elements.addAll(incoming.getElements());
- }
-
- public Iterator<T> iterator() {
- return elements.listIterator();
- }
-
- public T getElement(int ind) {
- return elements.get(ind);
- }
-
- public List<T> getElements() {
- return elements;
- }
-
- public void clear() {
- elements.clear();
- }
-
- public static Tuple<String> fromString(String in) {
- Tuple<String> tup = new Tuple<String>();
- if (in.length() > 0) {
- String[] elements = in.split(",");
- for (String element : elements) {
- tup.add(element);
- }
- }
- return tup;
- }
-
- public String toString() {
- StringBuilder out = new StringBuilder();
- Iterator<T> it = iterator();
- boolean outEmpty = true;
- while (it.hasNext()) {
- if (!outEmpty) {
- out.append(",");
- }
- out.append(it.next());
- outEmpty = false;
- }
- return out.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 6ef4cfe..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class WindowAggregator extends Aggregator {
-
- int _windowSize;
-
- public WindowAggregator(String windowSize) {
- _windowSize = Integer.parseInt(windowSize);
- _numArgs = 1;
- }
-
- public WindowAggregator() {
- this("1");
- }
-
- @Override
- public void merge(Tuple<String> currValTup, Tuple<String> newValTup, Tuple<String> currTimeTup,
- Tuple<String> newTimeTup, String... args) {
-
- _windowSize = Integer.parseInt(args[0]);
-
- // figure out how many curr tuple values we displace
- Tuple<String> mergedTimeTuple = new Tuple<String>();
- Tuple<String> mergedValTuple = new Tuple<String>();
-
- Iterator<String> currTimeIter = currTimeTup.iterator();
- Iterator<String> currValIter = currValTup.iterator();
- Iterator<String> newTimeIter = newTimeTup.iterator();
- Iterator<String> newValIter = newValTup.iterator();
- int currCtr = 0;
- // traverse current vals
- double currTime = -1;
- double currVal;
- while (currTimeIter.hasNext()) {
- currTime = Double.parseDouble(currTimeIter.next());
- currVal = Double.parseDouble(currValIter.next());
- currCtr++;
- // number of evicted currVals equal to total size of both minus _windowSize
- if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted
- // element, just bump
- // down
- mergedTimeTuple.add(String.valueOf(currTime));
- mergedValTuple.add(String.valueOf(currVal));
- }
- }
-
- double newVal;
- double newTime;
- while (newTimeIter.hasNext()) {
- newVal = Double.parseDouble(newValIter.next());
- newTime = Double.parseDouble(newTimeIter.next());
- if (newTime <= currTime) { // oldest new time older than newest curr time. we will not apply
- // new tuple!
- return; // curr tuples remain the same
- }
- currCtr++;
- if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted element
- mergedTimeTuple.add(String.valueOf(newTime));
- mergedValTuple.add(String.valueOf(newVal));
- }
- }
- // set curr tuples to merged tuples
- currTimeTup.clear();
- currTimeTup.addAll(mergedTimeTuple);
- currValTup.clear();
- currValTup.addAll(mergedValTuple);
- // TODO: see if we can do merger in place on curr
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
deleted file mode 100644
index bf1d9a6..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for Helix alerts
- */
-package org.apache.helix.alerts;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index c85dd0b..92fb636 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -26,15 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Controller;
import org.apache.helix.api.Participant;
@@ -61,7 +56,6 @@ import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,7 +67,6 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.ProvisionerConfigHolder;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
@@ -521,14 +514,6 @@ public class ClusterAccessor {
}
/**
- * Get the stats persisted on this cluster
- * @return PersistentStats, or null if none persisted
- */
- public PersistentStats readStats() {
- return _accessor.getProperty(_keyBuilder.persistantStat());
- }
-
- /**
* Read the persisted controller contexts
* @return map of context id to controller context
*/
@@ -556,152 +541,6 @@ public class ClusterAccessor {
}
/**
- * Get the current cluster stats
- * @return PersistentStats
- */
- public PersistentStats getStats() {
- return _accessor.getProperty(_keyBuilder.persistantStat());
- }
-
- /**
- * Get the current cluster alerts
- * @return Alerts
- */
- public Alerts getAlerts() {
- return _accessor.getProperty(_keyBuilder.alerts());
- }
-
- /**
- * Add a statistic specification to the cluster. Existing stat specifications will not be
- * overwritten
- * @param statName string representing a stat specification
- * @return true if the stat spec was added, false otherwise
- */
- public boolean addStat(final String statName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String persistentStatsPath = _keyBuilder.persistantStat().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- statsRec = new ZNRecord(PersistentStats.nodeName);
- }
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- for (String newStat : newStatMap.keySet()) {
- if (!currStatMap.containsKey(newStat)) {
- currStatMap.put(newStat, newStatMap.get(newStat));
- }
- }
- statsRec.setMapFields(currStatMap);
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Remove a statistic specification from the cluster
- * @param statName string representing a statistic specification
- * @return true if stats removed, false otherwise
- */
- public boolean dropStat(final String statName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String persistentStatsPath = _keyBuilder.persistantStat().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord statsRec) {
- if (statsRec == null) {
- throw new HelixException("No stats record in ZK, nothing to drop");
- }
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- // delete each stat from stat map
- for (String newStat : newStatMap.keySet()) {
- if (currStatMap.containsKey(newStat)) {
- currStatMap.remove(newStat);
- }
- }
- statsRec.setMapFields(currStatMap);
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Add an alert specification to the cluster
- * @param alertName string representing the alert spec
- * @return true if added, false otherwise
- */
- public boolean addAlert(final String alertName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- String alertsPath = _keyBuilder.alerts().getPath();
- return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- alertsRec = new ZNRecord(Alerts.nodeName);
- }
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- StringBuilder newStatName = new StringBuilder();
- Map<String, String> newAlertMap = new HashMap<String, String>();
-
- // use AlertsHolder to get map of new stats and map for this alert
- AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
- // add stat
- addStat(newStatName.toString());
-
- // add alert
- currAlertMap.put(alertName, newAlertMap);
- alertsRec.setMapFields(currAlertMap);
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
- * Remove an alert specification from the cluster
- * @param alertName string representing an alert specification
- * @return true if removed, false otherwise
- */
- public boolean dropAlert(final String alertName) {
- if (!isClusterStructureValid()) {
- LOG.error("cluster " + _clusterId + " is not setup yet");
- return false;
- }
-
- String alertsPath = _keyBuilder.alerts().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord alertsRec) {
- if (alertsRec == null) {
- throw new HelixException("No alerts record persisted, nothing to drop");
- }
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- currAlertMap.remove(alertName);
- alertsRec.setMapFields(currAlertMap);
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- /**
* Add user configuration to the existing cluster user configuration. Overwrites properties with
* the same key
* @param userConfig the user config key-value pairs to add
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index f9af914..7bb214e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -33,7 +33,6 @@ import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
@@ -66,7 +65,6 @@ import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -76,23 +74,21 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
/**
- * Cluster Controllers main goal is to keep the cluster state as close as possible to
- * Ideal State. It does this by listening to changes in cluster state and scheduling new
- * tasks to get cluster state to best possible ideal state. Every instance of this class
- * can control can control only one cluster
- * Get all the partitions use IdealState, CurrentState and Messages <br>
+ * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
+ * It does this by listening to changes in cluster state and scheduling new tasks to get cluster
+ * state to best possible ideal state. Every instance of this class can control can control only one
+ * cluster Get all the partitions use IdealState, CurrentState and Messages <br>
* foreach partition <br>
* 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
- * 2. compute best possible state (instance,state) pair. This needs previous step data and
- * state model constraints <br>
+ * 2. compute best possible state (instance,state) pair. This needs previous step data and state
+ * model constraints <br>
* 3. compute the messages/tasks needed to move to 1 to 2 <br>
* 4. select the messages that can be sent, needs messages and state model constraints <br>
* 5. send messages
*/
public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
- ExternalViewChangeListener, ControllerChangeListener, HealthStateChangeListener,
- InstanceConfigChangeListener {
+ ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
volatile boolean init = false;
private final PipelineRegistry _registry;
@@ -109,15 +105,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
private final ClusterEventProcessor _eventThread;
/**
- * The _paused flag is checked by function handleEvent(), while if the flag is set
- * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
- * is set.
+ * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
+ * will be no-op. Other event handling logic keeps the same when the flag is set.
*/
private boolean _paused;
/**
- * The timer that can periodically run the rebalancing pipeline. The timer will start if there
- * is one resource group has the config to use the timer.
+ * The timer that can periodically run the rebalancing pipeline. The timer will start if there is
+ * one resource group has the config to use the timer.
*/
Timer _rebalanceTimer = null;
int _timerPeriod = Integer.MAX_VALUE;
@@ -128,9 +123,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
private ClusterDataCache _cache;
/**
- * Default constructor that creates a default pipeline registry. This is sufficient in
- * most cases, but if there is a some thing specific needed use another constructor
- * where in you can pass a pipeline registry
+ * Default constructor that creates a default pipeline registry. This is sufficient in most cases,
+ * but if there is a some thing specific needed use another constructor where in you can pass a
+ * pipeline registry
*/
public GenericHelixController() {
this(createDefaultRegistry());
@@ -160,9 +155,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// TODO who should stop this timer
/**
- * Starts the rebalancing timer with the specified period. Start the timer if necessary;
- * If the period is smaller than the current period, cancel the current timer and use
- * the new period.
+ * Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
+ * period is smaller than the current period, cancel the current timer and use the new period.
*/
void startRebalancingTimer(int period, HelixManager manager) {
logger.info("Controller starting timer at period " + period);
@@ -227,7 +221,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
registry
.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
-
return registry;
}
}
@@ -245,8 +238,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
/**
- * lock-always: caller always needs to obtain an external lock before call, calls to
- * handleEvent() should be serialized
+ * lock-always: caller always needs to obtain an external lock before call, calls to handleEvent()
+ * should be serialized
* @param event
*/
protected synchronized void handleEvent(ClusterEvent event) {
@@ -345,16 +338,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
@Override
- public void onHealthChange(String instanceName, List<HealthStat> reports,
- NotificationContext changeContext) {
- /**
- * When there are more participant ( > 20, can be in hundreds), This callback can be
- * called quite frequently as each participant reports health stat every minute. Thus
- * we change the health check pipeline to run in a timer callback.
- */
- }
-
- @Override
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onMessage()");
@@ -526,9 +509,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
/**
- * Go through the list of liveinstances in the cluster, and add currentstateChange
- * listener and Message listeners to them if they are newly added. For current state
- * change, the observation is tied to the session id of each live instance.
+ * Go through the list of liveinstances in the cluster, and add currentstateChange listener and
+ * Message listeners to them if they are newly added. For current state change, the observation is
+ * tied to the session id of each live instance.
*/
protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 6b29e2d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.AlertStatus;
-import org.apache.helix.model.Alerts;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-
-public class HealthDataCache {
- Map<String, LiveInstance> _liveInstanceMap;
-
- Map<String, Map<String, HealthStat>> _healthStatMap;
- HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- PersistentStats _persistentStats;
- Alerts _alerts;
- AlertStatus _alertStatus;
-
- public HealthStat getGlobalStats() {
- return _globalStats;
- }
-
- public PersistentStats getPersistentStats() {
- return _persistentStats;
- }
-
- public Alerts getAlerts() {
- return _alerts;
- }
-
- public AlertStatus getAlertStatus() {
- return _alertStatus;
- }
-
- public Map<String, HealthStat> getHealthStats(String instanceName) {
- Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- if (map != null) {
- return map;
- } else {
- return Collections.emptyMap();
- }
- }
-
- public Map<String, LiveInstance> getLiveInstances() {
- return _liveInstanceMap;
- }
-
- public boolean refresh(HelixDataAccessor accessor) {
- Builder keyBuilder = accessor.keyBuilder();
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
- Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
- for (String instanceName : _liveInstanceMap.keySet()) {
- // xxx clearly getting znodes for the instance here...so get the
- // timestamp!
-
- Map<String, HealthStat> childValuesMap =
- accessor.getChildValuesMap(keyBuilder.healthReports(instanceName));
- hsMap.put(instanceName, childValuesMap);
- }
- _healthStatMap = Collections.unmodifiableMap(hsMap);
- _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
- _alerts = accessor.getProperty(keyBuilder.alerts());
- _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
- return true;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index 859c1d0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-
-public class ReadHealthDataStage extends AbstractBaseStage {
- HealthDataCache _cache;
-
- public ReadHealthDataStage() {
- _cache = new HealthDataCache();
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null) {
- throw new StageException("HelixManager attribute value is null");
- }
- // DataAccessor dataAccessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- _cache.refresh(accessor);
-
- event.addAttribute("HealthDataCache", _cache);
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- }
-}
|