Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java Tue Oct 16 00:02:55 2012
@@ -1,233 +1,233 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.lucene;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
-import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
-import org.apache.hadoop.contrib.index.mapred.Shard;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-
-/**
- * The initial version of an index is stored in the perm dir. Index files
- * created by newer versions are written to a temp dir on the local FS. After
- * successfully creating the new version in the temp dir, the shard writer
- * moves the new files to the perm dir and deletes the temp dir in close().
- */
-public class ShardWriter {
- static final Log LOG = LogFactory.getLog(ShardWriter.class);
-
- private final FileSystem fs;
- private final FileSystem localFs;
- private final Path perm;
- private final Path temp;
- private final Directory dir;
- private final IndexWriter writer;
- private int maxNumSegments;
- private long numForms = 0;
-
- /**
- * Constructor
- * @param fs
- * @param shard
- * @param tempDir
- * @param iconf
- * @throws IOException
- */
- public ShardWriter(FileSystem fs, Shard shard, String tempDir,
- IndexUpdateConfiguration iconf) throws IOException {
- LOG.info("Construct a shard writer");
-
- this.fs = fs;
- localFs = FileSystem.getLocal(iconf.getConfiguration());
- perm = new Path(shard.getDirectory());
- temp = new Path(tempDir);
-
- long initGeneration = shard.getGeneration();
- if (!fs.exists(perm)) {
- assert (initGeneration < 0);
- fs.mkdirs(perm);
- } else {
- restoreGeneration(fs, perm, initGeneration);
- }
- dir =
- new MixedDirectory(fs, perm, localFs, fs.startLocalOutput(perm, temp),
- iconf.getConfiguration());
-
- // analyzer is null because we only use addIndexes, not addDocument
- writer =
- new IndexWriter(dir, false, null,
- initGeneration < 0 ? new KeepOnlyLastCommitDeletionPolicy()
- : new MixedDeletionPolicy());
- setParameters(iconf);
- }
-
- /**
- * Process an intermediate form by carrying out, on the Lucene instance of
- * the shard, the deletes and the inserts (a ram index) in the form.
- * @param form the intermediate form containing deletes and a ram index
- * @throws IOException
- */
- public void process(IntermediateForm form) throws IOException {
- // first delete
- Iterator<Term> iter = form.deleteTermIterator();
- while (iter.hasNext()) {
- writer.deleteDocuments(iter.next());
- }
- // then insert
- writer.addIndexesNoOptimize(new Directory[] { form.getDirectory() });
- numForms++;
- }
-
- /**
- * Close the shard writer. Optimize the Lucene instance of the shard before
- * closing if necessary, and copy the files created in the temp directory
- * to the permanent directory after closing.
- * @throws IOException
- */
- public void close() throws IOException {
- LOG.info("Closing the shard writer, processed " + numForms + " forms");
- try {
- try {
- if (maxNumSegments > 0) {
- writer.optimize(maxNumSegments);
- LOG.info("Optimized the shard into at most " + maxNumSegments
- + " segments");
- }
- } finally {
- writer.close();
- LOG.info("Closed Lucene index writer");
- }
-
- moveFromTempToPerm();
- LOG.info("Moved new index files to " + perm);
-
- } finally {
- dir.close();
- LOG.info("Closed the shard writer");
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- return this.getClass().getName() + "@" + perm + "&" + temp;
- }
-
- private void setParameters(IndexUpdateConfiguration iconf) {
- int maxFieldLength = iconf.getIndexMaxFieldLength();
- if (maxFieldLength > 0) {
- writer.setMaxFieldLength(maxFieldLength);
- }
- writer.setUseCompoundFile(iconf.getIndexUseCompoundFile());
- maxNumSegments = iconf.getIndexMaxNumSegments();
-
- if (maxFieldLength > 0) {
- LOG.info("sea.max.field.length = " + writer.getMaxFieldLength());
- }
- LOG.info("sea.use.compound.file = " + writer.getUseCompoundFile());
- LOG.info("sea.max.num.segments = " + maxNumSegments);
- }
-
- // in case a previous reduce task fails, restore the generation to
- // the original starting point by deleting the segments.gen file
- // and the segments_N files whose generations are greater than the
- // starting generation; rest of the unwanted files will be deleted
- // once the unwanted segments_N files are deleted
- private void restoreGeneration(FileSystem fs, Path perm, long startGen)
- throws IOException {
-
- FileStatus[] fileStatus = fs.listStatus(perm, new PathFilter() {
- public boolean accept(Path path) {
- return LuceneUtil.isSegmentsFile(path.getName());
- }
- });
-
- // remove the segments_N files whose generation are greater than
- // the starting generation
- for (int i = 0; i < fileStatus.length; i++) {
- Path path = fileStatus[i].getPath();
- if (startGen < LuceneUtil.generationFromSegmentsFileName(path.getName())) {
- fs.delete(path, true);
- }
- }
-
- // always remove segments.gen in case last failed try removed segments_N
- // but not segments.gen, and segments.gen will be overwritten anyway.
- Path segmentsGenFile = new Path(LuceneUtil.IndexFileNames.SEGMENTS_GEN);
- if (fs.exists(segmentsGenFile)) {
- fs.delete(segmentsGenFile, true);
- }
- }
-
- // move the files created in the temp dir into the perm dir
- // and then delete the temp dir from the local FS
- private void moveFromTempToPerm() throws IOException {
- try {
- FileStatus[] fileStatus =
- localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter());
- Path segmentsPath = null;
- Path segmentsGenPath = null;
-
- // move the files created in temp dir except segments_N and segments.gen
- for (int i = 0; i < fileStatus.length; i++) {
- Path path = fileStatus[i].getPath();
- String name = path.getName();
-
- if (LuceneUtil.isSegmentsGenFile(name)) {
- assert (segmentsGenPath == null);
- segmentsGenPath = path;
- } else if (LuceneUtil.isSegmentsFile(name)) {
- assert (segmentsPath == null);
- segmentsPath = path;
- } else {
- fs.completeLocalOutput(new Path(perm, name), path);
- }
- }
-
- // move the segments_N file
- if (segmentsPath != null) {
- fs.completeLocalOutput(new Path(perm, segmentsPath.getName()),
- segmentsPath);
- }
-
- // move the segments.gen file
- if (segmentsGenPath != null) {
- fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()),
- segmentsGenPath);
- }
- } finally {
- // finally delete the temp dir (files should have been deleted)
- localFs.delete(temp, true);
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.lucene;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
+import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
+import org.apache.hadoop.contrib.index.mapred.Shard;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+
+/**
+ * The initial version of an index is stored in the perm dir. Index files
+ * created by newer versions are written to a temp dir on the local FS. After
+ * successfully creating the new version in the temp dir, the shard writer
+ * moves the new files to the perm dir and deletes the temp dir in close().
+ */
+public class ShardWriter {
+ static final Log LOG = LogFactory.getLog(ShardWriter.class);
+
+ private final FileSystem fs;
+ private final FileSystem localFs;
+ private final Path perm;
+ private final Path temp;
+ private final Directory dir;
+ private final IndexWriter writer;
+ private int maxNumSegments;
+ private long numForms = 0;
+
+ /**
+ * Constructor
+ * @param fs
+ * @param shard
+ * @param tempDir
+ * @param iconf
+ * @throws IOException
+ */
+ public ShardWriter(FileSystem fs, Shard shard, String tempDir,
+ IndexUpdateConfiguration iconf) throws IOException {
+ LOG.info("Construct a shard writer");
+
+ this.fs = fs;
+ localFs = FileSystem.getLocal(iconf.getConfiguration());
+ perm = new Path(shard.getDirectory());
+ temp = new Path(tempDir);
+
+ long initGeneration = shard.getGeneration();
+ if (!fs.exists(perm)) {
+ assert (initGeneration < 0);
+ fs.mkdirs(perm);
+ } else {
+ restoreGeneration(fs, perm, initGeneration);
+ }
+ dir =
+ new MixedDirectory(fs, perm, localFs, fs.startLocalOutput(perm, temp),
+ iconf.getConfiguration());
+
+ // analyzer is null because we only use addIndexes, not addDocument
+ writer =
+ new IndexWriter(dir, false, null,
+ initGeneration < 0 ? new KeepOnlyLastCommitDeletionPolicy()
+ : new MixedDeletionPolicy());
+ setParameters(iconf);
+ }
+
+ /**
+ * Process an intermediate form by carrying out, on the Lucene instance of
+ * the shard, the deletes and the inserts (a ram index) in the form.
+ * @param form the intermediate form containing deletes and a ram index
+ * @throws IOException
+ */
+ public void process(IntermediateForm form) throws IOException {
+ // first delete
+ Iterator<Term> iter = form.deleteTermIterator();
+ while (iter.hasNext()) {
+ writer.deleteDocuments(iter.next());
+ }
+ // then insert
+ writer.addIndexesNoOptimize(new Directory[] { form.getDirectory() });
+ numForms++;
+ }
+
+ /**
+ * Close the shard writer. Optimize the Lucene instance of the shard before
+ * closing if necessary, and copy the files created in the temp directory
+ * to the permanent directory after closing.
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ LOG.info("Closing the shard writer, processed " + numForms + " forms");
+ try {
+ try {
+ if (maxNumSegments > 0) {
+ writer.optimize(maxNumSegments);
+ LOG.info("Optimized the shard into at most " + maxNumSegments
+ + " segments");
+ }
+ } finally {
+ writer.close();
+ LOG.info("Closed Lucene index writer");
+ }
+
+ moveFromTempToPerm();
+ LOG.info("Moved new index files to " + perm);
+
+ } finally {
+ dir.close();
+ LOG.info("Closed the shard writer");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return this.getClass().getName() + "@" + perm + "&" + temp;
+ }
+
+ private void setParameters(IndexUpdateConfiguration iconf) {
+ int maxFieldLength = iconf.getIndexMaxFieldLength();
+ if (maxFieldLength > 0) {
+ writer.setMaxFieldLength(maxFieldLength);
+ }
+ writer.setUseCompoundFile(iconf.getIndexUseCompoundFile());
+ maxNumSegments = iconf.getIndexMaxNumSegments();
+
+ if (maxFieldLength > 0) {
+ LOG.info("sea.max.field.length = " + writer.getMaxFieldLength());
+ }
+ LOG.info("sea.use.compound.file = " + writer.getUseCompoundFile());
+ LOG.info("sea.max.num.segments = " + maxNumSegments);
+ }
+
+ // in case a previous reduce task fails, restore the generation to
+ // the original starting point by deleting the segments.gen file
+ // and the segments_N files whose generations are greater than the
+ // starting generation; rest of the unwanted files will be deleted
+ // once the unwanted segments_N files are deleted
+ private void restoreGeneration(FileSystem fs, Path perm, long startGen)
+ throws IOException {
+
+ FileStatus[] fileStatus = fs.listStatus(perm, new PathFilter() {
+ public boolean accept(Path path) {
+ return LuceneUtil.isSegmentsFile(path.getName());
+ }
+ });
+
+ // remove the segments_N files whose generation are greater than
+ // the starting generation
+ for (int i = 0; i < fileStatus.length; i++) {
+ Path path = fileStatus[i].getPath();
+ if (startGen < LuceneUtil.generationFromSegmentsFileName(path.getName())) {
+ fs.delete(path, true);
+ }
+ }
+
+ // always remove segments.gen in case last failed try removed segments_N
+ // but not segments.gen, and segments.gen will be overwritten anyway.
+ Path segmentsGenFile = new Path(LuceneUtil.IndexFileNames.SEGMENTS_GEN);
+ if (fs.exists(segmentsGenFile)) {
+ fs.delete(segmentsGenFile, true);
+ }
+ }
+
+ // move the files created in the temp dir into the perm dir
+ // and then delete the temp dir from the local FS
+ private void moveFromTempToPerm() throws IOException {
+ try {
+ FileStatus[] fileStatus =
+ localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter());
+ Path segmentsPath = null;
+ Path segmentsGenPath = null;
+
+ // move the files created in temp dir except segments_N and segments.gen
+ for (int i = 0; i < fileStatus.length; i++) {
+ Path path = fileStatus[i].getPath();
+ String name = path.getName();
+
+ if (LuceneUtil.isSegmentsGenFile(name)) {
+ assert (segmentsGenPath == null);
+ segmentsGenPath = path;
+ } else if (LuceneUtil.isSegmentsFile(name)) {
+ assert (segmentsPath == null);
+ segmentsPath = path;
+ } else {
+ fs.completeLocalOutput(new Path(perm, name), path);
+ }
+ }
+
+ // move the segments_N file
+ if (segmentsPath != null) {
+ fs.completeLocalOutput(new Path(perm, segmentsPath.getName()),
+ segmentsPath);
+ }
+
+ // move the segments.gen file
+ if (segmentsGenPath != null) {
+ fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()),
+ segmentsGenPath);
+ }
+ } finally {
+ // finally delete the temp dir (files should have been deleted)
+ localFs.delete(temp, true);
+ }
+ }
+
+}
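
For context, a minimal usage sketch of the ShardWriter above, roughly as a reduce task would drive it. The paths, the shard version/generation values, and the forms iterable are illustrative assumptions; the constructor and method signatures are the ones shown in the file.

    // Hypothetical helper, not part of the files above: drive a ShardWriter over
    // a batch of intermediate forms.
    static void writeShard(Configuration conf, Iterable<IntermediateForm> forms)
        throws IOException {
      JobConf jobConf = new JobConf(conf);
      IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
      FileSystem fs = FileSystem.get(conf);
      Shard shard = new Shard(-1, "/myindex/00000", -1);  // -1: a brand-new shard
      ShardWriter writer = new ShardWriter(fs, shard, "/tmp/shard-00000", iconf);
      try {
        for (IntermediateForm form : forms) {
          writer.process(form);  // apply the form's delete terms, then add its RAM index
        }
      } finally {
        writer.close();          // optimize if configured, then move new files to the perm dir
      }
    }
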
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java Tue Oct 16 00:02:55 2012
@@ -1,276 +1,276 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.main;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
-import org.apache.hadoop.contrib.index.mapred.IIndexUpdater;
-import org.apache.hadoop.contrib.index.mapred.Shard;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.main;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
+import org.apache.hadoop.contrib.index.mapred.IIndexUpdater;
+import org.apache.hadoop.contrib.index.mapred.Shard;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * A distributed "index" is partitioned into "shards". Each shard corresponds
- * to a Lucene instance. This class contains the main() method which uses a
- * Map/Reduce job to analyze documents and update Lucene instances in parallel.
- *
- * The main() method in UpdateIndex requires the following information for
- * updating the shards:
- * - Input formatter. This specifies how to format the input documents.
- * - Analysis. This defines the analyzer to use on the input. The analyzer
- * determines whether a document is being inserted, updated, or deleted.
- * For inserts or updates, the analyzer also converts each input document
- * into a Lucene document.
- * - Input paths. This provides the location(s) of updated documents,
- * e.g., HDFS files or directories, or HBase tables.
- * - Shard paths, or index path with the number of shards. Either specify
- * the path for each shard, or specify an index path and the shards are
- * the sub-directories of the index directory.
- * - Output path. When the update to a shard is done, a message is put here.
- * - Number of map tasks.
- *
- * All of the information can be specified in a configuration file. All but
- * the first two can also be specified as command line options. Check out
- * conf/index-config.xml.template for other configurable parameters.
- *
- * Note: Because of the parallel nature of Map/Reduce, the behaviour of
- * multiple inserts, deletes or updates to the same document is undefined.
- */
-public class UpdateIndex {
- public static final Log LOG = LogFactory.getLog(UpdateIndex.class);
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- private static long now() {
- return System.currentTimeMillis();
- }
-
- private static void printUsage(String cmd) {
- System.err.println("Usage: java " + UpdateIndex.class.getName() + "\n"
- + " -inputPaths <inputPath,inputPath>\n"
- + " -outputPath <outputPath>\n"
- + " -shards <shardDir,shardDir>\n"
- + " -indexPath <indexPath>\n"
- + " -numShards <num>\n"
- + " -numMapTasks <num>\n"
- + " -conf <confPath>\n"
- + "Note: Do not use both -shards option and -indexPath option.");
- }
-
- private static String getIndexPath(Configuration conf) {
- return conf.get("sea.index.path");
- }
-
- private static int getNumShards(Configuration conf) {
- return conf.getInt("sea.num.shards", 1);
- }
-
- private static Shard[] createShards(String indexPath, int numShards,
- Configuration conf) throws IOException {
-
- String parent = Shard.normalizePath(indexPath) + Path.SEPARATOR;
- long versionNumber = -1;
- long generation = -1;
-
- FileSystem fs = FileSystem.get(conf);
- Path path = new Path(indexPath);
-
- if (fs.exists(path)) {
- FileStatus[] fileStatus = fs.listStatus(path);
- String[] shardNames = new String[fileStatus.length];
- int count = 0;
- for (int i = 0; i < fileStatus.length; i++) {
- if (fileStatus[i].isDirectory()) {
- shardNames[count] = fileStatus[i].getPath().getName();
- count++;
- }
- }
- Arrays.sort(shardNames, 0, count);
-
- Shard[] shards = new Shard[count >= numShards ? count : numShards];
- for (int i = 0; i < count; i++) {
- shards[i] =
- new Shard(versionNumber, parent + shardNames[i], generation);
- }
-
- int number = count;
- for (int i = count; i < numShards; i++) {
- String shardPath;
- while (true) {
- shardPath = parent + NUMBER_FORMAT.format(number++);
- if (!fs.exists(new Path(shardPath))) {
- break;
- }
- }
- shards[i] = new Shard(versionNumber, shardPath, generation);
- }
- return shards;
- } else {
- Shard[] shards = new Shard[numShards];
- for (int i = 0; i < shards.length; i++) {
- shards[i] =
- new Shard(versionNumber, parent + NUMBER_FORMAT.format(i),
- generation);
- }
- return shards;
- }
- }
-
- /**
- * The main() method
- * @param argv
- */
- public static void main(String[] argv) {
- if (argv.length == 0) {
- printUsage("");
- System.exit(-1);
- }
-
- String inputPathsString = null;
- Path outputPath = null;
- String shardsString = null;
- String indexPath = null;
- int numShards = -1;
- int numMapTasks = -1;
- Configuration conf = new Configuration();
- String confPath = null;
-
- // parse the command line
- for (int i = 0; i < argv.length; i++) { // parse command line
- if (argv[i].equals("-inputPaths")) {
- inputPathsString = argv[++i];
- } else if (argv[i].equals("-outputPath")) {
- outputPath = new Path(argv[++i]);
- } else if (argv[i].equals("-shards")) {
- shardsString = argv[++i];
- } else if (argv[i].equals("-indexPath")) {
- indexPath = argv[++i];
- } else if (argv[i].equals("-numShards")) {
- numShards = Integer.parseInt(argv[++i]);
- } else if (argv[i].equals("-numMapTasks")) {
- numMapTasks = Integer.parseInt(argv[++i]);
- } else if (argv[i].equals("-conf")) {
- // add as a local FS resource
- confPath = argv[++i];
- conf.addResource(new Path(confPath));
- } else {
- System.out.println("Unknown option " + argv[i] + " w/ value "
- + argv[++i]);
- }
- }
- LOG.info("inputPaths = " + inputPathsString);
- LOG.info("outputPath = " + outputPath);
- LOG.info("shards = " + shardsString);
- LOG.info("indexPath = " + indexPath);
- LOG.info("numShards = " + numShards);
- LOG.info("numMapTasks= " + numMapTasks);
- LOG.info("confPath = " + confPath);
-
- Path[] inputPaths = null;
- Shard[] shards = null;
-
- JobConf jobConf = new JobConf(conf);
- IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
-
- if (inputPathsString != null) {
- jobConf.set(org.apache.hadoop.mapreduce.lib.input.
- FileInputFormat.INPUT_DIR, inputPathsString);
- }
- inputPaths = FileInputFormat.getInputPaths(jobConf);
- if (inputPaths.length == 0) {
- inputPaths = null;
- }
-
- if (outputPath == null) {
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A distributed "index" is partitioned into "shards". Each shard corresponds
+ * to a Lucene instance. This class contains the main() method which uses a
+ * Map/Reduce job to analyze documents and update Lucene instances in parallel.
+ *
+ * The main() method in UpdateIndex requires the following information for
+ * updating the shards:
+ * - Input formatter. This specifies how to format the input documents.
+ * - Analysis. This defines the analyzer to use on the input. The analyzer
+ * determines whether a document is being inserted, updated, or deleted.
+ * For inserts or updates, the analyzer also converts each input document
+ * into a Lucene document.
+ * - Input paths. This provides the location(s) of updated documents,
+ * e.g., HDFS files or directories, or HBase tables.
+ * - Shard paths, or index path with the number of shards. Either specify
+ * the path for each shard, or specify an index path and the shards are
+ * the sub-directories of the index directory.
+ * - Output path. When the update to a shard is done, a message is put here.
+ * - Number of map tasks.
+ *
+ * All of the information can be specified in a configuration file. All but
+ * the first two can also be specified as command line options. Check out
+ * conf/index-config.xml.template for other configurable parameters.
+ *
+ * Note: Because of the parallel nature of Map/Reduce, the behaviour of
+ * multiple inserts, deletes or updates to the same document is undefined.
+ */
+public class UpdateIndex {
+ public static final Log LOG = LogFactory.getLog(UpdateIndex.class);
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ private static long now() {
+ return System.currentTimeMillis();
+ }
+
+ private static void printUsage(String cmd) {
+ System.err.println("Usage: java " + UpdateIndex.class.getName() + "\n"
+ + " -inputPaths <inputPath,inputPath>\n"
+ + " -outputPath <outputPath>\n"
+ + " -shards <shardDir,shardDir>\n"
+ + " -indexPath <indexPath>\n"
+ + " -numShards <num>\n"
+ + " -numMapTasks <num>\n"
+ + " -conf <confPath>\n"
+ + "Note: Do not use both -shards option and -indexPath option.");
+ }
+
+ private static String getIndexPath(Configuration conf) {
+ return conf.get("sea.index.path");
+ }
+
+ private static int getNumShards(Configuration conf) {
+ return conf.getInt("sea.num.shards", 1);
+ }
+
+ private static Shard[] createShards(String indexPath, int numShards,
+ Configuration conf) throws IOException {
+
+ String parent = Shard.normalizePath(indexPath) + Path.SEPARATOR;
+ long versionNumber = -1;
+ long generation = -1;
+
+ FileSystem fs = FileSystem.get(conf);
+ Path path = new Path(indexPath);
+
+ if (fs.exists(path)) {
+ FileStatus[] fileStatus = fs.listStatus(path);
+ String[] shardNames = new String[fileStatus.length];
+ int count = 0;
+ for (int i = 0; i < fileStatus.length; i++) {
+ if (fileStatus[i].isDirectory()) {
+ shardNames[count] = fileStatus[i].getPath().getName();
+ count++;
+ }
+ }
+ Arrays.sort(shardNames, 0, count);
+
+ Shard[] shards = new Shard[count >= numShards ? count : numShards];
+ for (int i = 0; i < count; i++) {
+ shards[i] =
+ new Shard(versionNumber, parent + shardNames[i], generation);
+ }
+
+ int number = count;
+ for (int i = count; i < numShards; i++) {
+ String shardPath;
+ while (true) {
+ shardPath = parent + NUMBER_FORMAT.format(number++);
+ if (!fs.exists(new Path(shardPath))) {
+ break;
+ }
+ }
+ shards[i] = new Shard(versionNumber, shardPath, generation);
+ }
+ return shards;
+ } else {
+ Shard[] shards = new Shard[numShards];
+ for (int i = 0; i < shards.length; i++) {
+ shards[i] =
+ new Shard(versionNumber, parent + NUMBER_FORMAT.format(i),
+ generation);
+ }
+ return shards;
+ }
+ }
+
+ /**
+ * The main() method
+ * @param argv
+ */
+ public static void main(String[] argv) {
+ if (argv.length == 0) {
+ printUsage("");
+ System.exit(-1);
+ }
+
+ String inputPathsString = null;
+ Path outputPath = null;
+ String shardsString = null;
+ String indexPath = null;
+ int numShards = -1;
+ int numMapTasks = -1;
+ Configuration conf = new Configuration();
+ String confPath = null;
+
+ // parse the command line
+ for (int i = 0; i < argv.length; i++) { // parse command line
+ if (argv[i].equals("-inputPaths")) {
+ inputPathsString = argv[++i];
+ } else if (argv[i].equals("-outputPath")) {
+ outputPath = new Path(argv[++i]);
+ } else if (argv[i].equals("-shards")) {
+ shardsString = argv[++i];
+ } else if (argv[i].equals("-indexPath")) {
+ indexPath = argv[++i];
+ } else if (argv[i].equals("-numShards")) {
+ numShards = Integer.parseInt(argv[++i]);
+ } else if (argv[i].equals("-numMapTasks")) {
+ numMapTasks = Integer.parseInt(argv[++i]);
+ } else if (argv[i].equals("-conf")) {
+ // add as a local FS resource
+ confPath = argv[++i];
+ conf.addResource(new Path(confPath));
+ } else {
+ System.out.println("Unknown option " + argv[i] + " w/ value "
+ + argv[++i]);
+ }
+ }
+ LOG.info("inputPaths = " + inputPathsString);
+ LOG.info("outputPath = " + outputPath);
+ LOG.info("shards = " + shardsString);
+ LOG.info("indexPath = " + indexPath);
+ LOG.info("numShards = " + numShards);
+ LOG.info("numMapTasks= " + numMapTasks);
+ LOG.info("confPath = " + confPath);
+
+ Path[] inputPaths = null;
+ Shard[] shards = null;
+
+ JobConf jobConf = new JobConf(conf);
+ IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
+
+ if (inputPathsString != null) {
+ jobConf.set(org.apache.hadoop.mapreduce.lib.input.
+ FileInputFormat.INPUT_DIR, inputPathsString);
+ }
+ inputPaths = FileInputFormat.getInputPaths(jobConf);
+ if (inputPaths.length == 0) {
+ inputPaths = null;
+ }
+
+ if (outputPath == null) {
outputPath = FileOutputFormat.getOutputPath(jobConf);
- }
-
- if (inputPaths == null || outputPath == null) {
- System.err.println("InputPaths and outputPath must be specified.");
- printUsage("");
- System.exit(-1);
- }
-
- if (shardsString != null) {
- iconf.setIndexShards(shardsString);
- }
- shards = Shard.getIndexShards(iconf);
- if (shards != null && shards.length == 0) {
- shards = null;
- }
-
- if (indexPath == null) {
- indexPath = getIndexPath(conf);
- }
- if (numShards <= 0) {
- numShards = getNumShards(conf);
- }
-
- if (shards == null && indexPath == null) {
- System.err.println("Either shards or indexPath must be specified.");
- printUsage("");
- System.exit(-1);
- }
-
- if (numMapTasks <= 0) {
- numMapTasks = jobConf.getNumMapTasks();
- }
-
- try {
- // create shards and set their directories if necessary
- if (shards == null) {
- shards = createShards(indexPath, numShards, conf);
- }
-
- long startTime = now();
- try {
- IIndexUpdater updater =
- (IIndexUpdater) ReflectionUtils.newInstance(
- iconf.getIndexUpdaterClass(), conf);
- LOG.info("sea.index.updater = "
- + iconf.getIndexUpdaterClass().getName());
-
- updater.run(conf, inputPaths, outputPath, numMapTasks, shards);
- LOG.info("Index update job is done");
-
- } finally {
- long elapsedTime = now() - startTime;
- LOG.info("Elapsed time is " + (elapsedTime / 1000) + "s");
- System.out.println("Elapsed time is " + (elapsedTime / 1000) + "s");
- }
- } catch (Exception e) {
- e.printStackTrace(System.err);
- }
- }
-}
+ }
+
+ if (inputPaths == null || outputPath == null) {
+ System.err.println("InputPaths and outputPath must be specified.");
+ printUsage("");
+ System.exit(-1);
+ }
+
+ if (shardsString != null) {
+ iconf.setIndexShards(shardsString);
+ }
+ shards = Shard.getIndexShards(iconf);
+ if (shards != null && shards.length == 0) {
+ shards = null;
+ }
+
+ if (indexPath == null) {
+ indexPath = getIndexPath(conf);
+ }
+ if (numShards <= 0) {
+ numShards = getNumShards(conf);
+ }
+
+ if (shards == null && indexPath == null) {
+ System.err.println("Either shards or indexPath must be specified.");
+ printUsage("");
+ System.exit(-1);
+ }
+
+ if (numMapTasks <= 0) {
+ numMapTasks = jobConf.getNumMapTasks();
+ }
+
+ try {
+ // create shards and set their directories if necessary
+ if (shards == null) {
+ shards = createShards(indexPath, numShards, conf);
+ }
+
+ long startTime = now();
+ try {
+ IIndexUpdater updater =
+ (IIndexUpdater) ReflectionUtils.newInstance(
+ iconf.getIndexUpdaterClass(), conf);
+ LOG.info("sea.index.updater = "
+ + iconf.getIndexUpdaterClass().getName());
+
+ updater.run(conf, inputPaths, outputPath, numMapTasks, shards);
+ LOG.info("Index update job is done");
+
+ } finally {
+ long elapsedTime = now() - startTime;
+ LOG.info("Elapsed time is " + (elapsedTime / 1000) + "s");
+ System.out.println("Elapsed time is " + (elapsedTime / 1000) + "s");
+ }
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+ }
+}
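
As a concrete illustration of the options parsed in main() above, an update job might be launched along these lines (the jar name and all paths are illustrative):

    hadoop jar hadoop-index.jar org.apache.hadoop.contrib.index.main.UpdateIndex \
        -inputPaths /data/docs \
        -outputPath /out/messages \
        -indexPath /myindex \
        -numShards 4 \
        -numMapTasks 8 \
        -conf conf/index-config.xml

Per the usage message, -shards and -indexPath are mutually exclusive. With -indexPath, createShards() reuses any existing shard sub-directories under the index path and creates additional ones until numShards is reached.
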
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java Tue Oct 16 00:02:55 2012
@@ -1,208 +1,208 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.Term;
-
-/**
- * This class represents an indexing operation. The operation can be an insert,
- * a delete or an update. If the operation is an insert or an update, a (new)
- * document must be specified. If the operation is a delete or an update, a
- * delete term must be specified.
- */
-public class DocumentAndOp implements Writable {
-
- /**
- * This class represents the type of an operation - an insert, a delete or
- * an update.
- */
- public static final class Op {
- public static final Op INSERT = new Op("INSERT");
- public static final Op DELETE = new Op("DELETE");
- public static final Op UPDATE = new Op("UPDATE");
-
- private String name;
-
- private Op(String name) {
- this.name = name;
- }
-
- public String toString() {
- return name;
- }
- }
-
- private Op op;
- private Document doc;
- private Term term;
-
- /**
- * Constructor for no operation.
- */
- public DocumentAndOp() {
- }
-
- /**
- * Constructor for an insert operation.
- * @param op
- * @param doc
- */
- public DocumentAndOp(Op op, Document doc) {
- assert (op == Op.INSERT);
- this.op = op;
- this.doc = doc;
- this.term = null;
- }
-
- /**
- * Constructor for a delete operation.
- * @param op
- * @param term
- */
- public DocumentAndOp(Op op, Term term) {
- assert (op == Op.DELETE);
- this.op = op;
- this.doc = null;
- this.term = term;
- }
-
- /**
- * Constructor for an insert, a delete or an update operation.
- * @param op
- * @param doc
- * @param term
- */
- public DocumentAndOp(Op op, Document doc, Term term) {
- if (op == Op.INSERT) {
- assert (doc != null);
- assert (term == null);
- } else if (op == Op.DELETE) {
- assert (doc == null);
- assert (term != null);
- } else {
- assert (op == Op.UPDATE);
- assert (doc != null);
- assert (term != null);
- }
- this.op = op;
- this.doc = doc;
- this.term = term;
- }
-
- /**
- * Set the instance to be an insert operation.
- * @param doc
- */
- public void setInsert(Document doc) {
- this.op = Op.INSERT;
- this.doc = doc;
- this.term = null;
- }
-
- /**
- * Set the instance to be a delete operation.
- * @param term
- */
- public void setDelete(Term term) {
- this.op = Op.DELETE;
- this.doc = null;
- this.term = term;
- }
-
- /**
- * Set the instance to be an update operation.
- * @param doc
- * @param term
- */
- public void setUpdate(Document doc, Term term) {
- this.op = Op.UPDATE;
- this.doc = doc;
- this.term = term;
- }
-
- /**
- * Get the type of operation.
- * @return the type of the operation.
- */
- public Op getOp() {
- return op;
- }
-
- /**
- * Get the document.
- * @return the document
- */
- public Document getDocument() {
- return doc;
- }
-
- /**
- * Get the term.
- * @return the term
- */
- public Term getTerm() {
- return term;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- StringBuilder buffer = new StringBuilder();
- buffer.append(this.getClass().getName());
- buffer.append("[op=");
- buffer.append(op);
- buffer.append(", doc=");
- if (doc != null) {
- buffer.append(doc);
- } else {
- buffer.append("null");
- }
- buffer.append(", term=");
- if (term != null) {
- buffer.append(term);
- } else {
- buffer.append("null");
- }
- buffer.append("]");
- return buffer.toString();
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".write should never be called");
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".readFields should never be called");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.Term;
+
+/**
+ * This class represents an indexing operation. The operation can be an insert,
+ * a delete or an update. If the operation is an insert or an update, a (new)
+ * document must be specified. If the operation is a delete or an update, a
+ * delete term must be specified.
+ */
+public class DocumentAndOp implements Writable {
+
+ /**
+ * This class represents the type of an operation - an insert, a delete or
+ * an update.
+ */
+ public static final class Op {
+ public static final Op INSERT = new Op("INSERT");
+ public static final Op DELETE = new Op("DELETE");
+ public static final Op UPDATE = new Op("UPDATE");
+
+ private String name;
+
+ private Op(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return name;
+ }
+ }
+
+ private Op op;
+ private Document doc;
+ private Term term;
+
+ /**
+ * Constructor for no operation.
+ */
+ public DocumentAndOp() {
+ }
+
+ /**
+ * Constructor for an insert operation.
+ * @param op
+ * @param doc
+ */
+ public DocumentAndOp(Op op, Document doc) {
+ assert (op == Op.INSERT);
+ this.op = op;
+ this.doc = doc;
+ this.term = null;
+ }
+
+ /**
+ * Constructor for a delete operation.
+ * @param op
+ * @param term
+ */
+ public DocumentAndOp(Op op, Term term) {
+ assert (op == Op.DELETE);
+ this.op = op;
+ this.doc = null;
+ this.term = term;
+ }
+
+ /**
+ * Constructor for an insert, a delete or an update operation.
+ * @param op
+ * @param doc
+ * @param term
+ */
+ public DocumentAndOp(Op op, Document doc, Term term) {
+ if (op == Op.INSERT) {
+ assert (doc != null);
+ assert (term == null);
+ } else if (op == Op.DELETE) {
+ assert (doc == null);
+ assert (term != null);
+ } else {
+ assert (op == Op.UPDATE);
+ assert (doc != null);
+ assert (term != null);
+ }
+ this.op = op;
+ this.doc = doc;
+ this.term = term;
+ }
+
+ /**
+ * Set the instance to be an insert operation.
+ * @param doc
+ */
+ public void setInsert(Document doc) {
+ this.op = Op.INSERT;
+ this.doc = doc;
+ this.term = null;
+ }
+
+ /**
+ * Set the instance to be a delete operation.
+ * @param term
+ */
+ public void setDelete(Term term) {
+ this.op = Op.DELETE;
+ this.doc = null;
+ this.term = term;
+ }
+
+ /**
+ * Set the instance to be an update operation.
+ * @param doc
+ * @param term
+ */
+ public void setUpdate(Document doc, Term term) {
+ this.op = Op.UPDATE;
+ this.doc = doc;
+ this.term = term;
+ }
+
+ /**
+ * Get the type of operation.
+ * @return the type of the operation.
+ */
+ public Op getOp() {
+ return op;
+ }
+
+ /**
+ * Get the document.
+ * @return the document
+ */
+ public Document getDocument() {
+ return doc;
+ }
+
+ /**
+ * Get the term.
+ * @return the term
+ */
+ public Term getTerm() {
+ return term;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(this.getClass().getName());
+ buffer.append("[op=");
+ buffer.append(op);
+ buffer.append(", doc=");
+ if (doc != null) {
+ buffer.append(doc);
+ } else {
+ buffer.append("null");
+ }
+ buffer.append(", term=");
+ if (term != null) {
+ buffer.append(term);
+ } else {
+ buffer.append("null");
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".write should never be called");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".readFields should never be called");
+ }
+}
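
A minimal sketch of how the three operation types above might be produced, for example inside a local analysis implementation; the field name and id value are illustrative assumptions.

    Document doc = new Document();            // new or replacement Lucene document
    Term idTerm = new Term("id", "doc-42");   // delete term identifying the old document

    DocumentAndOp insert = new DocumentAndOp(DocumentAndOp.Op.INSERT, doc);
    DocumentAndOp delete = new DocumentAndOp(DocumentAndOp.Op.DELETE, idTerm);
    DocumentAndOp update = new DocumentAndOp(DocumentAndOp.Op.UPDATE, doc, idTerm);

    // or reuse one instance via the setters
    DocumentAndOp op = new DocumentAndOp();
    op.setUpdate(doc, idTerm);
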
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java Tue Oct 16 00:02:55 2012
@@ -1,89 +1,89 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * The class represents a document id, which is of type text.
- */
-public class DocumentID implements WritableComparable {
- private final Text docID;
-
- /**
- * Constructor.
- */
- public DocumentID() {
- docID = new Text();
- }
-
- /**
- * The text of the document id.
- * @return the text
- */
- public Text getText() {
- return docID;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Comparable#compareTo(java.lang.Object)
- */
- public int compareTo(Object obj) {
- if (this == obj) {
- return 0;
- } else {
- return docID.compareTo(((DocumentID) obj).docID);
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- public int hashCode() {
- return docID.hashCode();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- return this.getClass().getName() + "[" + docID + "]";
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".write should never be called");
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".readFields should never be called");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The class represents a document id, which is of type text.
+ */
+public class DocumentID implements WritableComparable {
+ private final Text docID;
+
+ /**
+ * Constructor.
+ */
+ public DocumentID() {
+ docID = new Text();
+ }
+
+ /**
+ * The text of the document id.
+ * @return the text
+ */
+ public Text getText() {
+ return docID;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(Object obj) {
+ if (this == obj) {
+ return 0;
+ } else {
+ return docID.compareTo(((DocumentID) obj).docID);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return docID.hashCode();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return this.getClass().getName() + "[" + docID + "]";
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".write should never be called");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".readFields should never be called");
+ }
+}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java Tue Oct 16 00:02:55 2012
@@ -1,50 +1,50 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-/**
- * A distribution policy decides, given a document with a document id, which
- * one shard the request should be sent to if the request is an insert, and
- * which shard(s) the request should be sent to if the request is a delete.
- */
-public interface IDistributionPolicy {
-
- /**
- * Initialization. It must be called before any chooseShard() is called.
- * @param shards
- */
- void init(Shard[] shards);
-
- /**
- * Choose a shard to send an insert request.
- * @param key
- * @return the index of the chosen shard
- */
- int chooseShardForInsert(DocumentID key);
-
- /**
- * Choose a shard or all shards to send a delete request. E.g. a round-robin
- * distribution policy would send a delete request to all the shards.
- * -1 represents all the shards.
- * @param key
- * @return the index of the chosen shard, -1 if all the shards are chosen
- */
- int chooseShardForDelete(DocumentID key);
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+/**
+ * A distribution policy decides, given a document with a document id, which
+ * one shard the request should be sent to if the request is an insert, and
+ * which shard(s) the request should be sent to if the request is a delete.
+ */
+public interface IDistributionPolicy {
+
+ /**
+ * Initialization. It must be called before any chooseShard() is called.
+ * @param shards
+ */
+ void init(Shard[] shards);
+
+ /**
+ * Choose a shard to send an insert request.
+ * @param key
+ * @return the index of the chosen shard
+ */
+ int chooseShardForInsert(DocumentID key);
+
+ /**
+ * Choose a shard or all shards to send a delete request. E.g. a round-robin
+ * distribution policy would send a delete request to all the shards.
+ * -1 represents all the shards.
+ * @param key
+ * @return the index of the chosen shard, -1 if all the shards are chosen
+ */
+ int chooseShardForDelete(DocumentID key);
+
+}
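
A minimal sketch of one way to implement the interface above; the hash-based behaviour and the class name are assumptions for illustration, not an implementation shipped in this diff.

    public class SimpleHashPolicy implements IDistributionPolicy {
      private int numShards;

      public void init(Shard[] shards) {
        this.numShards = shards.length;
      }

      public int chooseShardForInsert(DocumentID key) {
        return (key.getText().hashCode() & Integer.MAX_VALUE) % numShards;
      }

      public int chooseShardForDelete(DocumentID key) {
        // send the delete to the same shard the insert hashed to; a round-robin
        // style policy would instead return -1, meaning all shards
        return (key.getText().hashCode() & Integer.MAX_VALUE) % numShards;
      }
    }
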
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java Tue Oct 16 00:02:55 2012
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A class that implements the index updater interface should create a Map/Reduce job
- * configuration and run the Map/Reduce job to analyze documents and update
- * Lucene instances in parallel.
- */
-public interface IIndexUpdater {
-
- /**
- * Create a Map/Reduce job configuration and run the Map/Reduce job to
- * analyze documents and update Lucene instances in parallel.
- * @param conf
- * @param inputPaths
- * @param outputPath
- * @param numMapTasks
- * @param shards
- * @throws IOException
- */
- void run(Configuration conf, Path[] inputPaths, Path outputPath,
- int numMapTasks, Shard[] shards) throws IOException;
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A class that implements the index updater interface should create a
+ * Map/Reduce job configuration and run the Map/Reduce job to analyze
+ * documents and update Lucene instances in parallel.
+ */
+public interface IIndexUpdater {
+
+ /**
+ * Create a Map/Reduce job configuration and run the Map/Reduce job to
+ * analyze documents and update Lucene instances in parallel.
+ * @param conf the job configuration
+ * @param inputPaths the paths of the input documents
+ * @param outputPath the path of the job output
+ * @param numMapTasks the number of map tasks
+ * @param shards the index shards to update
+ * @throws IOException
+ */
+ void run(Configuration conf, Path[] inputPaths, Path outputPath,
+ int numMapTasks, Shard[] shards) throws IOException;
+
+}
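Purely for illustration (not part of this commit), a driver typically hands a concrete updater the configuration, the input and output paths, a map-task count, and the target shards. The sketch below assumes the in-tree IndexUpdater implementation, an IndexUpdateConfiguration constructor that accepts a Configuration, and a Shard.getIndexShards(IndexUpdateConfiguration) helper for reading the shard list; the package name, paths, and task count are placeholders.

package org.apache.hadoop.contrib.index.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.mapred.IIndexUpdater;
import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
import org.apache.hadoop.contrib.index.mapred.IndexUpdater;
import org.apache.hadoop.contrib.index.mapred.Shard;
import org.apache.hadoop.fs.Path;

public class UpdateDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);

    // Placeholder paths; the shard list is assumed to come from the
    // configuration via the getIndexShards helper.
    Path[] inputPaths = { new Path("/user/hadoop/docs") };
    Path outputPath = new Path("/user/hadoop/index-updates");
    Shard[] shards = Shard.getIndexShards(iconf);

    IIndexUpdater updater = new IndexUpdater();
    updater.run(conf, inputPaths, outputPath, 4, shards);
  }
}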
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java Tue Oct 16 00:02:55 2012
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Mapper;
-
-/**
- * Application specific local analysis. The output type must be (DocumentID,
- * DocumentAndOp).
- */
-public interface ILocalAnalysis<K extends WritableComparable, V extends Writable>
- extends Mapper<K, V, DocumentID, DocumentAndOp> {
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * Application-specific local analysis. The output key/value types must be
+ * (DocumentID, DocumentAndOp).
+ */
+public interface ILocalAnalysis<K extends WritableComparable, V extends Writable>
+ extends Mapper<K, V, DocumentID, DocumentAndOp> {
+
+}
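As a sketch of how the interface above is meant to be plugged in (not part of this commit), the mapper below turns each input line into one inserted Lucene document keyed by the line's byte offset. It assumes DocumentID exposes a mutable Text via getText(), that DocumentAndOp provides setInsert(Document), and Lucene 2.x-era Field constants; check the actual classes before relying on these names.

package org.apache.hadoop.contrib.index.example;

import java.io.IOException;

import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
import org.apache.hadoop.contrib.index.mapred.DocumentID;
import org.apache.hadoop.contrib.index.mapred.ILocalAnalysis;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;

/**
 * Illustrative local analysis: each input line becomes one inserted document.
 */
public class LineInsertAnalysis implements ILocalAnalysis<LongWritable, Text> {

  public void map(LongWritable offset, Text line,
      OutputCollector<DocumentID, DocumentAndOp> output, Reporter reporter)
      throws IOException {
    // Assumed accessor: DocumentID wraps a mutable Text id.
    DocumentID id = new DocumentID();
    id.getText().set(Long.toString(offset.get()));

    Document doc = new Document();
    doc.add(new Field("id", Long.toString(offset.get()),
        Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("content", line.toString(),
        Field.Store.NO, Field.Index.TOKENIZED));

    // Assumed setter: mark this form as an insert of the document above.
    DocumentAndOp op = new DocumentAndOp();
    op.setInsert(doc);
    output.collect(id, op);
  }

  public void configure(JobConf job) {
  }

  public void close() throws IOException {
  }
}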
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java Tue Oct 16 00:02:55 2012
@@ -1,111 +1,111 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This combiner combines multiple intermediate forms into one intermediate
- * form. More specifically, the input intermediate forms are a single-document
- * ram index and/or a single delete term. An output intermediate form contains
- * a multi-document ram index and/or multiple delete terms.
- */
-public class IndexUpdateCombiner extends MapReduceBase implements
- Reducer<Shard, IntermediateForm, Shard, IntermediateForm> {
- static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class);
-
- IndexUpdateConfiguration iconf;
- long maxSizeInBytes;
- long nearMaxSizeInBytes;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
- */
- public void reduce(Shard key, Iterator<IntermediateForm> values,
- OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
- throws IOException {
-
- String message = key.toString();
- IntermediateForm form = null;
-
- while (values.hasNext()) {
- IntermediateForm singleDocForm = values.next();
- long formSize = form == null ? 0 : form.totalSizeInBytes();
- long singleDocFormSize = singleDocForm.totalSizeInBytes();
-
- if (form != null && formSize + singleDocFormSize > maxSizeInBytes) {
- closeForm(form, message);
- output.collect(key, form);
- form = null;
- }
-
- if (form == null && singleDocFormSize >= nearMaxSizeInBytes) {
- output.collect(key, singleDocForm);
- } else {
- if (form == null) {
- form = createForm(message);
- }
- form.process(singleDocForm);
- }
- }
-
- if (form != null) {
- closeForm(form, message);
- output.collect(key, form);
- }
- }
-
- private IntermediateForm createForm(String message) throws IOException {
- LOG.info("Construct a form writer for " + message);
- IntermediateForm form = new IntermediateForm();
- form.configure(iconf);
- return form;
- }
-
- private void closeForm(IntermediateForm form, String message)
- throws IOException {
- form.closeWriter();
- LOG.info("Closed the form writer for " + message + ", form = " + form);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- maxSizeInBytes = iconf.getMaxRAMSizeInBytes();
- nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This combiner combines multiple intermediate forms into one intermediate
+ * form. More specifically, each input intermediate form contains a
+ * single-document RAM index and/or a single delete term, while an output
+ * intermediate form contains a multi-document RAM index and/or multiple delete terms.
+ */
+public class IndexUpdateCombiner extends MapReduceBase implements
+ Reducer<Shard, IntermediateForm, Shard, IntermediateForm> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class);
+
+ IndexUpdateConfiguration iconf;
+ long maxSizeInBytes;
+ long nearMaxSizeInBytes;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ public void reduce(Shard key, Iterator<IntermediateForm> values,
+ OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
+ throws IOException {
+
+ String message = key.toString();
+ IntermediateForm form = null;
+
+ while (values.hasNext()) {
+ IntermediateForm singleDocForm = values.next();
+ long formSize = form == null ? 0 : form.totalSizeInBytes();
+ long singleDocFormSize = singleDocForm.totalSizeInBytes();
+
+ if (form != null && formSize + singleDocFormSize > maxSizeInBytes) {
+ closeForm(form, message);
+ output.collect(key, form);
+ form = null;
+ }
+
+ if (form == null && singleDocFormSize >= nearMaxSizeInBytes) {
+ output.collect(key, singleDocForm);
+ } else {
+ if (form == null) {
+ form = createForm(message);
+ }
+ form.process(singleDocForm);
+ }
+ }
+
+ if (form != null) {
+ closeForm(form, message);
+ output.collect(key, form);
+ }
+ }
+
+ private IntermediateForm createForm(String message) throws IOException {
+ LOG.info("Construct a form writer for " + message);
+ IntermediateForm form = new IntermediateForm();
+ form.configure(iconf);
+ return form;
+ }
+
+ private void closeForm(IntermediateForm form, String message)
+ throws IOException {
+ form.closeWriter();
+ LOG.info("Closed the form writer for " + message + ", form = " + form);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ maxSizeInBytes = iconf.getMaxRAMSizeInBytes();
+ nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ }
+
+}
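A note on the flush thresholds in the combiner above: maxSizeInBytes >>> 3 is one eighth of the RAM cap, so nearMaxSizeInBytes works out to 7/8 of it. With a 32 MB cap, for example, nearMaxSizeInBytes is 28 MB: a 30 MB single-document form is collected on its own rather than merged into an accumulating form, while smaller forms are folded into the current form until adding the next one would push it past 32 MB, at which point the accumulated form is closed and collected.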