Author: szetszwo
Date: Mon Aug 1 14:11:19 2011
New Revision: 1152787
URL: http://svn.apache.org/viewvc?rev=1152787&view=rev
Log:
MAPREDUCE-2243. Close streams properly in a finally-block to avoid leakage in CompletedJobStatusStore,
TaskLog, EventWriter and TotalOrderPartitioner. Contributed by Devaraj K
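
All four fixes apply the same idiom: declare the stream outside the try, perform the
normal close() at the end of the try, null the reference on success, and let
IOUtils.cleanup close whatever is still open in a finally-block. IOUtils.cleanup(Log,
Closeable...) skips null arguments and logs (rather than rethrows) any IOException, so
an exception already propagating out of the try-block is never masked. A minimal sketch
of the pattern (the class, method, and field names below are illustrative, not part of
this patch):

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    class StreamCleanupSketch {
      private static final Log LOG = LogFactory.getLog(StreamCleanupSketch.class);

      // Hypothetical example method, not patch code.
      void writeStatus(FileSystem fs, Path file, byte[] data) throws IOException {
        FSDataOutputStream out = null;  // declared outside try so finally can see it
        try {
          out = fs.create(file);
          out.write(data);
          out.close();                  // the normal close; may itself throw
          out = null;                   // success: suppress the second close in finally
        } finally {
          IOUtils.cleanup(LOG, out);    // runs only on error paths (out != null)
        }
      }
    }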
Modified:
hadoop/common/trunk/mapreduce/CHANGES.txt
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1152787&r1=1152786&r2=1152787&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Mon Aug 1 14:11:19 2011
@@ -362,6 +362,10 @@ Trunk (unreleased changes)
MAPREDUCE-2463. Job history files are not moved to done folder when job
history location is hdfs. (Devaraj K via szetszwo)
+ MAPREDUCE-2243. Close streams properly in a finally-block to avoid leakage
+ in CompletedJobStatusStore, TaskLog, EventWriter and TotalOrderPartitioner.
+ (Devaraj K via szetszwo)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=1152787&r1=1152786&r2=1152787&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Mon Aug 1 14:11:19 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
@@ -172,8 +173,9 @@ class CompletedJobStatusStore implements
if (active && retainTime > 0) {
JobID jobId = job.getStatus().getJobID();
Path jobStatusFile = getInfoFilePath(jobId);
+ FSDataOutputStream dataOut = null;
try {
- FSDataOutputStream dataOut = fs.create(jobStatusFile);
+ dataOut = fs.create(jobStatusFile);
job.getStatus().write(dataOut);
@@ -189,6 +191,7 @@ class CompletedJobStatusStore implements
}
dataOut.close();
+ dataOut = null; // set dataOut to null explicitly so that close in finally will not be executed again.
} catch (IOException ex) {
LOG.warn("Could not store [" + jobId + "] job info : " +
ex.getMessage(), ex);
@@ -198,6 +201,8 @@ class CompletedJobStatusStore implements
catch (IOException ex1) {
//ignore
}
+ } finally {
+ IOUtils.cleanup(LOG, dataOut);
}
}
}
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1152787&r1=1152786&r2=1152787&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Aug 1 14:11:19 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
@@ -111,34 +112,42 @@ public class TaskLog {
//stderr:<start-offset in the stderr file> <length>
//syslog:<start-offset in the syslog file> <length>
LogFileDetail l = new LogFileDetail();
- String str = fis.readLine();
- if (str == null) { //the file doesn't have anything
- throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
- }
- l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
- LogFileDetail.LOCATION.length());
- //special cases are the debugout and profile.out files. They are guaranteed
- //to be associated with each task attempt since jvm reuse is disabled
- //when profiling/debugging is enabled
- if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
- l.length = new File(l.location, filter.toString()).length();
- l.start = 0;
- fis.close();
- return l;
- }
- str = fis.readLine();
- while (str != null) {
- //look for the exact line containing the logname
- if (str.contains(filter.toString())) {
- str = str.substring(filter.toString().length()+1);
- String[] startAndLen = str.split(" ");
- l.start = Long.parseLong(startAndLen[0]);
- l.length = Long.parseLong(startAndLen[1]);
- break;
+ String str = null;
+ try {
+ str = fis.readLine();
+ if (str == null) { // the file doesn't have anything
+ throw new IOException("Index file for the log of " + taskid
+ + " doesn't exist.");
+ }
+ l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)
+ + LogFileDetail.LOCATION.length());
+ // special cases are the debugout and profile.out files. They are
+ // guaranteed
+ // to be associated with each task attempt since jvm reuse is disabled
+ // when profiling/debugging is enabled
+ if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+ l.length = new File(l.location, filter.toString()).length();
+ l.start = 0;
+ fis.close();
+ return l;
}
str = fis.readLine();
+ while (str != null) {
+ // look for the exact line containing the logname
+ if (str.contains(filter.toString())) {
+ str = str.substring(filter.toString().length() + 1);
+ String[] startAndLen = str.split(" ");
+ l.start = Long.parseLong(startAndLen[0]);
+ l.length = Long.parseLong(startAndLen[1]);
+ break;
+ }
+ str = fis.readLine();
+ }
+ fis.close();
+ fis = null;
+ } finally {
+ IOUtils.cleanup(LOG, fis);
}
- fis.close();
return l;
}
@@ -189,22 +198,27 @@ public class TaskLog {
//LOG_DIR: <the dir where the task logs are really stored>
//STDOUT: <start-offset in the stdout file> <length>
//STDERR: <start-offset in the stderr file> <length>
- //SYSLOG: <start-offset in the syslog file> <length>
- dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
- + LogName.STDOUT.toString() + ":");
- dos.writeBytes(Long.toString(prevOutLength) + " ");
- dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
- .toString()).length() - prevOutLength)
- + "\n" + LogName.STDERR + ":");
- dos.writeBytes(Long.toString(prevErrLength) + " ");
- dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
- .toString()).length() - prevErrLength)
- + "\n" + LogName.SYSLOG.toString() + ":");
- dos.writeBytes(Long.toString(prevLogLength) + " ");
- dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
- .toString()).length() - prevLogLength)
- + "\n");
- dos.close();
+ //SYSLOG: <start-offset in the syslog file> <length>
+ try {
+ dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+ + LogName.STDOUT.toString() + ":");
+ dos.writeBytes(Long.toString(prevOutLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+ .toString()).length() - prevOutLength)
+ + "\n" + LogName.STDERR + ":");
+ dos.writeBytes(Long.toString(prevErrLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+ .toString()).length() - prevErrLength)
+ + "\n" + LogName.SYSLOG.toString() + ":");
+ dos.writeBytes(Long.toString(prevLogLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+ .toString()).length() - prevLogLength)
+ + "\n");
+ dos.close();
+ dos = null;
+ } finally {
+ IOUtils.cleanup(LOG, dos);
+ }
File indexFile = getIndexFile(currentTaskid, isCleanup);
Path indexFilePath = new Path(indexFile.getAbsolutePath());
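
One wrinkle in getLogFileDetail above: the DEBUGOUT/PROFILE branch closes the reader and
returns from inside the try without nulling it, so the finally-block calls close() a
second time on that path. That is safe because java.io.Closeable specifies that closing
an already-closed stream has no effect, and IOUtils.cleanup swallows any IOException in
any case. Reduced to a sketch (the method and its arguments are hypothetical):

    import java.io.BufferedReader;
    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.IOUtils;

    class EarlyReturnSketch {
      private static final Log LOG = LogFactory.getLog(EarlyReturnSketch.class);

      // Hypothetical reduction of the two exit paths, not patch code.
      String firstLine(BufferedReader reader, boolean shortCircuit) throws IOException {
        try {
          String line = reader.readLine();
          if (shortCircuit) {
            reader.close();   // early return: reader is not nulled,
            return line;      // so cleanup below closes it again (a no-op)
          }
          // ... further reads would go here ...
          reader.close();
          reader = null;      // normal exit: nothing left for finally to close
          return line;
        } finally {
          IOUtils.cleanup(LOG, reader);
        }
      }
    }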
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1152787&r1=1152786&r2=1152787&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Mon Aug 1 14:11:19 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
@@ -33,6 +34,8 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Event Writer is a utility class used to write events to the underlying
@@ -47,6 +50,7 @@ class EventWriter {
private DatumWriter<Event> writer =
new SpecificDatumWriter<Event>(Event.class);
private Encoder encoder;
+ private static final Log LOG = LogFactory.getLog(EventWriter.class);
EventWriter(FSDataOutputStream out) throws IOException {
this.out = out;
@@ -72,8 +76,13 @@ class EventWriter {
}
void close() throws IOException {
- encoder.flush();
- out.close();
+ try {
+ encoder.flush();
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
}
private static final Schema GROUPS =
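
Note the ordering in EventWriter.close() above: encoder.flush() can throw before
out.close() is reached, in which case out is still non-null and the finally-block closes
it; out is nulled only after both the flush and the close have succeeded.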
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?rev=1152787&r1=1152786&r2=1152787&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Mon Aug 1 14:11:19 2011
@@ -23,6 +23,8 @@ import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.RawComparator;
@@ -56,6 +59,7 @@ public class TotalOrderPartitioner<K ext
public static final String NATURAL_ORDER =
"mapreduce.totalorderpartitioner.naturalorder";
Configuration conf;
+ private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);
public TotalOrderPartitioner() { }
@@ -298,11 +302,16 @@ public class TotalOrderPartitioner<K ext
ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
- while (reader.next(key, value)) {
- parts.add(key);
- key = ReflectionUtils.newInstance(keyClass, conf);
+ try {
+ while (reader.next(key, value)) {
+ parts.add(key);
+ key = ReflectionUtils.newInstance(keyClass, conf);
+ }
+ reader.close();
+ reader = null;
+ } finally {
+ IOUtils.cleanup(LOG, reader);
}
- reader.close();
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}