Author: acmurthy
Date: Fri Aug 12 23:25:51 2011
New Revision: 1157290
URL: http://svn.apache.org/viewvc?rev=1157290&view=rev
Log:
MAPREDUCE-901. Efficient framework counters. Contributed by Luke Lu.
Added:
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
Modified:
hadoop/common/trunk/mapreduce/CHANGES.txt
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Fri Aug 12 23:25:51 2011
@@ -227,6 +227,8 @@ Trunk (unreleased changes)
MAPREDUCE-2740. MultipleOutputs in new API creates needless
TaskAttemptContexts. (todd)
+ MAPREDUCE-901. Efficient framework counters. (llu via acmurthy)
+
BUG FIXES
MAPREDUCE-2603. Disable High-Ram emulation in system tests.
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java Fri Aug 12 23:25:51 2011
@@ -18,20 +18,9 @@
package org.apache.hadoop.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
@@ -40,426 +29,302 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
/**
* A set of named counters.
- *
- * <p><code>Counters</code> represent global counters, defined either by the
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
* Map-Reduce framework or applications. Each <code>Counter</code> can be of
* any {@link Enum} type.</p>
- *
+ *
* <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
- * counters from a particular <code>Enum</code> class.
+ * counters from a particular <code>Enum</code> class.
* @deprecated Use {@link org.apache.hadoop.mapreduce.Counters} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class Counters implements Writable, Iterable<Counters.Group> {
- private static final Log LOG = LogFactory.getLog(Counters.class);
- private static final char GROUP_OPEN = '{';
- private static final char GROUP_CLOSE = '}';
- private static final char COUNTER_OPEN = '[';
- private static final char COUNTER_CLOSE = ']';
- private static final char UNIT_OPEN = '(';
- private static final char UNIT_CLOSE = ')';
- private static char[] charsToEscape = {GROUP_OPEN, GROUP_CLOSE,
- COUNTER_OPEN, COUNTER_CLOSE,
- UNIT_OPEN, UNIT_CLOSE};
-
- //private static Log log = LogFactory.getLog("Counters.class");
-
+public class Counters
+ extends AbstractCounters<Counters.Counter, Counters.Group> {
+
+ public Counters() {
+ super(groupFactory);
+ }
+
+ public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
+ super(newCounters, groupFactory);
+ }
+
/**
* Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
* @param newCounters new Counters
* @return old Counters instance corresponding to newCounters
*/
static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
- Counters oldCounters = new Counters();
- for (org.apache.hadoop.mapreduce.CounterGroup newGroup: newCounters) {
- String groupName = newGroup.getName();
- Group oldGroup = oldCounters.getGroup(groupName);
- for (org.apache.hadoop.mapreduce.Counter newCounter: newGroup) {
- Counter oldCounter = oldGroup.getCounterForName(newCounter.getName());
- oldCounter.setDisplayName(newCounter.getDisplayName());
- oldCounter.increment(newCounter.getValue());
- }
- }
- return oldCounters;
+ return new Counters(newCounters);
}
/**
- * A counter record, comprising its name and value.
+ * A counter record, comprising its name and value.
*/
- public static class Counter extends org.apache.hadoop.mapreduce.Counter {
-
- Counter() {
- }
+ public interface Counter extends org.apache.hadoop.mapreduce.Counter {
- Counter(String name, String displayName, long value) {
- super(name, displayName);
- increment(value);
- }
-
- public void setDisplayName(String newName) {
- super.setDisplayName(newName);
- }
-
/**
* Returns the compact stringified version of the counter in the format
* [(actual-name)(display-name)(value)]
+ * @return the stringified result
*/
- public synchronized String makeEscapedCompactString() {
+ String makeEscapedCompactString();
- // First up, obtain the strings that need escaping. This will help us
- // determine the buffer length apriori.
- String escapedName = escape(getName());
- String escapedDispName = escape(getDisplayName());
- long currentValue = this.getValue();
- int length = escapedName.length() + escapedDispName.length() + 4;
-
- length += 8; // For the following delimiting characters
- StringBuilder builder = new StringBuilder(length);
- builder.append(COUNTER_OPEN);
-
- // Add the counter name
- builder.append(UNIT_OPEN);
- builder.append(escapedName);
- builder.append(UNIT_CLOSE);
-
- // Add the display name
- builder.append(UNIT_OPEN);
- builder.append(escapedDispName);
- builder.append(UNIT_CLOSE);
-
- // Add the value
- builder.append(UNIT_OPEN);
- builder.append(currentValue);
- builder.append(UNIT_CLOSE);
-
- builder.append(COUNTER_CLOSE);
-
- return builder.toString();
- }
-
- // Checks for (content) equality of two (basic) counters
+ /**
+ * Checks for (content) equality of two (basic) counters
+ * @param counter to compare
+ * @return true if content equals
+ * @deprecated
+ */
@Deprecated
- synchronized boolean contentEquals(Counter c) {
- return this.equals(c);
- }
-
+ boolean contentEquals(Counter counter);
+
/**
- * What is the current value of this counter?
- * @return the current value
+ * @return the value of the counter
*/
- public synchronized long getCounter() {
+ long getCounter();
+ }
+
+ static class OldCounterImpl extends GenericCounter implements Counter {
+
+ OldCounterImpl() {
+ }
+
+ OldCounterImpl(String name, String displayName, long value) {
+ super(name, displayName, value);
+ }
+
+ @Override
+ public synchronized String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
+ }
+
+ @Override @Deprecated
+ public boolean contentEquals(Counter counter) {
+ return equals(counter);
+ }
+
+ @Override
+ public long getCounter() {
return getValue();
}
-
}
-
+
/**
- * <code>Group</code> of counters, comprising of counters from a particular
- * counter {@link Enum} class.
+ * <code>Group</code> of counters, comprising of counters from a particular
+ * counter {@link Enum} class.
*
- * <p><code>Group</code>handles localization of the class name and the
+ * <p><code>Group</code>handles localization of the class name and the
* counter names.</p>
*/
- public static class Group implements Writable, Iterable<Counter> {
- private String groupName;
- private String displayName;
- private Map<String, Counter> subcounters = new HashMap<String, Counter>();
-
- // Optional ResourceBundle for localization of group and counter names.
- private ResourceBundle bundle = null;
-
- Group(String groupName) {
- try {
- bundle = getResourceBundle(groupName);
- }
- catch (MissingResourceException neverMind) {
- }
- this.groupName = groupName;
- this.displayName = localize("CounterGroupName", groupName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating group " + groupName + " with " +
- (bundle == null ? "nothing" : "bundle"));
- }
- }
-
+ public static interface Group extends CounterGroupBase<Counter> {
+
/**
- * Returns the specified resource bundle, or throws an exception.
- * @throws MissingResourceException if the bundle isn't found
+ * @param counterName the name of the counter
+ * @return the value of the specified counter, or 0 if the counter does
+ * not exist.
*/
- private static ResourceBundle getResourceBundle(String enumClassName) {
- String bundleName = enumClassName.replace('$','_');
- return ResourceBundle.getBundle(bundleName);
- }
-
+ long getCounter(String counterName);
+
/**
- * Returns raw name of the group. This is the name of the enum class
- * for this group of counters.
+ * @return the compact stringified version of the group in the format
+ * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
+ * for the counters within.
*/
- public String getName() {
- return groupName;
- }
-
+ String makeEscapedCompactString();
+
/**
- * Returns localized name of the group. This is the same as getName() by
- * default, but different if an appropriate ResourceBundle is found.
+ * Get the counter for the given id and create it if it doesn't exist.
+ * @param id the numeric id of the counter within the group
+ * @param name the internal counter name
+ * @return the counter
+ * @deprecated use {@link #findCounter(String)} instead
*/
- public String getDisplayName() {
- return displayName;
- }
-
+ @Deprecated
+ Counter getCounter(int id, String name);
+
/**
- * Set the display name
+ * Get the counter for the given name and create it if it doesn't exist.
+ * @param name the internal counter name
+ * @return the counter
*/
- public void setDisplayName(String displayName) {
- this.displayName = displayName;
+ Counter getCounterForName(String name);
+ }
+
+ // All the group impls need this for legacy group interface
+ static long getCounterValue(Group group, String counterName) {
+ Counter counter = group.findCounter(counterName, false);
+ if (counter != null) return counter.getValue();
+ return 0L;
+ }
+
+ // Mix the generic group implementation into the Group interface
+ private static class GenericGroup extends AbstractCounterGroup<Counter>
+ implements Group {
+
+ GenericGroup(String name, String displayName, Limits limits) {
+ super(name, displayName, limits);
}
-
- /**
- * Returns the compact stringified version of the group in the format
- * {(actual-name)(display-name)(value)[][][]} where [] are compact strings for the
- * counters within.
- */
+
+ @Override
+ public long getCounter(String counterName) {
+ return getCounterValue(this, counterName);
+ }
+
+ @Override
public String makeEscapedCompactString() {
- String[] subcountersArray = new String[subcounters.size()];
+ return toEscapedCompactString(this);
+ }
- // First up, obtain the strings that need escaping. This will help us
- // determine the buffer length apriori.
- String escapedName = escape(getName());
- String escapedDispName = escape(getDisplayName());
- int i = 0;
- int length = escapedName.length() + escapedDispName.length();
- for (Counter counter : subcounters.values()) {
- String escapedStr = counter.makeEscapedCompactString();
- subcountersArray[i++] = escapedStr;
- length += escapedStr.length();
- }
+ @Override
+ public Counter getCounter(int id, String name) {
+ return findCounter(name);
+ }
- length += 6; // for all the delimiting characters below
- StringBuilder builder = new StringBuilder(length);
- builder.append(GROUP_OPEN); // group start
-
- // Add the group name
- builder.append(UNIT_OPEN);
- builder.append(escapedName);
- builder.append(UNIT_CLOSE);
-
- // Add the display name
- builder.append(UNIT_OPEN);
- builder.append(escapedDispName);
- builder.append(UNIT_CLOSE);
-
- // write the value
- for(Counter counter: subcounters.values()) {
- builder.append(counter.makeEscapedCompactString());
- }
-
- builder.append(GROUP_CLOSE); // group end
- return builder.toString();
+ @Override
+ public Counter getCounterForName(String name) {
+ return findCounter(name);
}
@Override
- public int hashCode() {
- return subcounters.hashCode();
+ protected Counter newCounter(String counterName, String displayName,
+ long value) {
+ return new OldCounterImpl(counterName, displayName, value);
}
- /**
- * Checks for (content) equality of Groups
- */
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
+ protected Counter newCounter() {
+ return new OldCounterImpl();
+ }
+ }
+
+ // Mix the framework group implementation into the Group interface
+ private static class FrameworkGroupImpl<T extends Enum<T>>
+ extends FrameworkCounterGroup<T, Counter> implements Group {
+
+ // Mix the framework counter implmementation into the Counter interface
+ class FrameworkCounterImpl extends FrameworkCounter implements Counter {
+
+ FrameworkCounterImpl(T key) {
+ super(key);
}
- if (obj == null || obj.getClass() != getClass()) {
- return false;
+
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
}
- boolean isEqual = false;
- Group g = (Group) obj;
- synchronized (this) {
- if (size() == g.size()) {
- isEqual = true;
- for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
- String key = entry.getKey();
- Counter c1 = entry.getValue();
- Counter c2 = g.getCounterForName(key);
- if (!c1.contentEquals(c2)) {
- isEqual = false;
- break;
- }
- }
- }
+
+ @Override
+ public boolean contentEquals(Counter counter) {
+ return equals(counter);
}
- return isEqual;
- }
-
- /**
- * Returns the value of the specified counter, or 0 if the counter does
- * not exist.
- */
- public synchronized long getCounter(String counterName) {
- for(Counter counter: subcounters.values()) {
- if (counter != null && counter.getDisplayName().equals(counterName)) {
- return counter.getValue();
- }
+
+ @Override
+ public long getCounter() {
+ return getValue();
}
- return 0L;
}
-
- /**
- * Get the counter for the given id and create it if it doesn't exist.
- * @param id the numeric id of the counter within the group
- * @param name the internal counter name
- * @return the counter
- * @deprecated use {@link #getCounter(String)} instead
- */
- @Deprecated
- public synchronized Counter getCounter(int id, String name) {
- return getCounterForName(name);
- }
-
- /**
- * Get the counter for the given name and create it if it doesn't exist.
- * @param name the internal counter name
- * @return the counter
- */
- public synchronized Counter getCounterForName(String name) {
- Counter result = subcounters.get(name);
- if (result == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + name);
- }
- result = new Counter(name, localize(name + ".name", name), 0L);
- subcounters.put(name, result);
- }
- return result;
+
+ FrameworkGroupImpl(Class<T> cls) {
+ super(cls);
}
-
- /**
- * Returns the number of counters in this group.
- */
- public synchronized int size() {
- return subcounters.size();
+
+ @Override
+ public long getCounter(String counterName) {
+ return getCounterValue(this, counterName);
}
-
- /**
- * Looks up key in the ResourceBundle and returns the corresponding value.
- * If the bundle or the key doesn't exist, returns the default value.
- */
- private String localize(String key, String defaultValue) {
- String result = defaultValue;
- if (bundle != null) {
- try {
- result = bundle.getString(key);
- }
- catch (MissingResourceException mre) {
- }
- }
- return result;
+
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
}
-
- public synchronized void write(DataOutput out) throws IOException {
- Text.writeString(out, displayName);
- WritableUtils.writeVInt(out, subcounters.size());
- for(Counter counter: subcounters.values()) {
- counter.write(out);
- }
+
+ @Override @Deprecated
+ public Counter getCounter(int id, String name) {
+ return findCounter(name);
}
-
- public synchronized void readFields(DataInput in) throws IOException {
- displayName = Text.readString(in);
- subcounters.clear();
- int size = WritableUtils.readVInt(in);
- for(int i=0; i < size; i++) {
- Counter counter = new Counter();
- counter.readFields(in);
- subcounters.put(counter.getName(), counter);
- }
+
+ @Override
+ public Counter getCounterForName(String name) {
+ return findCounter(name);
}
- public synchronized Iterator<Counter> iterator() {
- return new ArrayList<Counter>(subcounters.values()).iterator();
+ @Override
+ protected Counter newCounter(T key) {
+ return new FrameworkCounterImpl(key);
}
}
-
- // Map from group name (enum class name) to map of int (enum ordinal) to
- // counter record (name-value pair).
- private Map<String,Group> counters = new HashMap<String, Group>();
- /**
- * A cache from enum values to the associated counter. Dramatically speeds up
- * typical usage.
- */
- private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
-
- /**
- * Returns the names of all counter classes.
- * @return Set of counter names.
- */
- public synchronized Collection<String> getGroupNames() {
- return counters.keySet();
- }
+ // Mix the file system counter group implementation into the Group interface
+ private static class FSGroupImpl extends FileSystemCounterGroup<Counter>
+ implements Group {
- public synchronized Iterator<Group> iterator() {
- return counters.values().iterator();
- }
+ private class FSCounterImpl extends FSCounter implements Counter {
- /**
- * Returns the named counter group, or an empty group if there is none
- * with the specified name.
- */
- public synchronized Group getGroup(String groupName) {
- Group result = counters.get(groupName);
+ FSCounterImpl(String scheme, FileSystemCounter key) {
+ super(scheme, key);
+ }
- if (result == null) {
- // To provide support for deprecated group names
- if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
- LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
- " Use org.apache.hadoop.mapreduce.TaskCounter instead");
- return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
- }
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
+ }
- if (groupName.equals
- ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
- LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
- "is deprecated. Use " +
- "org.apache.hadoop.mapreduce.JobCounter instead");
- return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+ @Override @Deprecated
+ public boolean contentEquals(Counter counter) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public long getCounter() {
+ return getValue();
}
- result = new Group(groupName);
- counters.put(groupName, result);
}
- return result;
- }
+ @Override
+ protected Counter newCounter(String scheme, FileSystemCounter key) {
+ return new FSCounterImpl(scheme, key);
+ }
- /**
- * Find the counter for the given enum. The same enum will always return the
- * same counter.
- * @param key the counter key
- * @return the matching counter object
- */
- public synchronized Counter findCounter(Enum key) {
- Counter counter = cache.get(key);
- if (counter == null) {
- Group group = getGroup(key.getDeclaringClass().getName());
- counter = group.getCounterForName(key.toString());
- cache.put(key, counter);
+ @Override
+ public long getCounter(String counterName) {
+ return getCounterValue(this, counterName);
}
- return counter;
+
+ @Override
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
+ }
+
+ @Override @Deprecated
+ public Counter getCounter(int id, String name) {
+ return findCounter(name);
+ }
+
+ @Override
+ public Counter getCounterForName(String name) {
+ return findCounter(name);
+ }
+
}
- /**
- * Find a counter given the group and the name.
- * @param group the name of the group
- * @param name the internal name of the counter
- * @return the counter for that name
- */
public synchronized Counter findCounter(String group, String name) {
if (name.equals("MAP_INPUT_BYTES")) {
LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
@@ -471,15 +336,46 @@ public class Counters implements Writabl
}
/**
+ * Provide factory methods for counter group factory implementation.
+ * See also the GroupFactory in
+ * {@link org.apache.hadoop.mapreduce.Counters mapreduce.Counters}
+ */
+ static class GroupFactory extends CounterGroupFactory<Counter, Group> {
+
+ @Override
+ protected <T extends Enum<T>>
+ FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
+ return new FrameworkGroupFactory<Group>() {
+ @Override public Group newGroup(String name) {
+ return new FrameworkGroupImpl<T>(cls); // impl in this package
+ }
+ };
+ }
+
+ @Override
+ protected Group newGenericGroup(String name, String displayName,
+ Limits limits) {
+ return new GenericGroup(name, displayName, limits);
+ }
+
+ @Override
+ protected Group newFileSystemGroup() {
+ return new FSGroupImpl();
+ }
+ }
+
+ private static final GroupFactory groupFactory = new GroupFactory();
+
+ /**
* Find a counter by using strings
* @param group the name of the group
* @param id the id of the counter within the group (0 to N-1)
* @param name the internal name of the counter
* @return the counter for that name
- * @deprecated
+ * @deprecated use {@link findCounter(String, String)} instead
*/
@Deprecated
- public synchronized Counter findCounter(String group, int id, String name) {
+ public Counter findCounter(String group, int id, String name) {
return findCounter(group, name);
}
@@ -489,10 +385,10 @@ public class Counters implements Writabl
* @param key identifies a counter
* @param amount amount by which counter is to be incremented
*/
- public synchronized void incrCounter(Enum key, long amount) {
+ public void incrCounter(Enum<?> key, long amount) {
findCounter(key).increment(amount);
}
-
+
/**
* Increments the specified counter by the specified amount, creating it if
* it didn't already exist.
@@ -500,27 +396,29 @@ public class Counters implements Writabl
* @param counter the internal name of the counter
* @param amount amount by which counter is to be incremented
*/
- public synchronized void incrCounter(String group, String counter, long amount) {
+ public void incrCounter(String group, String counter, long amount) {
findCounter(group, counter).increment(amount);
}
-
+
/**
* Returns current value of the specified counter, or 0 if the counter
* does not exist.
+ * @param key the counter enum to lookup
+ * @return the counter value or 0 if counter not found
*/
- public synchronized long getCounter(Enum key) {
+ public synchronized long getCounter(Enum<?> key) {
return findCounter(key).getValue();
}
-
+
/**
- * Increments multiple counters by their amounts in another Counters
+ * Increments multiple counters by their amounts in another Counters
* instance.
* @param other the other Counters instance
*/
public synchronized void incrAllCounters(Counters other) {
for (Group otherGroup: other) {
Group group = getGroup(otherGroup.getName());
- group.displayName = otherGroup.displayName;
+ group.setDisplayName(otherGroup.getDisplayName());
for (Counter otherCounter : otherGroup) {
Counter counter = group.getCounterForName(otherCounter.getName());
counter.setDisplayName(otherCounter.getDisplayName());
@@ -530,7 +428,18 @@ public class Counters implements Writabl
}
/**
+ * @return the total number of counters
+ * @deprecated use {@link #countCounters()} instead
+ */
+ public int size() {
+ return countCounters();
+ }
+
+ /**
* Convenience method for computing the sum of two sets of counters.
+ * @param a the first counters
+ * @param b the second counters
+ * @return a new summed counters object
*/
public static Counters sum(Counters a, Counters b) {
Counters counters = new Counters();
@@ -538,55 +447,7 @@ public class Counters implements Writabl
counters.incrAllCounters(b);
return counters;
}
-
- /**
- * Returns the total number of counters, by summing the number of counters
- * in each group.
- */
- public synchronized int size() {
- int result = 0;
- for (Group group : this) {
- result += group.size();
- }
- return result;
- }
-
- /**
- * Write the set of groups.
- * The external format is:
- * #groups (groupName group)*
- *
- * i.e. the number of groups followed by 0 or more groups, where each
- * group is of the form:
- *
- * groupDisplayName #counters (false | true counter)*
- *
- * where each counter is of the form:
- *
- * name (false | true displayName) value
- */
- public synchronized void write(DataOutput out) throws IOException {
- out.writeInt(counters.size());
- for (Group group: counters.values()) {
- Text.writeString(out, group.getName());
- group.write(out);
- }
- }
-
- /**
- * Read a set of groups.
- */
- public synchronized void readFields(DataInput in) throws IOException {
- int numClasses = in.readInt();
- counters.clear();
- while (numClasses-- > 0) {
- String groupName = Text.readString(in);
- Group group = new Group(groupName);
- group.readFields(in);
- counters.put(groupName, group);
- }
- }
-
+
/**
* Logs the current counter values.
* @param log The log to use.
@@ -596,212 +457,31 @@ public class Counters implements Writabl
for(Group group: this) {
log.info(" " + group.getDisplayName());
for (Counter counter: group) {
- log.info(" " + counter.getDisplayName() + "=" +
+ log.info(" " + counter.getDisplayName() + "=" +
counter.getCounter());
- }
- }
- }
-
- /**
- * Return textual representation of the counter values.
- */
- public synchronized String toString() {
- StringBuilder sb = new StringBuilder("Counters: " + size());
- for (Group group: this) {
- sb.append("\n\t" + group.getDisplayName());
- for (Counter counter: group) {
- sb.append("\n\t\t" + counter.getDisplayName() + "=" +
- counter.getCounter());
}
}
- return sb.toString();
}
/**
- * Convert a counters object into a single line that is easy to parse.
- * @return the string with "name=value" for each counter and separated by ","
- */
- public synchronized String makeCompactString() {
- StringBuffer buffer = new StringBuffer();
- boolean first = true;
- for(Group group: this){
- for(Counter counter: group) {
- if (first) {
- first = false;
- } else {
- buffer.append(',');
- }
- buffer.append(group.getDisplayName());
- buffer.append('.');
- buffer.append(counter.getDisplayName());
- buffer.append(':');
- buffer.append(counter.getCounter());
- }
- }
- return buffer.toString();
- }
-
- /**
- * Represent the counter in a textual format that can be converted back to
+ * Represent the counter in a textual format that can be converted back to
* its object form
* @return the string in the following format
- * {(groupname)(group-displayname)[(countername)(displayname)(value)][][]}{}{}
+ * {(groupName)(group-displayName)[(counterName)(displayName)(value)][]*}*
*/
- public synchronized String makeEscapedCompactString() {
- String[] groupsArray = new String[counters.size()];
- int i = 0;
- int length = 0;
-
- // First up, obtain the escaped string for each group so that we can
- // determine the buffer length apriori.
- for (Group group : this) {
- String escapedString = group.makeEscapedCompactString();
- groupsArray[i++] = escapedString;
- length += escapedString.length();
- }
-
- // Now construct the buffer
- StringBuilder builder = new StringBuilder(length);
- for (String group : groupsArray) {
- builder.append(group);
- }
- return builder.toString();
- }
-
- // Extracts a block (data enclosed within delimeters) ignoring escape
- // sequences. Throws ParseException if an incomplete block is found else
- // returns null.
- private static String getBlock(String str, char open, char close,
- IntWritable index) throws ParseException {
- StringBuilder split = new StringBuilder();
- int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR,
- index.get(), split);
- split.setLength(0); // clear the buffer
- if (next >= 0) {
- ++next; // move over '('
-
- next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR,
- next, split);
- if (next >= 0) {
- ++next; // move over ')'
- index.set(next);
- return split.toString(); // found a block
- } else {
- throw new ParseException("Unexpected end of block", next);
- }
- }
- return null; // found nothing
+ public String makeEscapedCompactString() {
+ return toEscapedCompactString(this);
}
-
+
/**
- * Convert a stringified counter representation into a counter object. Note
- * that the counter can be recovered if its stringified using
- * {@link #makeEscapedCompactString()}.
- * @return a Counter
+ * Convert a stringified (by {@link #makeEscapedCompactString()}) counter
+ * representation into a counter object.
+ * @param compactString to parse
+ * @return a new counters object
+ * @throws ParseException
*/
- public static Counters fromEscapedCompactString(String compactString)
- throws ParseException {
- Counters counters = new Counters();
- IntWritable index = new IntWritable(0);
-
- // Get the group to work on
- String groupString =
- getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
-
- while (groupString != null) {
- IntWritable groupIndex = new IntWritable(0);
-
- // Get the actual name
- String groupName =
- getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
- groupName = unescape(groupName);
-
- // Get the display name
- String groupDisplayName =
- getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
- groupDisplayName = unescape(groupDisplayName);
-
- // Get the counters
- Group group = counters.getGroup(groupName);
- group.setDisplayName(groupDisplayName);
-
- String counterString =
- getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
-
- while (counterString != null) {
- IntWritable counterIndex = new IntWritable(0);
-
- // Get the actual name
- String counterName =
- getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
- counterName = unescape(counterName);
-
- // Get the display name
- String counterDisplayName =
- getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
- counterDisplayName = unescape(counterDisplayName);
-
- // Get the value
- long value =
- Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
- counterIndex));
-
- // Add the counter
- Counter counter = group.getCounterForName(counterName);
- counter.setDisplayName(counterDisplayName);
- counter.increment(value);
-
- // Get the next counter
- counterString =
- getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
- }
-
- groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
- }
- return counters;
- }
-
- // Escapes all the delimiters for counters i.e {,[,(,),],}
- private static String escape(String string) {
- return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR,
- charsToEscape);
- }
-
- // Unescapes all the delimiters for counters i.e {,[,(,),],}
- private static String unescape(String string) {
- return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR,
- charsToEscape);
- }
-
- @Override
- public synchronized int hashCode() {
- return counters.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || obj.getClass() != getClass()) {
- return false;
- }
- boolean isEqual = false;
- Counters other = (Counters) obj;
- synchronized (this) {
- if (size() == other.size()) {
- isEqual = true;
- for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
- String key = entry.getKey();
- Group sourceGroup = entry.getValue();
- Group targetGroup = other.getGroup(key);
- if (!sourceGroup.equals(targetGroup)) {
- isEqual = false;
- break;
- }
- }
- }
- }
- return isEqual;
+ public static Counters fromEscapedCompactString(String compactString)
+ throws ParseException {
+ return parseEscapedCompactString(compactString, new Counters());
}
}
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
(original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
Fri Aug 12 23:25:51 2011
@@ -77,8 +77,10 @@ interface InterTrackerProtocol extends V
* Version 29: Adding user name to the serialized Task for use by TT.
* Version 30: Adding available memory and CPU usage information on TT to
* TaskTrackerStatus for MAPREDUCE-1218
+ * Version 31: Efficient serialization format for Framework counters
+ * (MAPREDUCE-901)
*/
- public static final long versionID = 30L;
+ public static final long versionID = 31L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri
Aug 12 23:25:51 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.counters.LimitExceededException;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -1250,6 +1251,11 @@ public class JobInProgress {
* @return the job-level counters.
*/
public synchronized Counters getJobCounters() {
+ try {
+ throw new IOException("");
+ } catch (IOException ioe) {
+ LOG.info("getJC", ioe);
+ }
return jobCounters;
}
@@ -1291,8 +1297,12 @@ public class JobInProgress {
*/
private Counters incrementTaskCounters(Counters counters,
TaskInProgress[] tips) {
- for (TaskInProgress tip : tips) {
- counters.incrAllCounters(tip.getCounters());
+ try {
+ for (TaskInProgress tip : tips) {
+ counters.incrAllCounters(tip.getCounters());
+ }
+ } catch (LimitExceededException e) {
+ // too many user counters/groups, leaving existing counters intact.
}
return counters;
}
@@ -2748,6 +2758,9 @@ public class JobInProgress {
retireMap(tip);
if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
this.status.setMapProgress(1.0f);
+ if (canLaunchJobCleanupTask()) {
+ checkCountersLimitsOrFail();
+ }
}
} else {
runningReduceTasks -= 1;
@@ -2760,6 +2773,9 @@ public class JobInProgress {
retireReduce(tip);
if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
this.status.setReduceProgress(1.0f);
+ if (canLaunchJobCleanupTask()) {
+ checkCountersLimitsOrFail();
+ }
}
}
decrementSpeculativeCount(wasSpeculating, tip);
@@ -2769,6 +2785,19 @@ public class JobInProgress {
}
return true;
}
+
+ /*
+ * add up the counters and fail the job if it exceeds the limits.
+ * Make sure we do not recalculate the counters after we fail the job.
+ * Currently this is taken care of by terminateJob() since it does not
+ * calculate the counters.
+ */
+ private void checkCountersLimitsOrFail() {
+ Counters counters = getCounters();
+ if (counters.limits().violation() != null) {
+ jobtracker.failJob(this);
+ }
+ }
private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus,
Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
|