Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java Fri Oct 19 02:25:55 2012
@@ -1,208 +1,208 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.Term;
-
-/**
- * This class represents an indexing operation. The operation can be an insert,
- * a delete or an update. If the operation is an insert or an update, a (new)
- * document must be specified. If the operation is a delete or an update, a
- * delete term must be specified.
- */
-public class DocumentAndOp implements Writable {
-
- /**
- * This class represents the type of an operation - an insert, a delete or
- * an update.
- */
- public static final class Op {
- public static final Op INSERT = new Op("INSERT");
- public static final Op DELETE = new Op("DELETE");
- public static final Op UPDATE = new Op("UPDATE");
-
- private String name;
-
- private Op(String name) {
- this.name = name;
- }
-
- public String toString() {
- return name;
- }
- }
-
- private Op op;
- private Document doc;
- private Term term;
-
- /**
- * Constructor for no operation.
- */
- public DocumentAndOp() {
- }
-
- /**
- * Constructor for an insert operation.
- * @param op the operation; must be Op.INSERT
- * @param doc the document to insert
- */
- public DocumentAndOp(Op op, Document doc) {
- assert (op == Op.INSERT);
- this.op = op;
- this.doc = doc;
- this.term = null;
- }
-
- /**
- * Constructor for a delete operation.
- * @param op the operation; must be Op.DELETE
- * @param term the delete term
- */
- public DocumentAndOp(Op op, Term term) {
- assert (op == Op.DELETE);
- this.op = op;
- this.doc = null;
- this.term = term;
- }
-
- /**
- * Constructor for an insert, a delete or an update operation.
- * @param op the type of the operation
- * @param doc the document; null for a delete
- * @param term the delete term; null for an insert
- */
- public DocumentAndOp(Op op, Document doc, Term term) {
- if (op == Op.INSERT) {
- assert (doc != null);
- assert (term == null);
- } else if (op == Op.DELETE) {
- assert (doc == null);
- assert (term != null);
- } else {
- assert (op == Op.UPDATE);
- assert (doc != null);
- assert (term != null);
- }
- this.op = op;
- this.doc = doc;
- this.term = term;
- }
-
- /**
- * Set the instance to be an insert operation.
- * @param doc the document to insert
- */
- public void setInsert(Document doc) {
- this.op = Op.INSERT;
- this.doc = doc;
- this.term = null;
- }
-
- /**
- * Set the instance to be a delete operation.
- * @param term the delete term
- */
- public void setDelete(Term term) {
- this.op = Op.DELETE;
- this.doc = null;
- this.term = term;
- }
-
- /**
- * Set the instance to be an update operation.
- * @param doc the new document
- * @param term the delete term
- */
- public void setUpdate(Document doc, Term term) {
- this.op = Op.UPDATE;
- this.doc = doc;
- this.term = term;
- }
-
- /**
- * Get the type of operation.
- * @return the type of the operation.
- */
- public Op getOp() {
- return op;
- }
-
- /**
- * Get the document.
- * @return the document
- */
- public Document getDocument() {
- return doc;
- }
-
- /**
- * Get the term.
- * @return the term
- */
- public Term getTerm() {
- return term;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- StringBuilder buffer = new StringBuilder();
- buffer.append(this.getClass().getName());
- buffer.append("[op=");
- buffer.append(op);
- buffer.append(", doc=");
- if (doc != null) {
- buffer.append(doc);
- } else {
- buffer.append("null");
- }
- buffer.append(", term=");
- if (term != null) {
- buffer.append(term);
- } else {
- buffer.append("null");
- }
- buffer.append("]");
- return buffer.toString();
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".write should never be called");
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".readFields should never be called");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.Term;
+
+/**
+ * This class represents an indexing operation. The operation can be an insert,
+ * a delete or an update. If the operation is an insert or an update, a (new)
+ * document must be specified. If the operation is a delete or an update, a
+ * delete term must be specified.
+ */
+public class DocumentAndOp implements Writable {
+
+ /**
+ * This class represents the type of an operation - an insert, a delete or
+ * an update.
+ */
+ public static final class Op {
+ public static final Op INSERT = new Op("INSERT");
+ public static final Op DELETE = new Op("DELETE");
+ public static final Op UPDATE = new Op("UPDATE");
+
+ private String name;
+
+ private Op(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return name;
+ }
+ }
+
+ private Op op;
+ private Document doc;
+ private Term term;
+
+ /**
+ * Constructor for no operation.
+ */
+ public DocumentAndOp() {
+ }
+
+ /**
+ * Constructor for an insert operation.
+ * @param op the operation; must be Op.INSERT
+ * @param doc the document to insert
+ */
+ public DocumentAndOp(Op op, Document doc) {
+ assert (op == Op.INSERT);
+ this.op = op;
+ this.doc = doc;
+ this.term = null;
+ }
+
+ /**
+ * Constructor for a delete operation.
+ * @param op the operation; must be Op.DELETE
+ * @param term the delete term
+ */
+ public DocumentAndOp(Op op, Term term) {
+ assert (op == Op.DELETE);
+ this.op = op;
+ this.doc = null;
+ this.term = term;
+ }
+
+ /**
+ * Constructor for an insert, a delete or an update operation.
+ * @param op the type of the operation
+ * @param doc the document; null for a delete
+ * @param term the delete term; null for an insert
+ */
+ public DocumentAndOp(Op op, Document doc, Term term) {
+ if (op == Op.INSERT) {
+ assert (doc != null);
+ assert (term == null);
+ } else if (op == Op.DELETE) {
+ assert (doc == null);
+ assert (term != null);
+ } else {
+ assert (op == Op.UPDATE);
+ assert (doc != null);
+ assert (term != null);
+ }
+ this.op = op;
+ this.doc = doc;
+ this.term = term;
+ }
+
+ /**
+ * Set the instance to be an insert operation.
+ * @param doc the document to insert
+ */
+ public void setInsert(Document doc) {
+ this.op = Op.INSERT;
+ this.doc = doc;
+ this.term = null;
+ }
+
+ /**
+ * Set the instance to be a delete operation.
+ * @param term the delete term
+ */
+ public void setDelete(Term term) {
+ this.op = Op.DELETE;
+ this.doc = null;
+ this.term = term;
+ }
+
+ /**
+ * Set the instance to be an update operation.
+ * @param doc the new document
+ * @param term the delete term
+ */
+ public void setUpdate(Document doc, Term term) {
+ this.op = Op.UPDATE;
+ this.doc = doc;
+ this.term = term;
+ }
+
+ /**
+ * Get the type of operation.
+ * @return the type of the operation.
+ */
+ public Op getOp() {
+ return op;
+ }
+
+ /**
+ * Get the document.
+ * @return the document
+ */
+ public Document getDocument() {
+ return doc;
+ }
+
+ /**
+ * Get the term.
+ * @return the term
+ */
+ public Term getTerm() {
+ return term;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(this.getClass().getName());
+ buffer.append("[op=");
+ buffer.append(op);
+ buffer.append(", doc=");
+ if (doc != null) {
+ buffer.append(doc);
+ } else {
+ buffer.append("null");
+ }
+ buffer.append(", term=");
+ if (term != null) {
+ buffer.append(term);
+ } else {
+ buffer.append("null");
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".write should never be called");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".readFields should never be called");
+ }
+}
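
An update, then, always carries both pieces: the (new) document to add and the term identifying what to delete. A minimal sketch of constructing each kind of operation follows (the field names "id" and "content" are invented here, and the Field construction assumes the Lucene 2.x API this contrib module was written against):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.Term;

public class DocumentAndOpExample {
  public static void main(String[] args) {
    // a document with an untokenized id field and a tokenized body
    Document doc = new Document();
    doc.add(new Field("id", "doc42", Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("content", "hello world", Field.Store.NO, Field.Index.TOKENIZED));
    Term idTerm = new Term("id", "doc42");

    DocumentAndOp insert = new DocumentAndOp(DocumentAndOp.Op.INSERT, doc);
    DocumentAndOp delete = new DocumentAndOp(DocumentAndOp.Op.DELETE, idTerm);
    DocumentAndOp update = new DocumentAndOp(DocumentAndOp.Op.UPDATE, doc, idTerm);
  }
}
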
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java Fri Oct 19 02:25:55 2012
@@ -1,89 +1,89 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * This class represents a document id, which is of type Text.
- */
-public class DocumentID implements WritableComparable {
- private final Text docID;
-
- /**
- * Constructor.
- */
- public DocumentID() {
- docID = new Text();
- }
-
- /**
- * The text of the document id.
- * @return the text
- */
- public Text getText() {
- return docID;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Comparable#compareTo(java.lang.Object)
- */
- public int compareTo(Object obj) {
- if (this == obj) {
- return 0;
- } else {
- return docID.compareTo(((DocumentID) obj).docID);
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- public int hashCode() {
- return docID.hashCode();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- return this.getClass().getName() + "[" + docID + "]";
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".write should never be called");
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- throw new IOException(this.getClass().getName()
- + ".readFields should never be called");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class represents a document id, which is of type Text.
+ */
+public class DocumentID implements WritableComparable {
+ private final Text docID;
+
+ /**
+ * Constructor.
+ */
+ public DocumentID() {
+ docID = new Text();
+ }
+
+ /**
+ * The text of the document id.
+ * @return the text
+ */
+ public Text getText() {
+ return docID;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(Object obj) {
+ if (this == obj) {
+ return 0;
+ } else {
+ return docID.compareTo(((DocumentID) obj).docID);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return docID.hashCode();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return this.getClass().getName() + "[" + docID + "]";
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".write should never be called");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ throw new IOException(this.getClass().getName()
+ + ".readFields should never be called");
+ }
+}
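
DocumentID has no setter; callers populate the mutable Text returned by getText(). A short usage sketch (Text.set is the standard Hadoop API):

DocumentID id = new DocumentID();
id.getText().set("doc42");  // replaces the Text contents in place
// id.toString() -> "org.apache.hadoop.contrib.index.mapred.DocumentID[doc42]"
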
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java Fri Oct 19 02:25:55 2012
@@ -1,50 +1,50 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-/**
- * A distribution policy decides, given a document with a document id, which
- * shard an insert request should be sent to, and which shard(s) a delete
- * request should be sent to.
- */
-public interface IDistributionPolicy {
-
- /**
- * Initialization. It must be called before any chooseShard() is called.
- * @param shards all the index shards
- */
- void init(Shard[] shards);
-
- /**
- * Choose a shard to send an insert request.
- * @param key the id of the document to insert
- * @return the index of the chosen shard
- */
- int chooseShardForInsert(DocumentID key);
-
- /**
- * Choose a shard or all shards to send a delete request. E.g. a round-robin
- * distribution policy would send a delete request to all the shards.
- * -1 represents all the shards.
- * @param key the id of the document to delete
- * @return the index of the chosen shard, -1 if all the shards are chosen
- */
- int chooseShardForDelete(DocumentID key);
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+/**
+ * A distribution policy decides, given a document with a document id, which
+ * shard an insert request should be sent to, and which shard(s) a delete
+ * request should be sent to.
+ */
+public interface IDistributionPolicy {
+
+ /**
+ * Initialization. It must be called before any chooseShard() is called.
+ * @param shards all the index shards
+ */
+ void init(Shard[] shards);
+
+ /**
+ * Choose a shard to send an insert request.
+ * @param key the id of the document to insert
+ * @return the index of the chosen shard
+ */
+ int chooseShardForInsert(DocumentID key);
+
+ /**
+ * Choose a shard or all shards to send a delete request. E.g. a round-robin
+ * distribution policy would send a delete request to all the shards.
+ * -1 represents all the shards.
+ * @param key the id of the document to delete
+ * @return the index of the chosen shard, -1 if all the shards are chosen
+ */
+ int chooseShardForDelete(DocumentID key);
+
+}
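
A minimal sketch of an implementation, assuming a simple hash-modulo scheme (illustrative only; the module's real default is HashingDistributionPolicy):

public class ModuloDistributionPolicy implements IDistributionPolicy {
  private int numShards;

  public void init(Shard[] shards) {
    numShards = shards.length;
  }

  public int chooseShardForInsert(DocumentID key) {
    // mask off the sign bit so the shard index is always >= 0
    return (key.getText().hashCode() & Integer.MAX_VALUE) % numShards;
  }

  public int chooseShardForDelete(DocumentID key) {
    // a hash-based policy can route a delete to the one shard that holds the
    // document; returning -1 instead would broadcast it to all shards
    return chooseShardForInsert(key);
  }
}
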
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java Fri Oct 19 02:25:55 2012
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A class implements an index updater interface should create a Map/Reduce job
- * configuration and run the Map/Reduce job to analyze documents and update
- * Lucene instances in parallel.
- */
-public interface IIndexUpdater {
-
- /**
- * Create a Map/Reduce job configuration and run the Map/Reduce job to
- * analyze documents and update Lucene instances in parallel.
- * @param conf the job configuration
- * @param inputPaths the paths of the input documents
- * @param outputPath the output path of the Map/Reduce job
- * @param numMapTasks the number of map tasks
- * @param shards the index shards to update
- * @throws IOException if the Map/Reduce job fails
- */
- void run(Configuration conf, Path[] inputPaths, Path outputPath,
- int numMapTasks, Shard[] shards) throws IOException;
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A class implementing the index updater interface should create a Map/Reduce
+ * job configuration and run the Map/Reduce job to analyze documents and
+ * update Lucene instances in parallel.
+ */
+public interface IIndexUpdater {
+
+ /**
+ * Create a Map/Reduce job configuration and run the Map/Reduce job to
+ * analyze documents and update Lucene instances in parallel.
+ * @param conf the job configuration
+ * @param inputPaths the paths of the input documents
+ * @param outputPath the output path of the Map/Reduce job
+ * @param numMapTasks the number of map tasks
+ * @param shards the index shards to update
+ * @throws IOException if the Map/Reduce job fails
+ */
+ void run(Configuration conf, Path[] inputPaths, Path outputPath,
+ int numMapTasks, Shard[] shards) throws IOException;
+
+}
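
A hedged driver sketch of how a caller might obtain and run the configured updater; the paths and map-task count are placeholders, and Shard.getIndexShards and the ReflectionUtils cast follow the same pattern IndexUpdateMapper uses below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;

public class IndexUpdateDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
    // instantiate whichever updater class is configured (IndexUpdater by default)
    IIndexUpdater updater = (IIndexUpdater) ReflectionUtils.newInstance(
        iconf.getIndexUpdaterClass(), conf);
    Shard[] shards = Shard.getIndexShards(iconf);  // parses sea.index.shards
    updater.run(conf, new Path[] { new Path("/input/docs") },
        new Path("/output/index"), 4, shards);
  }
}
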
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java Fri Oct 19 02:25:55 2012
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Mapper;
-
-/**
- * Application-specific local analysis. The output type must be (DocumentID,
- * DocumentAndOp).
- */
-public interface ILocalAnalysis<K extends WritableComparable, V extends Writable>
- extends Mapper<K, V, DocumentID, DocumentAndOp> {
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * Application-specific local analysis. The output type must be (DocumentID,
+ * DocumentAndOp).
+ */
+public interface ILocalAnalysis<K extends WritableComparable, V extends Writable>
+ extends Mapper<K, V, DocumentID, DocumentAndOp> {
+
+}
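
A minimal sketch of an ILocalAnalysis implementation, assuming (LongWritable offset, Text line) input where each line is "<id>\t<content>"; the field names are invented here, and the module's shipped example is LineDocLocalAnalysis:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;

public class TabSeparatedLocalAnalysis
    implements ILocalAnalysis<LongWritable, Text> {

  public void configure(JobConf job) {
    // no per-job state needed in this sketch
  }

  public void map(LongWritable key, Text value,
      OutputCollector<DocumentID, DocumentAndOp> output, Reporter reporter)
      throws IOException {
    String[] parts = value.toString().split("\t", 2);
    if (parts.length < 2) {
      return;  // skip malformed lines
    }
    DocumentID docID = new DocumentID();
    docID.getText().set(parts[0]);

    Document doc = new Document();
    doc.add(new Field("id", parts[0], Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("content", parts[1], Field.Store.NO, Field.Index.TOKENIZED));

    output.collect(docID, new DocumentAndOp(DocumentAndOp.Op.INSERT, doc));
  }

  public void close() throws IOException {
    // nothing to release
  }
}
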
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java Fri Oct 19 02:25:55 2012
@@ -1,111 +1,111 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This combiner combines multiple intermediate forms into one intermediate
- * form. More specifically, each input intermediate form contains a single-document
- * RAM index and/or a single delete term. An output intermediate form contains
- * a multi-document RAM index and/or multiple delete terms.
- */
-public class IndexUpdateCombiner extends MapReduceBase implements
- Reducer<Shard, IntermediateForm, Shard, IntermediateForm> {
- static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class);
-
- IndexUpdateConfiguration iconf;
- long maxSizeInBytes;
- long nearMaxSizeInBytes;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
- */
- public void reduce(Shard key, Iterator<IntermediateForm> values,
- OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
- throws IOException {
-
- String message = key.toString();
- IntermediateForm form = null;
-
- while (values.hasNext()) {
- IntermediateForm singleDocForm = values.next();
- long formSize = form == null ? 0 : form.totalSizeInBytes();
- long singleDocFormSize = singleDocForm.totalSizeInBytes();
-
- if (form != null && formSize + singleDocFormSize > maxSizeInBytes) {
- closeForm(form, message);
- output.collect(key, form);
- form = null;
- }
-
- if (form == null && singleDocFormSize >= nearMaxSizeInBytes) {
- output.collect(key, singleDocForm);
- } else {
- if (form == null) {
- form = createForm(message);
- }
- form.process(singleDocForm);
- }
- }
-
- if (form != null) {
- closeForm(form, message);
- output.collect(key, form);
- }
- }
-
- private IntermediateForm createForm(String message) throws IOException {
- LOG.info("Construct a form writer for " + message);
- IntermediateForm form = new IntermediateForm();
- form.configure(iconf);
- return form;
- }
-
- private void closeForm(IntermediateForm form, String message)
- throws IOException {
- form.closeWriter();
- LOG.info("Closed the form writer for " + message + ", form = " + form);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- maxSizeInBytes = iconf.getMaxRAMSizeInBytes();
- nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This combiner combines multiple intermediate forms into one intermediate
+ * form. More specifically, each input intermediate form contains a single-document
+ * RAM index and/or a single delete term. An output intermediate form contains
+ * a multi-document RAM index and/or multiple delete terms.
+ */
+public class IndexUpdateCombiner extends MapReduceBase implements
+ Reducer<Shard, IntermediateForm, Shard, IntermediateForm> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class);
+
+ IndexUpdateConfiguration iconf;
+ long maxSizeInBytes;
+ long nearMaxSizeInBytes;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ public void reduce(Shard key, Iterator<IntermediateForm> values,
+ OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
+ throws IOException {
+
+ String message = key.toString();
+ IntermediateForm form = null;
+
+ while (values.hasNext()) {
+ IntermediateForm singleDocForm = values.next();
+ long formSize = form == null ? 0 : form.totalSizeInBytes();
+ long singleDocFormSize = singleDocForm.totalSizeInBytes();
+
+ if (form != null && formSize + singleDocFormSize > maxSizeInBytes) {
+ closeForm(form, message);
+ output.collect(key, form);
+ form = null;
+ }
+
+ if (form == null && singleDocFormSize >= nearMaxSizeInBytes) {
+ output.collect(key, singleDocForm);
+ } else {
+ if (form == null) {
+ form = createForm(message);
+ }
+ form.process(singleDocForm);
+ }
+ }
+
+ if (form != null) {
+ closeForm(form, message);
+ output.collect(key, form);
+ }
+ }
+
+ private IntermediateForm createForm(String message) throws IOException {
+ LOG.info("Construct a form writer for " + message);
+ IntermediateForm form = new IntermediateForm();
+ form.configure(iconf);
+ return form;
+ }
+
+ private void closeForm(IntermediateForm form, String message)
+ throws IOException {
+ form.closeWriter();
+ LOG.info("Closed the form writer for " + message + ", form = " + form);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ maxSizeInBytes = iconf.getMaxRAMSizeInBytes();
+ nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ }
+
+}
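
The reduce loop flushes the growing form whenever adding the next single-document form would exceed the configured cap, and passes an unusually large single-document form straight through once it alone reaches the "near max" threshold of 7/8 of the cap. With the default 50MB cap the thresholds work out as follows:

long maxSizeInBytes = 50L << 20;                                    // 52428800 bytes
long nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3);  // 45875200 bytes (7/8 of max)
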
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Fri Oct 19 02:25:55 2012
@@ -1,256 +1,256 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
-import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-/**
- * This class provides getters and setters for a number of parameters. Most
- * of the parameters are related to the index update; the rest are existing
- * Map/Reduce parameters.
- */
-public class IndexUpdateConfiguration {
- final Configuration conf;
-
- /**
- * Constructor.
- * @param conf the underlying configuration
- */
- public IndexUpdateConfiguration(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Get the underlying configuration object.
- * @return the configuration
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- //
- // existing map/reduce properties
- //
- // public int getIOFileBufferSize() {
- // return getInt("io.file.buffer.size", 4096);
- // }
-
- /**
- * Get the IO sort space in MB.
- * @return the IO sort space in MB
- */
- public int getIOSortMB() {
- return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
- }
-
- /**
- * Set the IO sort space in MB.
- * @param mb the IO sort space in MB
- */
- public void setIOSortMB(int mb) {
- conf.setInt(MRJobConfig.IO_SORT_MB, mb);
- }
-
- /**
- * Get the Map/Reduce temp directory.
- * @return the Map/Reduce temp directory
- */
- public String getMapredTempDir() {
- return conf.get(MRConfig.TEMP_DIR);
- }
-
- //
- // properties for index update
- //
- /**
- * Get the distribution policy class.
- * @return the distribution policy class
- */
- public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
- return conf.getClass("sea.distribution.policy",
- HashingDistributionPolicy.class, IDistributionPolicy.class);
- }
-
- /**
- * Set the distribution policy class.
- * @param theClass the distribution policy class
- */
- public void setDistributionPolicyClass(
- Class<? extends IDistributionPolicy> theClass) {
- conf.setClass("sea.distribution.policy", theClass,
- IDistributionPolicy.class);
- }
-
- /**
- * Get the analyzer class.
- * @return the analyzer class
- */
- public Class<? extends Analyzer> getDocumentAnalyzerClass() {
- return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
- Analyzer.class);
- }
-
- /**
- * Set the analyzer class.
- * @param theClass the analyzer class
- */
- public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
- conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
- }
-
- /**
- * Get the index input format class.
- * @return the index input format class
- */
- public Class<? extends InputFormat> getIndexInputFormatClass() {
- return conf.getClass("sea.input.format", LineDocInputFormat.class,
- InputFormat.class);
- }
-
- /**
- * Set the index input format class.
- * @param theClass the index input format class
- */
- public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
- conf.setClass("sea.input.format", theClass, InputFormat.class);
- }
-
- /**
- * Get the index updater class.
- * @return the index updater class
- */
- public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
- return conf.getClass("sea.index.updater", IndexUpdater.class,
- IIndexUpdater.class);
- }
-
- /**
- * Set the index updater class.
- * @param theClass the index updater class
- */
- public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
- conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
- }
-
- /**
- * Get the local analysis class.
- * @return the local analysis class
- */
- public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
- return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
- ILocalAnalysis.class);
- }
-
- /**
- * Set the local analysis class.
- * @param theClass the local analysis class
- */
- public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
- conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
- }
-
- /**
- * Get the string representation of a number of shards.
- * @return the string representation of a number of shards
- */
- public String getIndexShards() {
- return conf.get("sea.index.shards");
- }
-
- /**
- * Set the string representation of a number of shards.
- * @param shards the string representation of a number of shards
- */
- public void setIndexShards(String shards) {
- conf.set("sea.index.shards", shards);
- }
-
- /**
- * Get the max field length for a Lucene instance.
- * @return the max field length for a Lucene instance
- */
- public int getIndexMaxFieldLength() {
- return conf.getInt("sea.max.field.length", -1);
- }
-
- /**
- * Set the max field length for a Lucene instance.
- * @param maxFieldLength the max field length for a Lucene instance
- */
- public void setIndexMaxFieldLength(int maxFieldLength) {
- conf.setInt("sea.max.field.length", maxFieldLength);
- }
-
- /**
- * Get the max number of segments for a Lucene instance.
- * @return the max number of segments for a Lucene instance
- */
- public int getIndexMaxNumSegments() {
- return conf.getInt("sea.max.num.segments", -1);
- }
-
- /**
- * Set the max number of segments for a Lucene instance.
- * @param maxNumSegments the max number of segments for a Lucene instance
- */
- public void setIndexMaxNumSegments(int maxNumSegments) {
- conf.setInt("sea.max.num.segments", maxNumSegments);
- }
-
- /**
- * Check whether to use the compound file format for a Lucene instance.
- * @return true if using the compound file format for a Lucene instance
- */
- public boolean getIndexUseCompoundFile() {
- return conf.getBoolean("sea.use.compound.file", false);
- }
-
- /**
- * Set whether to use the compound file format for a Lucene instance.
- * @param useCompoundFile whether to use the compound file format
- */
- public void setIndexUseCompoundFile(boolean useCompoundFile) {
- conf.setBoolean("sea.use.compound.file", useCompoundFile);
- }
-
- /**
- * Get the max ram index size in bytes. The default is 50M.
- * @return the max ram index size in bytes
- */
- public long getMaxRAMSizeInBytes() {
- return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
- }
-
- /**
- * Set the max ram index size in bytes.
- * @param b the max ram index size in bytes
- */
- public void setMaxRAMSizeInBytes(long b) {
- conf.setLong("sea.max.ramsize.bytes", b);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
+import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * This class provides getters and setters for a number of parameters. Most
+ * of the parameters are related to the index update; the rest are existing
+ * Map/Reduce parameters.
+ */
+public class IndexUpdateConfiguration {
+ final Configuration conf;
+
+ /**
+ * Constructor.
+ * @param conf the underlying configuration
+ */
+ public IndexUpdateConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get the underlying configuration object.
+ * @return the configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ //
+ // existing map/reduce properties
+ //
+ // public int getIOFileBufferSize() {
+ // return getInt("io.file.buffer.size", 4096);
+ // }
+
+ /**
+ * Get the IO sort space in MB.
+ * @return the IO sort space in MB
+ */
+ public int getIOSortMB() {
+ return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
+ }
+
+ /**
+ * Set the IO sort space in MB.
+ * @param mb the IO sort space in MB
+ */
+ public void setIOSortMB(int mb) {
+ conf.setInt(MRJobConfig.IO_SORT_MB, mb);
+ }
+
+ /**
+ * Get the Map/Reduce temp directory.
+ * @return the Map/Reduce temp directory
+ */
+ public String getMapredTempDir() {
+ return conf.get(MRConfig.TEMP_DIR);
+ }
+
+ //
+ // properties for index update
+ //
+ /**
+ * Get the distribution policy class.
+ * @return the distribution policy class
+ */
+ public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
+ return conf.getClass("sea.distribution.policy",
+ HashingDistributionPolicy.class, IDistributionPolicy.class);
+ }
+
+ /**
+ * Set the distribution policy class.
+ * @param theClass the distribution policy class
+ */
+ public void setDistributionPolicyClass(
+ Class<? extends IDistributionPolicy> theClass) {
+ conf.setClass("sea.distribution.policy", theClass,
+ IDistributionPolicy.class);
+ }
+
+ /**
+ * Get the analyzer class.
+ * @return the analyzer class
+ */
+ public Class<? extends Analyzer> getDocumentAnalyzerClass() {
+ return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
+ Analyzer.class);
+ }
+
+ /**
+ * Set the analyzer class.
+ * @param theClass the analyzer class
+ */
+ public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
+ conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
+ }
+
+ /**
+ * Get the index input format class.
+ * @return the index input format class
+ */
+ public Class<? extends InputFormat> getIndexInputFormatClass() {
+ return conf.getClass("sea.input.format", LineDocInputFormat.class,
+ InputFormat.class);
+ }
+
+ /**
+ * Set the index input format class.
+ * @param theClass the index input format class
+ */
+ public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
+ conf.setClass("sea.input.format", theClass, InputFormat.class);
+ }
+
+ /**
+ * Get the index updater class.
+ * @return the index updater class
+ */
+ public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
+ return conf.getClass("sea.index.updater", IndexUpdater.class,
+ IIndexUpdater.class);
+ }
+
+ /**
+ * Set the index updater class.
+ * @param theClass the index updater class
+ */
+ public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
+ conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
+ }
+
+ /**
+ * Get the local analysis class.
+ * @return the local analysis class
+ */
+ public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
+ return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
+ ILocalAnalysis.class);
+ }
+
+ /**
+ * Set the local analysis class.
+ * @param theClass the local analysis class
+ */
+ public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
+ conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
+ }
+
+ /**
+ * Get the string representation of a number of shards.
+ * @return the string representation of a number of shards
+ */
+ public String getIndexShards() {
+ return conf.get("sea.index.shards");
+ }
+
+ /**
+ * Set the string representation of a number of shards.
+ * @param shards the string representation of a number of shards
+ */
+ public void setIndexShards(String shards) {
+ conf.set("sea.index.shards", shards);
+ }
+
+ /**
+ * Get the max field length for a Lucene instance.
+ * @return the max field length for a Lucene instance
+ */
+ public int getIndexMaxFieldLength() {
+ return conf.getInt("sea.max.field.length", -1);
+ }
+
+ /**
+ * Set the max field length for a Lucene instance.
+ * @param maxFieldLength the max field length for a Lucene instance
+ */
+ public void setIndexMaxFieldLength(int maxFieldLength) {
+ conf.setInt("sea.max.field.length", maxFieldLength);
+ }
+
+ /**
+ * Get the max number of segments for a Lucene instance.
+ * @return the max number of segments for a Lucene instance
+ */
+ public int getIndexMaxNumSegments() {
+ return conf.getInt("sea.max.num.segments", -1);
+ }
+
+ /**
+ * Set the max number of segments for a Lucene instance.
+ * @param maxNumSegments the max number of segments for a Lucene instance
+ */
+ public void setIndexMaxNumSegments(int maxNumSegments) {
+ conf.setInt("sea.max.num.segments", maxNumSegments);
+ }
+
+ /**
+ * Check whether to use the compound file format for a Lucene instance.
+ * @return true if using the compound file format for a Lucene instance
+ */
+ public boolean getIndexUseCompoundFile() {
+ return conf.getBoolean("sea.use.compound.file", false);
+ }
+
+ /**
+ * Set whether to use the compound file format for a Lucene instance.
+ * @param useCompoundFile whether to use the compound file format
+ */
+ public void setIndexUseCompoundFile(boolean useCompoundFile) {
+ conf.setBoolean("sea.use.compound.file", useCompoundFile);
+ }
+
+ /**
+ * Get the max ram index size in bytes. The default is 50M.
+ * @return the max ram index size in bytes
+ */
+ public long getMaxRAMSizeInBytes() {
+ return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
+ }
+
+ /**
+ * Set the max ram index size in bytes.
+ * @param b the max ram index size in bytes
+ */
+ public void setMaxRAMSizeInBytes(long b) {
+ conf.setLong("sea.max.ramsize.bytes", b);
+ }
+
+}
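
A short configuration sketch exercising the setters above (the values are arbitrary placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;

public class ConfigureIndexUpdate {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
    iconf.setIndexUseCompoundFile(true);      // pack each segment into a compound file
    iconf.setIndexMaxNumSegments(1);          // optimize each shard to a single segment
    iconf.setMaxRAMSizeInBytes(32L << 20);    // 32MB RAM index cap (default 50MB)
  }
}
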
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java Fri Oct 19 02:25:55 2012
@@ -1,199 +1,199 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.lucene.analysis.Analyzer;
-
-/**
- * This class applies local analysis to a key-value pair and then converts the
- * resulting docid-and-operation pair to a shard-and-intermediate-form pair.
- */
-public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
- extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
- static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
-
- /**
- * Get the map output key class.
- * @return the map output key class
- */
- public static Class<? extends WritableComparable> getMapOutputKeyClass() {
- return Shard.class;
- }
-
- /**
- * Get the map output value class.
- * @return the map output value class
- */
- public static Class<? extends Writable> getMapOutputValueClass() {
- return IntermediateForm.class;
- }
-
- IndexUpdateConfiguration iconf;
- private Analyzer analyzer;
- private Shard[] shards;
- private IDistributionPolicy distributionPolicy;
-
- private ILocalAnalysis<K, V> localAnalysis;
- private DocumentID tmpKey;
- private DocumentAndOp tmpValue;
-
- private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
- new OutputCollector<DocumentID, DocumentAndOp>() {
- public void collect(DocumentID key, DocumentAndOp value)
- throws IOException {
- tmpKey = key;
- tmpValue = value;
- }
- };
-
- /**
- * Map a key-value pair to a shard-and-intermediate form pair. Internally,
- * the local analysis is first applied to map the key-value pair to a
- * document id-and-operation pair, then the docid-and-operation pair is
- * mapped to a shard-intermediate form pair. The intermediate form is of the
- * form of a single-document ram index and/or a single delete term.
- */
- public void map(K key, V value,
- OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
- throws IOException {
-
- synchronized (this) {
- localAnalysis.map(key, value, tmpCollector, reporter);
-
- if (tmpKey != null && tmpValue != null) {
- DocumentAndOp doc = tmpValue;
- IntermediateForm form = new IntermediateForm();
- form.configure(iconf);
- form.process(doc, analyzer);
- form.closeWriter();
-
- if (doc.getOp() == DocumentAndOp.Op.INSERT) {
- int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
- if (chosenShard >= 0) {
- // insert into one shard
- output.collect(shards[chosenShard], form);
- } else {
- throw new IOException("Chosen shard for insert must be >= 0");
- }
-
- } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
- int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
- if (chosenShard >= 0) {
- // delete from one shard
- output.collect(shards[chosenShard], form);
- } else {
- // broadcast delete to all shards
- for (int i = 0; i < shards.length; i++) {
- output.collect(shards[i], form);
- }
- }
-
- } else { // UPDATE
- int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
- int deleteFromShard =
- distributionPolicy.chooseShardForDelete(tmpKey);
-
- if (insertToShard >= 0) {
- if (insertToShard == deleteFromShard) {
- // update into one shard
- output.collect(shards[insertToShard], form);
- } else {
- // prepare a deletion form
- IntermediateForm deletionForm = new IntermediateForm();
- deletionForm.configure(iconf);
- deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
- doc.getTerm()), analyzer);
- deletionForm.closeWriter();
-
- if (deleteFromShard >= 0) {
- // delete from one shard
- output.collect(shards[deleteFromShard], deletionForm);
- } else {
- // broadcast delete to all shards
- for (int i = 0; i < shards.length; i++) {
- output.collect(shards[i], deletionForm);
- }
- }
-
- // prepare an insertion form
- IntermediateForm insertionForm = new IntermediateForm();
- insertionForm.configure(iconf);
- insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
- doc.getDocument()), analyzer);
- insertionForm.closeWriter();
-
- // insert into one shard
- output.collect(shards[insertToShard], insertionForm);
- }
- } else {
- throw new IOException("Chosen shard for insert must be >= 0");
- }
- }
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- analyzer =
- (Analyzer) ReflectionUtils.newInstance(
- iconf.getDocumentAnalyzerClass(), job);
-
- localAnalysis =
- (ILocalAnalysis) ReflectionUtils.newInstance(
- iconf.getLocalAnalysisClass(), job);
- localAnalysis.configure(job);
-
- shards = Shard.getIndexShards(iconf);
-
- distributionPolicy =
- (IDistributionPolicy) ReflectionUtils.newInstance(
- iconf.getDistributionPolicyClass(), job);
- distributionPolicy.init(shards);
-
- LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
- LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
- LOG.info(shards.length + " shards = " + iconf.getIndexShards());
- LOG.info("sea.distribution.policy = "
- + distributionPolicy.getClass().getName());
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- localAnalysis.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.analysis.Analyzer;
+
+/**
+ * This class applies local analysis to a key-value pair and then converts the
+ * resulting docid-and-operation pair to a shard-and-intermediate-form pair.
+ */
+public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
+ extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
+
+ /**
+ * Get the map output key class.
+ * @return the map output key class
+ */
+ public static Class<? extends WritableComparable> getMapOutputKeyClass() {
+ return Shard.class;
+ }
+
+ /**
+ * Get the map output value class.
+ * @return the map output value class
+ */
+ public static Class<? extends Writable> getMapOutputValueClass() {
+ return IntermediateForm.class;
+ }
+
+ IndexUpdateConfiguration iconf;
+ private Analyzer analyzer;
+ private Shard[] shards;
+ private IDistributionPolicy distributionPolicy;
+
+ private ILocalAnalysis<K, V> localAnalysis;
+ private DocumentID tmpKey;
+ private DocumentAndOp tmpValue;
+
+ private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
+ new OutputCollector<DocumentID, DocumentAndOp>() {
+ public void collect(DocumentID key, DocumentAndOp value)
+ throws IOException {
+ tmpKey = key;
+ tmpValue = value;
+ }
+ };
+
+ /**
+ * Map a key-value pair to a shard-and-intermediate form pair. Internally,
+ * the local analysis is first applied to map the key-value pair to a
+ * document id-and-operation pair, then the docid-and-operation pair is
+ * mapped to a shard-and-intermediate-form pair. The intermediate form
+ * consists of a single-document RAM index and/or a single delete term.
+ */
+ public void map(K key, V value,
+ OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
+ throws IOException {
+
+ synchronized (this) {
+ localAnalysis.map(key, value, tmpCollector, reporter);
+
+ if (tmpKey != null && tmpValue != null) {
+ DocumentAndOp doc = tmpValue;
+ IntermediateForm form = new IntermediateForm();
+ form.configure(iconf);
+ form.process(doc, analyzer);
+ form.closeWriter();
+
+ if (doc.getOp() == DocumentAndOp.Op.INSERT) {
+ int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
+ if (chosenShard >= 0) {
+ // insert into one shard
+ output.collect(shards[chosenShard], form);
+ } else {
+ throw new IOException("Chosen shard for insert must be >= 0");
+ }
+
+ } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
+ int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
+ if (chosenShard >= 0) {
+ // delete from one shard
+ output.collect(shards[chosenShard], form);
+ } else {
+ // broadcast delete to all shards
+ for (int i = 0; i < shards.length; i++) {
+ output.collect(shards[i], form);
+ }
+ }
+
+ } else { // UPDATE
+ int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
+ int deleteFromShard =
+ distributionPolicy.chooseShardForDelete(tmpKey);
+
+ if (insertToShard >= 0) {
+ if (insertToShard == deleteFromShard) {
+ // update into one shard
+ output.collect(shards[insertToShard], form);
+ } else {
+ // prepare a deletion form
+ IntermediateForm deletionForm = new IntermediateForm();
+ deletionForm.configure(iconf);
+ deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
+ doc.getTerm()), analyzer);
+ deletionForm.closeWriter();
+
+ if (deleteFromShard >= 0) {
+ // delete from one shard
+ output.collect(shards[deleteFromShard], deletionForm);
+ } else {
+ // broadcast delete to all shards
+ for (int i = 0; i < shards.length; i++) {
+ output.collect(shards[i], deletionForm);
+ }
+ }
+
+ // prepare an insertion form
+ IntermediateForm insertionForm = new IntermediateForm();
+ insertionForm.configure(iconf);
+ insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
+ doc.getDocument()), analyzer);
+ insertionForm.closeWriter();
+
+ // insert into one shard
+ output.collect(shards[insertToShard], insertionForm);
+ }
+ } else {
+ throw new IOException("Chosen shard for insert must be >= 0");
+ }
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ analyzer =
+ (Analyzer) ReflectionUtils.newInstance(
+ iconf.getDocumentAnalyzerClass(), job);
+
+ localAnalysis =
+ (ILocalAnalysis) ReflectionUtils.newInstance(
+ iconf.getLocalAnalysisClass(), job);
+ localAnalysis.configure(job);
+
+ shards = Shard.getIndexShards(iconf);
+
+ distributionPolicy =
+ (IDistributionPolicy) ReflectionUtils.newInstance(
+ iconf.getDistributionPolicyClass(), job);
+ distributionPolicy.init(shards);
+
+ LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
+ LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
+ LOG.info(shards.length + " shards = " + iconf.getIndexShards());
+ LOG.info("sea.distribution.policy = "
+ + distributionPolicy.getClass().getName());
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ localAnalysis.close();
+ }
+
+}
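
The three-way branch in map() above leans entirely on the IDistributionPolicy contract as it is used here: chooseShardForInsert() must return a valid shard index, while chooseShardForDelete() may return -1 to request a broadcast delete to all shards. A minimal sketch of a conforming policy, assuming hashing on the string form of the document id (the class name and hashing scheme are illustrative only, not the shipped policy implementations):

    package org.apache.hadoop.contrib.index.mapred;

    // Hypothetical sketch of an IDistributionPolicy: route by a hash of the
    // document id so that insert and delete always agree on the shard, which
    // lets IndexUpdateMapper handle an UPDATE with a single combined form.
    public class HashSketchDistributionPolicy implements IDistributionPolicy {

      private int numShards;

      public void init(Shard[] shards) {
        numShards = shards.length;
      }

      public int chooseShardForInsert(DocumentID key) {
        // Mask the sign bit so the index is always in [0, numShards).
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numShards;
      }

      public int chooseShardForDelete(DocumentID key) {
        // Same shard as the insert; returning -1 here instead would make
        // IndexUpdateMapper broadcast the delete to every shard.
        return chooseShardForInsert(key);
      }
    }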
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java Fri Oct 19 02:25:55 2012
@@ -1,60 +1,60 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-/**
- * This partitioner class puts the values of the same key - in this case the
- * same shard - in the same partition.
- */
-public class IndexUpdatePartitioner implements
- Partitioner<Shard, IntermediateForm> {
-
- private Shard[] shards;
- private Map<Shard, Integer> map;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
- */
- public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
- int partition = map.get(key).intValue();
- if (partition < numPartitions) {
- return partition;
- } else {
- return numPartitions - 1;
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
- map = new HashMap<Shard, Integer>();
- for (int i = 0; i < shards.length; i++) {
- map.put(shards[i], i);
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * This partitioner puts values with the same key - in this case, the same
+ * shard - into the same partition.
+ */
+public class IndexUpdatePartitioner implements
+ Partitioner<Shard, IntermediateForm> {
+
+ private Shard[] shards;
+ private Map<Shard, Integer> map;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
+ */
+ public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
+ int partition = map.get(key).intValue();
+ if (partition < numPartitions) {
+ return partition;
+ } else {
+ return numPartitions - 1;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
+ map = new HashMap<Shard, Integer>();
+ for (int i = 0; i < shards.length; i++) {
+ map.put(shards[i], i);
+ }
+ }
+
+}
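
Because configure() maps each configured shard to its list position, getPartition() is a constant-time lookup, and any shard whose position exceeds numPartitions - 1 is clamped into the last partition (this can happen when the job is configured with fewer reduce tasks than shards). A self-contained, hypothetical illustration of just the clamping rule, with plain ints standing in for Shard keys:

    // Hypothetical stand-alone demo of the clamping in getPartition():
    // a shard's position in the configured list is its partition, except
    // that positions >= numPartitions all collapse into the last partition.
    public class PartitionClampDemo {

      static int partitionFor(int shardPosition, int numPartitions) {
        return shardPosition < numPartitions ? shardPosition : numPartitions - 1;
      }

      public static void main(String[] args) {
        // Five shards but only three reduce tasks:
        for (int pos = 0; pos < 5; pos++) {
          System.out.println("shard at position " + pos + " -> partition "
              + partitionFor(pos, 3)); // prints 0, 1, 2, 2, 2
        }
      }
    }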
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java Fri Oct 19 02:25:55 2012
@@ -1,143 +1,143 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.contrib.index.lucene.ShardWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Closeable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This reducer applies to a shard the changes for it. A "new version" of
- * a shard is created at the end of a reduce. It is important to note that
- * the new version of the shard is not derived from scratch. By leveraging
- * Lucene's update algorithm, the new version of each Lucene instance will
- * share as many files as possible as the previous version.
- */
-public class IndexUpdateReducer extends MapReduceBase implements
- Reducer<Shard, IntermediateForm, Shard, Text> {
- static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
- static final Text DONE = new Text("done");
-
- /**
- * Get the reduce output key class.
- * @return the reduce output key class
- */
- public static Class<? extends WritableComparable> getOutputKeyClass() {
- return Shard.class;
- }
-
- /**
- * Get the reduce output value class.
- * @return the reduce output value class
- */
- public static Class<? extends Writable> getOutputValueClass() {
- return Text.class;
- }
-
- private IndexUpdateConfiguration iconf;
- private String mapredTempDir;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
- */
- public void reduce(Shard key, Iterator<IntermediateForm> values,
- OutputCollector<Shard, Text> output, Reporter reporter)
- throws IOException {
-
- LOG.info("Construct a shard writer for " + key);
- FileSystem fs = FileSystem.get(iconf.getConfiguration());
- String temp =
- mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
- final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
-
- // update the shard
- while (values.hasNext()) {
- IntermediateForm form = values.next();
- writer.process(form);
- reporter.progress();
- }
-
- // close the shard
- final Reporter fReporter = reporter;
- new Closeable() {
- volatile boolean closed = false;
-
- public void close() throws IOException {
- // spawn a thread to give progress heartbeats
- Thread prog = new Thread() {
- public void run() {
- while (!closed) {
- try {
- fReporter.setStatus("closing");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- continue;
- } catch (Throwable e) {
- return;
- }
- }
- }
- };
-
- try {
- prog.start();
-
- if (writer != null) {
- writer.close();
- }
- } finally {
- closed = true;
- }
- }
- }.close();
- LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
-
- output.collect(key, DONE);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- mapredTempDir = iconf.getMapredTempDir();
- mapredTempDir = Shard.normalizePath(mapredTempDir);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.contrib.index.lucene.ShardWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This reducer applies the changes destined for a shard to that shard. A
+ * "new version" of the shard is created at the end of a reduce. It is
+ * important to note that the new version is not built from scratch: by
+ * leveraging Lucene's update algorithm, the new version of each Lucene
+ * instance shares as many files as possible with the previous version.
+ */
+public class IndexUpdateReducer extends MapReduceBase implements
+ Reducer<Shard, IntermediateForm, Shard, Text> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
+ static final Text DONE = new Text("done");
+
+ /**
+ * Get the reduce output key class.
+ * @return the reduce output key class
+ */
+ public static Class<? extends WritableComparable> getOutputKeyClass() {
+ return Shard.class;
+ }
+
+ /**
+ * Get the reduce output value class.
+ * @return the reduce output value class
+ */
+ public static Class<? extends Writable> getOutputValueClass() {
+ return Text.class;
+ }
+
+ private IndexUpdateConfiguration iconf;
+ private String mapredTempDir;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ public void reduce(Shard key, Iterator<IntermediateForm> values,
+ OutputCollector<Shard, Text> output, Reporter reporter)
+ throws IOException {
+
+ LOG.info("Construct a shard writer for " + key);
+ FileSystem fs = FileSystem.get(iconf.getConfiguration());
+ String temp =
+ mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
+ final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
+
+ // update the shard
+ while (values.hasNext()) {
+ IntermediateForm form = values.next();
+ writer.process(form);
+ reporter.progress();
+ }
+
+ // close the shard
+ final Reporter fReporter = reporter;
+ new Closeable() {
+ volatile boolean closed = false;
+
+ public void close() throws IOException {
+ // spawn a thread to give progress heartbeats
+ Thread prog = new Thread() {
+ public void run() {
+ while (!closed) {
+ try {
+ fReporter.setStatus("closing");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ } catch (Throwable e) {
+ return;
+ }
+ }
+ }
+ };
+
+ try {
+ prog.start();
+
+ if (writer != null) {
+ writer.close();
+ }
+ } finally {
+ closed = true;
+ }
+ }
+ }.close();
+ LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
+
+ output.collect(key, DONE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ mapredTempDir = iconf.getMapredTempDir();
+ mapredTempDir = Shard.normalizePath(mapredTempDir);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ }
+
+}
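
The anonymous Closeable in reduce() above exists only to keep the task alive: closing the ShardWriter can run long enough that the framework would otherwise time the task out, so a background thread keeps reporting status until the close returns. The same pattern, distilled into a self-contained sketch (the class name and the Runnable stand-in for the Reporter are illustrative):

    import java.io.IOException;

    // Hypothetical distillation of the heartbeat-while-closing pattern used
    // in reduce() above: a background thread signals liveness once a second
    // until the slow close() call completes.
    public class HeartbeatClose {

      private volatile boolean closed = false;

      public void closeWithHeartbeat(final Runnable heartbeat,
                                     final java.io.Closeable resource)
          throws IOException {
        Thread prog = new Thread() {
          public void run() {
            while (!closed) {
              try {
                heartbeat.run();   // e.g. reporter.setStatus("closing")
                Thread.sleep(1000);
              } catch (InterruptedException e) {
                continue;          // keep heartbeating until closed
              } catch (Throwable e) {
                return;
              }
            }
          }
        };
        try {
          prog.start();
          resource.close();        // the potentially slow operation
        } finally {
          closed = true;           // releases the heartbeat thread
        }
      }
    }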