Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java Fri Oct 19 02:25:55 2012 @@ -1,208 +1,208 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.Term; - -/** - * This class represents an indexing operation. The operation can be an insert, - * a delete or an update. If the operation is an insert or an update, a (new) - * document must be specified. If the operation is a delete or an update, a - * delete term must be specified. - */ -public class DocumentAndOp implements Writable { - - /** - * This class represents the type of an operation - an insert, a delete or - * an update. - */ - public static final class Op { - public static final Op INSERT = new Op("INSERT"); - public static final Op DELETE = new Op("DELETE"); - public static final Op UPDATE = new Op("UPDATE"); - - private String name; - - private Op(String name) { - this.name = name; - } - - public String toString() { - return name; - } - } - - private Op op; - private Document doc; - private Term term; - - /** - * Constructor for no operation. - */ - public DocumentAndOp() { - } - - /** - * Constructor for an insert operation. - * @param op - * @param doc - */ - public DocumentAndOp(Op op, Document doc) { - assert (op == Op.INSERT); - this.op = op; - this.doc = doc; - this.term = null; - } - - /** - * Constructor for a delete operation. - * @param op - * @param term - */ - public DocumentAndOp(Op op, Term term) { - assert (op == Op.DELETE); - this.op = op; - this.doc = null; - this.term = term; - } - - /** - * Constructor for an insert, a delete or an update operation. 
- * @param op - * @param doc - * @param term - */ - public DocumentAndOp(Op op, Document doc, Term term) { - if (op == Op.INSERT) { - assert (doc != null); - assert (term == null); - } else if (op == Op.DELETE) { - assert (doc == null); - assert (term != null); - } else { - assert (op == Op.UPDATE); - assert (doc != null); - assert (term != null); - } - this.op = op; - this.doc = doc; - this.term = term; - } - - /** - * Set the instance to be an insert operation. - * @param doc - */ - public void setInsert(Document doc) { - this.op = Op.INSERT; - this.doc = doc; - this.term = null; - } - - /** - * Set the instance to be a delete operation. - * @param term - */ - public void setDelete(Term term) { - this.op = Op.DELETE; - this.doc = null; - this.term = term; - } - - /** - * Set the instance to be an update operation. - * @param doc - * @param term - */ - public void setUpdate(Document doc, Term term) { - this.op = Op.UPDATE; - this.doc = doc; - this.term = term; - } - - /** - * Get the type of operation. - * @return the type of the operation. - */ - public Op getOp() { - return op; - } - - /** - * Get the document. - * @return the document - */ - public Document getDocument() { - return doc; - } - - /** - * Get the term. - * @return the term - */ - public Term getTerm() { - return term; - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - StringBuilder buffer = new StringBuilder(); - buffer.append(this.getClass().getName()); - buffer.append("[op="); - buffer.append(op); - buffer.append(", doc="); - if (doc != null) { - buffer.append(doc); - } else { - buffer.append("null"); - } - buffer.append(", term="); - if (term != null) { - buffer.append(term); - } else { - buffer.append("null"); - } - buffer.append("]"); - return buffer.toString(); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - throw new IOException(this.getClass().getName() - + ".write should never be called"); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - throw new IOException(this.getClass().getName() - + ".readFields should never be called"); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.Term; + +/** + * This class represents an indexing operation. The operation can be an insert, + * a delete or an update. 
If the operation is an insert or an update, a (new) + * document must be specified. If the operation is a delete or an update, a + * delete term must be specified. + */ +public class DocumentAndOp implements Writable { + + /** + * This class represents the type of an operation - an insert, a delete or + * an update. + */ + public static final class Op { + public static final Op INSERT = new Op("INSERT"); + public static final Op DELETE = new Op("DELETE"); + public static final Op UPDATE = new Op("UPDATE"); + + private String name; + + private Op(String name) { + this.name = name; + } + + public String toString() { + return name; + } + } + + private Op op; + private Document doc; + private Term term; + + /** + * Constructor for no operation. + */ + public DocumentAndOp() { + } + + /** + * Constructor for an insert operation. + * @param op + * @param doc + */ + public DocumentAndOp(Op op, Document doc) { + assert (op == Op.INSERT); + this.op = op; + this.doc = doc; + this.term = null; + } + + /** + * Constructor for a delete operation. + * @param op + * @param term + */ + public DocumentAndOp(Op op, Term term) { + assert (op == Op.DELETE); + this.op = op; + this.doc = null; + this.term = term; + } + + /** + * Constructor for an insert, a delete or an update operation. + * @param op + * @param doc + * @param term + */ + public DocumentAndOp(Op op, Document doc, Term term) { + if (op == Op.INSERT) { + assert (doc != null); + assert (term == null); + } else if (op == Op.DELETE) { + assert (doc == null); + assert (term != null); + } else { + assert (op == Op.UPDATE); + assert (doc != null); + assert (term != null); + } + this.op = op; + this.doc = doc; + this.term = term; + } + + /** + * Set the instance to be an insert operation. + * @param doc + */ + public void setInsert(Document doc) { + this.op = Op.INSERT; + this.doc = doc; + this.term = null; + } + + /** + * Set the instance to be a delete operation. + * @param term + */ + public void setDelete(Term term) { + this.op = Op.DELETE; + this.doc = null; + this.term = term; + } + + /** + * Set the instance to be an update operation. + * @param doc + * @param term + */ + public void setUpdate(Document doc, Term term) { + this.op = Op.UPDATE; + this.doc = doc; + this.term = term; + } + + /** + * Get the type of operation. + * @return the type of the operation. + */ + public Op getOp() { + return op; + } + + /** + * Get the document. + * @return the document + */ + public Document getDocument() { + return doc; + } + + /** + * Get the term. 
+ * @return the term + */ + public Term getTerm() { + return term; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(this.getClass().getName()); + buffer.append("[op="); + buffer.append(op); + buffer.append(", doc="); + if (doc != null) { + buffer.append(doc); + } else { + buffer.append("null"); + } + buffer.append(", term="); + if (term != null) { + buffer.append(term); + } else { + buffer.append("null"); + } + buffer.append("]"); + return buffer.toString(); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + throw new IOException(this.getClass().getName() + + ".write should never be called"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + throw new IOException(this.getClass().getName() + + ".readFields should never be called"); + } +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java Fri Oct 19 02:25:55 2012 @@ -1,89 +1,89 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; - -/** - * The class represents a document id, which is of type text. - */ -public class DocumentID implements WritableComparable { - private final Text docID; - - /** - * Constructor. - */ - public DocumentID() { - docID = new Text(); - } - - /** - * The text of the document id. 
- * @return the text - */ - public Text getText() { - return docID; - } - - /* (non-Javadoc) - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - public int compareTo(Object obj) { - if (this == obj) { - return 0; - } else { - return docID.compareTo(((DocumentID) obj).docID); - } - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - public int hashCode() { - return docID.hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - return this.getClass().getName() + "[" + docID + "]"; - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - throw new IOException(this.getClass().getName() - + ".write should never be called"); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - throw new IOException(this.getClass().getName() - + ".readFields should never be called"); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +/** + * The class represents a document id, which is of type text. + */ +public class DocumentID implements WritableComparable { + private final Text docID; + + /** + * Constructor. + */ + public DocumentID() { + docID = new Text(); + } + + /** + * The text of the document id. 
+ * @return the text + */ + public Text getText() { + return docID; + } + + /* (non-Javadoc) + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + public int compareTo(Object obj) { + if (this == obj) { + return 0; + } else { + return docID.compareTo(((DocumentID) obj).docID); + } + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return docID.hashCode(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + return this.getClass().getName() + "[" + docID + "]"; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + throw new IOException(this.getClass().getName() + + ".write should never be called"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + throw new IOException(this.getClass().getName() + + ".readFields should never be called"); + } +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java Fri Oct 19 02:25:55 2012 @@ -1,50 +1,50 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -/** - * A distribution policy decides, given a document with a document id, which - * one shard the request should be sent to if the request is an insert, and - * which shard(s) the request should be sent to if the request is a delete. - */ -public interface IDistributionPolicy { - - /** - * Initialization. It must be called before any chooseShard() is called. - * @param shards - */ - void init(Shard[] shards); - - /** - * Choose a shard to send an insert request. - * @param key - * @return the index of the chosen shard - */ - int chooseShardForInsert(DocumentID key); - - /** - * Choose a shard or all shards to send a delete request. E.g. a round-robin - * distribution policy would send a delete request to all the shards. - * -1 represents all the shards. 
- * @param key - * @return the index of the chosen shard, -1 if all the shards are chosen - */ - int chooseShardForDelete(DocumentID key); - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +/** + * A distribution policy decides, given a document with a document id, which + * one shard the request should be sent to if the request is an insert, and + * which shard(s) the request should be sent to if the request is a delete. + */ +public interface IDistributionPolicy { + + /** + * Initialization. It must be called before any chooseShard() is called. + * @param shards + */ + void init(Shard[] shards); + + /** + * Choose a shard to send an insert request. + * @param key + * @return the index of the chosen shard + */ + int chooseShardForInsert(DocumentID key); + + /** + * Choose a shard or all shards to send a delete request. E.g. a round-robin + * distribution policy would send a delete request to all the shards. + * -1 represents all the shards. + * @param key + * @return the index of the chosen shard, -1 if all the shards are chosen + */ + int chooseShardForDelete(DocumentID key); + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java Fri Oct 19 02:25:55 2012 @@ -1,46 +1,46 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -/** - * A class implements an index updater interface should create a Map/Reduce job - * configuration and run the Map/Reduce job to analyze documents and update - * Lucene instances in parallel. - */ -public interface IIndexUpdater { - - /** - * Create a Map/Reduce job configuration and run the Map/Reduce job to - * analyze documents and update Lucene instances in parallel. - * @param conf - * @param inputPaths - * @param outputPath - * @param numMapTasks - * @param shards - * @throws IOException - */ - void run(Configuration conf, Path[] inputPaths, Path outputPath, - int numMapTasks, Shard[] shards) throws IOException; - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * A class implements an index updater interface should create a Map/Reduce job + * configuration and run the Map/Reduce job to analyze documents and update + * Lucene instances in parallel. + */ +public interface IIndexUpdater { + + /** + * Create a Map/Reduce job configuration and run the Map/Reduce job to + * analyze documents and update Lucene instances in parallel. + * @param conf + * @param inputPaths + * @param outputPath + * @param numMapTasks + * @param shards + * @throws IOException + */ + void run(Configuration conf, Path[] inputPaths, Path outputPath, + int numMapTasks, Shard[] shards) throws IOException; + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java Fri Oct 19 02:25:55 2012 @@ -1,32 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Mapper; - -/** - * Application specific local analysis. The output type must be (DocumentID, - * DocumentAndOp). - */ -public interface ILocalAnalysis - extends Mapper { - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Mapper; + +/** + * Application specific local analysis. The output type must be (DocumentID, + * DocumentAndOp). + */ +public interface ILocalAnalysis + extends Mapper { + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java Fri Oct 19 02:25:55 2012 @@ -1,111 +1,111 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This combiner combines multiple intermediate forms into one intermediate - * form. More specifically, the input intermediate forms are a single-document - * ram index and/or a single delete term. An output intermediate form contains - * a multi-document ram index and/or multiple delete terms. - */ -public class IndexUpdateCombiner extends MapReduceBase implements - Reducer { - static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class); - - IndexUpdateConfiguration iconf; - long maxSizeInBytes; - long nearMaxSizeInBytes; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) - */ - public void reduce(Shard key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - - String message = key.toString(); - IntermediateForm form = null; - - while (values.hasNext()) { - IntermediateForm singleDocForm = values.next(); - long formSize = form == null ? 0 : form.totalSizeInBytes(); - long singleDocFormSize = singleDocForm.totalSizeInBytes(); - - if (form != null && formSize + singleDocFormSize > maxSizeInBytes) { - closeForm(form, message); - output.collect(key, form); - form = null; - } - - if (form == null && singleDocFormSize >= nearMaxSizeInBytes) { - output.collect(key, singleDocForm); - } else { - if (form == null) { - form = createForm(message); - } - form.process(singleDocForm); - } - } - - if (form != null) { - closeForm(form, message); - output.collect(key, form); - } - } - - private IntermediateForm createForm(String message) throws IOException { - LOG.info("Construct a form writer for " + message); - IntermediateForm form = new IntermediateForm(); - form.configure(iconf); - return form; - } - - private void closeForm(IntermediateForm form, String message) - throws IOException { - form.closeWriter(); - LOG.info("Closed the form writer for " + message + ", form = " + form); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - maxSizeInBytes = iconf.getMaxRAMSizeInBytes(); - nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This combiner combines multiple intermediate forms into one intermediate + * form. More specifically, the input intermediate forms are a single-document + * ram index and/or a single delete term. An output intermediate form contains + * a multi-document ram index and/or multiple delete terms. + */ +public class IndexUpdateCombiner extends MapReduceBase implements + Reducer { + static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class); + + IndexUpdateConfiguration iconf; + long maxSizeInBytes; + long nearMaxSizeInBytes; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + public void reduce(Shard key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + String message = key.toString(); + IntermediateForm form = null; + + while (values.hasNext()) { + IntermediateForm singleDocForm = values.next(); + long formSize = form == null ? 
0 : form.totalSizeInBytes(); + long singleDocFormSize = singleDocForm.totalSizeInBytes(); + + if (form != null && formSize + singleDocFormSize > maxSizeInBytes) { + closeForm(form, message); + output.collect(key, form); + form = null; + } + + if (form == null && singleDocFormSize >= nearMaxSizeInBytes) { + output.collect(key, singleDocForm); + } else { + if (form == null) { + form = createForm(message); + } + form.process(singleDocForm); + } + } + + if (form != null) { + closeForm(form, message); + output.collect(key, form); + } + } + + private IntermediateForm createForm(String message) throws IOException { + LOG.info("Construct a form writer for " + message); + IntermediateForm form = new IntermediateForm(); + form.configure(iconf); + return form; + } + + private void closeForm(IntermediateForm form, String message) + throws IOException { + form.closeWriter(); + LOG.info("Closed the form writer for " + message + ", form = " + form); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + maxSizeInBytes = iconf.getMaxRAMSizeInBytes(); + nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + } + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Fri Oct 19 02:25:55 2012 @@ -1,256 +1,256 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.mapred; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; -import org.apache.hadoop.contrib.index.example.LineDocInputFormat; -import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; - -/** - * This class provides the getters and the setters to a number of parameters. - * Most of the parameters are related to the index update and the rest are - * from the existing Map/Reduce parameters. - */ -public class IndexUpdateConfiguration { - final Configuration conf; - - /** - * Constructor - * @param conf - */ - public IndexUpdateConfiguration(Configuration conf) { - this.conf = conf; - } - - /** - * Get the underlying configuration object. - * @return the configuration - */ - public Configuration getConfiguration() { - return conf; - } - - // - // existing map/reduce properties - // - // public int getIOFileBufferSize() { - // return getInt("io.file.buffer.size", 4096); - // } - - /** - * Get the IO sort space in MB. - * @return the IO sort space in MB - */ - public int getIOSortMB() { - return conf.getInt(MRJobConfig.IO_SORT_MB, 100); - } - - /** - * Set the IO sort space in MB. - * @param mb the IO sort space in MB - */ - public void setIOSortMB(int mb) { - conf.setInt(MRJobConfig.IO_SORT_MB, mb); - } - - /** - * Get the Map/Reduce temp directory. - * @return the Map/Reduce temp directory - */ - public String getMapredTempDir() { - return conf.get(MRConfig.TEMP_DIR); - } - - // - // properties for index update - // - /** - * Get the distribution policy class. - * @return the distribution policy class - */ - public Class getDistributionPolicyClass() { - return conf.getClass("sea.distribution.policy", - HashingDistributionPolicy.class, IDistributionPolicy.class); - } - - /** - * Set the distribution policy class. - * @param theClass the distribution policy class - */ - public void setDistributionPolicyClass( - Class theClass) { - conf.setClass("sea.distribution.policy", theClass, - IDistributionPolicy.class); - } - - /** - * Get the analyzer class. - * @return the analyzer class - */ - public Class getDocumentAnalyzerClass() { - return conf.getClass("sea.document.analyzer", StandardAnalyzer.class, - Analyzer.class); - } - - /** - * Set the analyzer class. - * @param theClass the analyzer class - */ - public void setDocumentAnalyzerClass(Class theClass) { - conf.setClass("sea.document.analyzer", theClass, Analyzer.class); - } - - /** - * Get the index input format class. - * @return the index input format class - */ - public Class getIndexInputFormatClass() { - return conf.getClass("sea.input.format", LineDocInputFormat.class, - InputFormat.class); - } - - /** - * Set the index input format class. - * @param theClass the index input format class - */ - public void setIndexInputFormatClass(Class theClass) { - conf.setClass("sea.input.format", theClass, InputFormat.class); - } - - /** - * Get the index updater class. - * @return the index updater class - */ - public Class getIndexUpdaterClass() { - return conf.getClass("sea.index.updater", IndexUpdater.class, - IIndexUpdater.class); - } - - /** - * Set the index updater class. 
- * @param theClass the index updater class - */ - public void setIndexUpdaterClass(Class theClass) { - conf.setClass("sea.index.updater", theClass, IIndexUpdater.class); - } - - /** - * Get the local analysis class. - * @return the local analysis class - */ - public Class getLocalAnalysisClass() { - return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class, - ILocalAnalysis.class); - } - - /** - * Set the local analysis class. - * @param theClass the local analysis class - */ - public void setLocalAnalysisClass(Class theClass) { - conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class); - } - - /** - * Get the string representation of a number of shards. - * @return the string representation of a number of shards - */ - public String getIndexShards() { - return conf.get("sea.index.shards"); - } - - /** - * Set the string representation of a number of shards. - * @param shards the string representation of a number of shards - */ - public void setIndexShards(String shards) { - conf.set("sea.index.shards", shards); - } - - /** - * Get the max field length for a Lucene instance. - * @return the max field length for a Lucene instance - */ - public int getIndexMaxFieldLength() { - return conf.getInt("sea.max.field.length", -1); - } - - /** - * Set the max field length for a Lucene instance. - * @param maxFieldLength the max field length for a Lucene instance - */ - public void setIndexMaxFieldLength(int maxFieldLength) { - conf.setInt("sea.max.field.length", maxFieldLength); - } - - /** - * Get the max number of segments for a Lucene instance. - * @return the max number of segments for a Lucene instance - */ - public int getIndexMaxNumSegments() { - return conf.getInt("sea.max.num.segments", -1); - } - - /** - * Set the max number of segments for a Lucene instance. - * @param maxNumSegments the max number of segments for a Lucene instance - */ - public void setIndexMaxNumSegments(int maxNumSegments) { - conf.setInt("sea.max.num.segments", maxNumSegments); - } - - /** - * Check whether to use the compound file format for a Lucene instance. - * @return true if using the compound file format for a Lucene instance - */ - public boolean getIndexUseCompoundFile() { - return conf.getBoolean("sea.use.compound.file", false); - } - - /** - * Set whether use the compound file format for a Lucene instance. - * @param useCompoundFile whether to use the compound file format - */ - public void setIndexUseCompoundFile(boolean useCompoundFile) { - conf.setBoolean("sea.use.compound.file", useCompoundFile); - } - - /** - * Get the max ram index size in bytes. The default is 50M. - * @return the max ram index size in bytes - */ - public long getMaxRAMSizeInBytes() { - return conf.getLong("sea.max.ramsize.bytes", 50L << 20); - } - - /** - * Set the max ram index size in bytes. - * @param b the max ram index size in bytes - */ - public void setMaxRAMSizeInBytes(long b) { - conf.setLong("sea.max.ramsize.bytes", b); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; +import org.apache.hadoop.contrib.index.example.LineDocInputFormat; +import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; + +/** + * This class provides the getters and the setters to a number of parameters. + * Most of the parameters are related to the index update and the rest are + * from the existing Map/Reduce parameters. + */ +public class IndexUpdateConfiguration { + final Configuration conf; + + /** + * Constructor + * @param conf + */ + public IndexUpdateConfiguration(Configuration conf) { + this.conf = conf; + } + + /** + * Get the underlying configuration object. + * @return the configuration + */ + public Configuration getConfiguration() { + return conf; + } + + // + // existing map/reduce properties + // + // public int getIOFileBufferSize() { + // return getInt("io.file.buffer.size", 4096); + // } + + /** + * Get the IO sort space in MB. + * @return the IO sort space in MB + */ + public int getIOSortMB() { + return conf.getInt(MRJobConfig.IO_SORT_MB, 100); + } + + /** + * Set the IO sort space in MB. + * @param mb the IO sort space in MB + */ + public void setIOSortMB(int mb) { + conf.setInt(MRJobConfig.IO_SORT_MB, mb); + } + + /** + * Get the Map/Reduce temp directory. + * @return the Map/Reduce temp directory + */ + public String getMapredTempDir() { + return conf.get(MRConfig.TEMP_DIR); + } + + // + // properties for index update + // + /** + * Get the distribution policy class. + * @return the distribution policy class + */ + public Class getDistributionPolicyClass() { + return conf.getClass("sea.distribution.policy", + HashingDistributionPolicy.class, IDistributionPolicy.class); + } + + /** + * Set the distribution policy class. + * @param theClass the distribution policy class + */ + public void setDistributionPolicyClass( + Class theClass) { + conf.setClass("sea.distribution.policy", theClass, + IDistributionPolicy.class); + } + + /** + * Get the analyzer class. + * @return the analyzer class + */ + public Class getDocumentAnalyzerClass() { + return conf.getClass("sea.document.analyzer", StandardAnalyzer.class, + Analyzer.class); + } + + /** + * Set the analyzer class. + * @param theClass the analyzer class + */ + public void setDocumentAnalyzerClass(Class theClass) { + conf.setClass("sea.document.analyzer", theClass, Analyzer.class); + } + + /** + * Get the index input format class. + * @return the index input format class + */ + public Class getIndexInputFormatClass() { + return conf.getClass("sea.input.format", LineDocInputFormat.class, + InputFormat.class); + } + + /** + * Set the index input format class. 
+ * @param theClass the index input format class + */ + public void setIndexInputFormatClass(Class theClass) { + conf.setClass("sea.input.format", theClass, InputFormat.class); + } + + /** + * Get the index updater class. + * @return the index updater class + */ + public Class getIndexUpdaterClass() { + return conf.getClass("sea.index.updater", IndexUpdater.class, + IIndexUpdater.class); + } + + /** + * Set the index updater class. + * @param theClass the index updater class + */ + public void setIndexUpdaterClass(Class theClass) { + conf.setClass("sea.index.updater", theClass, IIndexUpdater.class); + } + + /** + * Get the local analysis class. + * @return the local analysis class + */ + public Class getLocalAnalysisClass() { + return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class, + ILocalAnalysis.class); + } + + /** + * Set the local analysis class. + * @param theClass the local analysis class + */ + public void setLocalAnalysisClass(Class theClass) { + conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class); + } + + /** + * Get the string representation of a number of shards. + * @return the string representation of a number of shards + */ + public String getIndexShards() { + return conf.get("sea.index.shards"); + } + + /** + * Set the string representation of a number of shards. + * @param shards the string representation of a number of shards + */ + public void setIndexShards(String shards) { + conf.set("sea.index.shards", shards); + } + + /** + * Get the max field length for a Lucene instance. + * @return the max field length for a Lucene instance + */ + public int getIndexMaxFieldLength() { + return conf.getInt("sea.max.field.length", -1); + } + + /** + * Set the max field length for a Lucene instance. + * @param maxFieldLength the max field length for a Lucene instance + */ + public void setIndexMaxFieldLength(int maxFieldLength) { + conf.setInt("sea.max.field.length", maxFieldLength); + } + + /** + * Get the max number of segments for a Lucene instance. + * @return the max number of segments for a Lucene instance + */ + public int getIndexMaxNumSegments() { + return conf.getInt("sea.max.num.segments", -1); + } + + /** + * Set the max number of segments for a Lucene instance. + * @param maxNumSegments the max number of segments for a Lucene instance + */ + public void setIndexMaxNumSegments(int maxNumSegments) { + conf.setInt("sea.max.num.segments", maxNumSegments); + } + + /** + * Check whether to use the compound file format for a Lucene instance. + * @return true if using the compound file format for a Lucene instance + */ + public boolean getIndexUseCompoundFile() { + return conf.getBoolean("sea.use.compound.file", false); + } + + /** + * Set whether use the compound file format for a Lucene instance. + * @param useCompoundFile whether to use the compound file format + */ + public void setIndexUseCompoundFile(boolean useCompoundFile) { + conf.setBoolean("sea.use.compound.file", useCompoundFile); + } + + /** + * Get the max ram index size in bytes. The default is 50M. + * @return the max ram index size in bytes + */ + public long getMaxRAMSizeInBytes() { + return conf.getLong("sea.max.ramsize.bytes", 50L << 20); + } + + /** + * Set the max ram index size in bytes. 
+ * @param b the max ram index size in bytes + */ + public void setMaxRAMSizeInBytes(long b) { + conf.setLong("sea.max.ramsize.bytes", b); + } + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java Fri Oct 19 02:25:55 2012 @@ -1,199 +1,199 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.lucene.analysis.Analyzer; - -/** - * This class applies local analysis on a key-value pair and then convert the - * result docid-operation pair to a shard-and-intermediate form pair. - */ -public class IndexUpdateMapper - extends MapReduceBase implements Mapper { - static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class); - - /** - * Get the map output key class. - * @return the map output key class - */ - public static Class getMapOutputKeyClass() { - return Shard.class; - } - - /** - * Get the map output value class. - * @return the map output value class - */ - public static Class getMapOutputValueClass() { - return IntermediateForm.class; - } - - IndexUpdateConfiguration iconf; - private Analyzer analyzer; - private Shard[] shards; - private IDistributionPolicy distributionPolicy; - - private ILocalAnalysis localAnalysis; - private DocumentID tmpKey; - private DocumentAndOp tmpValue; - - private OutputCollector tmpCollector = - new OutputCollector() { - public void collect(DocumentID key, DocumentAndOp value) - throws IOException { - tmpKey = key; - tmpValue = value; - } - }; - - /** - * Map a key-value pair to a shard-and-intermediate form pair. 
Internally, - * the local analysis is first applied to map the key-value pair to a - * document id-and-operation pair, then the docid-and-operation pair is - * mapped to a shard-intermediate form pair. The intermediate form is of the - * form of a single-document ram index and/or a single delete term. - */ - public void map(K key, V value, - OutputCollector output, Reporter reporter) - throws IOException { - - synchronized (this) { - localAnalysis.map(key, value, tmpCollector, reporter); - - if (tmpKey != null && tmpValue != null) { - DocumentAndOp doc = tmpValue; - IntermediateForm form = new IntermediateForm(); - form.configure(iconf); - form.process(doc, analyzer); - form.closeWriter(); - - if (doc.getOp() == DocumentAndOp.Op.INSERT) { - int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey); - if (chosenShard >= 0) { - // insert into one shard - output.collect(shards[chosenShard], form); - } else { - throw new IOException("Chosen shard for insert must be >= 0"); - } - - } else if (doc.getOp() == DocumentAndOp.Op.DELETE) { - int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey); - if (chosenShard >= 0) { - // delete from one shard - output.collect(shards[chosenShard], form); - } else { - // broadcast delete to all shards - for (int i = 0; i < shards.length; i++) { - output.collect(shards[i], form); - } - } - - } else { // UPDATE - int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey); - int deleteFromShard = - distributionPolicy.chooseShardForDelete(tmpKey); - - if (insertToShard >= 0) { - if (insertToShard == deleteFromShard) { - // update into one shard - output.collect(shards[insertToShard], form); - } else { - // prepare a deletion form - IntermediateForm deletionForm = new IntermediateForm(); - deletionForm.configure(iconf); - deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE, - doc.getTerm()), analyzer); - deletionForm.closeWriter(); - - if (deleteFromShard >= 0) { - // delete from one shard - output.collect(shards[deleteFromShard], deletionForm); - } else { - // broadcast delete to all shards - for (int i = 0; i < shards.length; i++) { - output.collect(shards[i], deletionForm); - } - } - - // prepare an insertion form - IntermediateForm insertionForm = new IntermediateForm(); - insertionForm.configure(iconf); - insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT, - doc.getDocument()), analyzer); - insertionForm.closeWriter(); - - // insert into one shard - output.collect(shards[insertToShard], insertionForm); - } - } else { - throw new IOException("Chosen shard for insert must be >= 0"); - } - } - } - } - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - analyzer = - (Analyzer) ReflectionUtils.newInstance( - iconf.getDocumentAnalyzerClass(), job); - - localAnalysis = - (ILocalAnalysis) ReflectionUtils.newInstance( - iconf.getLocalAnalysisClass(), job); - localAnalysis.configure(job); - - shards = Shard.getIndexShards(iconf); - - distributionPolicy = - (IDistributionPolicy) ReflectionUtils.newInstance( - iconf.getDistributionPolicyClass(), job); - distributionPolicy.init(shards); - - LOG.info("sea.document.analyzer = " + analyzer.getClass().getName()); - LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName()); - LOG.info(shards.length + " shards = " + iconf.getIndexShards()); - LOG.info("sea.distribution.policy = " - + 
distributionPolicy.getClass().getName()); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - localAnalysis.close(); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.lucene.analysis.Analyzer; + +/** + * This class applies local analysis on a key-value pair and then convert the + * result docid-operation pair to a shard-and-intermediate form pair. + */ +public class IndexUpdateMapper + extends MapReduceBase implements Mapper { + static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class); + + /** + * Get the map output key class. + * @return the map output key class + */ + public static Class getMapOutputKeyClass() { + return Shard.class; + } + + /** + * Get the map output value class. + * @return the map output value class + */ + public static Class getMapOutputValueClass() { + return IntermediateForm.class; + } + + IndexUpdateConfiguration iconf; + private Analyzer analyzer; + private Shard[] shards; + private IDistributionPolicy distributionPolicy; + + private ILocalAnalysis localAnalysis; + private DocumentID tmpKey; + private DocumentAndOp tmpValue; + + private OutputCollector tmpCollector = + new OutputCollector() { + public void collect(DocumentID key, DocumentAndOp value) + throws IOException { + tmpKey = key; + tmpValue = value; + } + }; + + /** + * Map a key-value pair to a shard-and-intermediate form pair. Internally, + * the local analysis is first applied to map the key-value pair to a + * document id-and-operation pair, then the docid-and-operation pair is + * mapped to a shard-intermediate form pair. The intermediate form is of the + * form of a single-document ram index and/or a single delete term. 
+ */ + public void map(K key, V value, + OutputCollector output, Reporter reporter) + throws IOException { + + synchronized (this) { + localAnalysis.map(key, value, tmpCollector, reporter); + + if (tmpKey != null && tmpValue != null) { + DocumentAndOp doc = tmpValue; + IntermediateForm form = new IntermediateForm(); + form.configure(iconf); + form.process(doc, analyzer); + form.closeWriter(); + + if (doc.getOp() == DocumentAndOp.Op.INSERT) { + int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey); + if (chosenShard >= 0) { + // insert into one shard + output.collect(shards[chosenShard], form); + } else { + throw new IOException("Chosen shard for insert must be >= 0"); + } + + } else if (doc.getOp() == DocumentAndOp.Op.DELETE) { + int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey); + if (chosenShard >= 0) { + // delete from one shard + output.collect(shards[chosenShard], form); + } else { + // broadcast delete to all shards + for (int i = 0; i < shards.length; i++) { + output.collect(shards[i], form); + } + } + + } else { // UPDATE + int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey); + int deleteFromShard = + distributionPolicy.chooseShardForDelete(tmpKey); + + if (insertToShard >= 0) { + if (insertToShard == deleteFromShard) { + // update into one shard + output.collect(shards[insertToShard], form); + } else { + // prepare a deletion form + IntermediateForm deletionForm = new IntermediateForm(); + deletionForm.configure(iconf); + deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE, + doc.getTerm()), analyzer); + deletionForm.closeWriter(); + + if (deleteFromShard >= 0) { + // delete from one shard + output.collect(shards[deleteFromShard], deletionForm); + } else { + // broadcast delete to all shards + for (int i = 0; i < shards.length; i++) { + output.collect(shards[i], deletionForm); + } + } + + // prepare an insertion form + IntermediateForm insertionForm = new IntermediateForm(); + insertionForm.configure(iconf); + insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT, + doc.getDocument()), analyzer); + insertionForm.closeWriter(); + + // insert into one shard + output.collect(shards[insertToShard], insertionForm); + } + } else { + throw new IOException("Chosen shard for insert must be >= 0"); + } + } + } + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + analyzer = + (Analyzer) ReflectionUtils.newInstance( + iconf.getDocumentAnalyzerClass(), job); + + localAnalysis = + (ILocalAnalysis) ReflectionUtils.newInstance( + iconf.getLocalAnalysisClass(), job); + localAnalysis.configure(job); + + shards = Shard.getIndexShards(iconf); + + distributionPolicy = + (IDistributionPolicy) ReflectionUtils.newInstance( + iconf.getDistributionPolicyClass(), job); + distributionPolicy.init(shards); + + LOG.info("sea.document.analyzer = " + analyzer.getClass().getName()); + LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName()); + LOG.info(shards.length + " shards = " + iconf.getIndexShards()); + LOG.info("sea.distribution.policy = " + + distributionPolicy.getClass().getName()); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + localAnalysis.close(); + } + +} Modified: 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java Fri Oct 19 02:25:55 2012 @@ -1,60 +1,60 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; - -/** - * This partitioner class puts the values of the same key - in this case the - * same shard - in the same partition. - */ -public class IndexUpdatePartitioner implements - Partitioner { - - private Shard[] shards; - private Map map; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int) - */ - public int getPartition(Shard key, IntermediateForm value, int numPartitions) { - int partition = map.get(key).intValue(); - if (partition < numPartitions) { - return partition; - } else { - return numPartitions - 1; - } - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - shards = Shard.getIndexShards(new IndexUpdateConfiguration(job)); - map = new HashMap(); - for (int i = 0; i < shards.length; i++) { - map.put(shards[i], i); - } - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Partitioner; + +/** + * This partitioner class puts the values of the same key - in this case the + * same shard - in the same partition. + */ +public class IndexUpdatePartitioner implements + Partitioner { + + private Shard[] shards; + private Map map; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int) + */ + public int getPartition(Shard key, IntermediateForm value, int numPartitions) { + int partition = map.get(key).intValue(); + if (partition < numPartitions) { + return partition; + } else { + return numPartitions - 1; + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + shards = Shard.getIndexShards(new IndexUpdateConfiguration(job)); + map = new HashMap(); + for (int i = 0; i < shards.length; i++) { + map.put(shards[i], i); + } + } + +} Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java Fri Oct 19 02:25:55 2012 @@ -1,143 +1,143 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.contrib.index.lucene.ShardWriter; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Closeable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This reducer applies to a shard the changes for it. A "new version" of - * a shard is created at the end of a reduce. It is important to note that - * the new version of the shard is not derived from scratch. By leveraging - * Lucene's update algorithm, the new version of each Lucene instance will - * share as many files as possible as the previous version. - */ -public class IndexUpdateReducer extends MapReduceBase implements - Reducer { - static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class); - static final Text DONE = new Text("done"); - - /** - * Get the reduce output key class. - * @return the reduce output key class - */ - public static Class getOutputKeyClass() { - return Shard.class; - } - - /** - * Get the reduce output value class. - * @return the reduce output value class - */ - public static Class getOutputValueClass() { - return Text.class; - } - - private IndexUpdateConfiguration iconf; - private String mapredTempDir; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) - */ - public void reduce(Shard key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - - LOG.info("Construct a shard writer for " + key); - FileSystem fs = FileSystem.get(iconf.getConfiguration()); - String temp = - mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis(); - final ShardWriter writer = new ShardWriter(fs, key, temp, iconf); - - // update the shard - while (values.hasNext()) { - IntermediateForm form = values.next(); - writer.process(form); - reporter.progress(); - } - - // close the shard - final Reporter fReporter = reporter; - new Closeable() { - volatile boolean closed = false; - - public void close() throws IOException { - // spawn a thread to give progress heartbeats - Thread prog = new Thread() { - public void run() { - while (!closed) { - try { - fReporter.setStatus("closing"); - Thread.sleep(1000); - } catch (InterruptedException e) { - continue; - } catch (Throwable e) { - return; - } - } - } - }; - - try { - prog.start(); - - if (writer != null) { - writer.close(); - } - } finally { - closed = true; - } - } - }.close(); - LOG.info("Closed the shard writer for " + key + ", writer = " + writer); - - output.collect(key, DONE); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - mapredTempDir = iconf.getMapredTempDir(); - mapredTempDir = Shard.normalizePath(mapredTempDir); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - } - -} +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.contrib.index.lucene.ShardWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Closeable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This reducer applies to a shard the changes for it. A "new version" of + * a shard is created at the end of a reduce. It is important to note that + * the new version of the shard is not derived from scratch. By leveraging + * Lucene's update algorithm, the new version of each Lucene instance will + * share as many files as possible as the previous version. + */ +public class IndexUpdateReducer extends MapReduceBase implements + Reducer { + static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class); + static final Text DONE = new Text("done"); + + /** + * Get the reduce output key class. + * @return the reduce output key class + */ + public static Class getOutputKeyClass() { + return Shard.class; + } + + /** + * Get the reduce output value class. 
+ * @return the reduce output value class + */ + public static Class getOutputValueClass() { + return Text.class; + } + + private IndexUpdateConfiguration iconf; + private String mapredTempDir; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + public void reduce(Shard key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + LOG.info("Construct a shard writer for " + key); + FileSystem fs = FileSystem.get(iconf.getConfiguration()); + String temp = + mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis(); + final ShardWriter writer = new ShardWriter(fs, key, temp, iconf); + + // update the shard + while (values.hasNext()) { + IntermediateForm form = values.next(); + writer.process(form); + reporter.progress(); + } + + // close the shard + final Reporter fReporter = reporter; + new Closeable() { + volatile boolean closed = false; + + public void close() throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + public void run() { + while (!closed) { + try { + fReporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { + continue; + } catch (Throwable e) { + return; + } + } + } + }; + + try { + prog.start(); + + if (writer != null) { + writer.close(); + } + } finally { + closed = true; + } + } + }.close(); + LOG.info("Closed the shard writer for " + key + ", writer = " + writer); + + output.collect(key, DONE); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + mapredTempDir = iconf.getMapredTempDir(); + mapredTempDir = Shard.normalizePath(mapredTempDir); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + } + +}
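
For reference, the classes touched above (IndexUpdateMapper, IndexUpdatePartitioner, IndexUpdateReducer, plus the IndexUpdateConfiguration setter shown earlier) are wired together as an old-API (mapred) job. The following is a minimal, hypothetical sketch of that wiring; the helper class name, job name, and the 50 MB RAM-size value are illustrative only, and the contrib's own job setup (IndexUpdater) configures more than is shown here, e.g. input/output formats, paths, and the number of reduce tasks.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdateMapper;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdatePartitioner;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdateReducer;

    /**
     * Hypothetical helper, for illustration only: wires the mapper,
     * partitioner and reducer from the diffs above into a mapred job.
     */
    public class IndexUpdateJobSketch {

      public static JobConf createJob(Configuration conf) {
        JobConf jobConf = new JobConf(conf, IndexUpdateJobSketch.class);
        jobConf.setJobName("index-update-sketch");

        // map: (K, V) -> (Shard, IntermediateForm); reduce: -> (Shard, Text)
        jobConf.setMapperClass(IndexUpdateMapper.class);
        jobConf.setPartitionerClass(IndexUpdatePartitioner.class);
        jobConf.setReducerClass(IndexUpdateReducer.class);

        jobConf.setMapOutputKeyClass(IndexUpdateMapper.getMapOutputKeyClass());
        jobConf.setMapOutputValueClass(IndexUpdateMapper.getMapOutputValueClass());
        jobConf.setOutputKeyClass(IndexUpdateReducer.getOutputKeyClass());
        jobConf.setOutputValueClass(IndexUpdateReducer.getOutputValueClass());

        // Cap the single-document RAM index size (sea.max.ramsize.bytes);
        // 50 MB here is an illustrative value, not part of this change.
        IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
        iconf.setMaxRAMSizeInBytes(50L * 1024 * 1024);

        return jobConf;
      }
    }

The static getMapOutputKeyClass()/getMapOutputValueClass() and getOutputKeyClass()/getOutputValueClass() accessors shown in the diffs exist precisely so that job setup code like this does not have to hard-code Shard, IntermediateForm, or Text.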