blob: e6b39b5ddc66945765bb8ffa9dd9ba8b74abe10a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.index.mapred;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.lucene.analysis.Analyzer;
/**
 * This class applies local analysis on a key-value pair and then converts the
 * resulting docid-operation pair to a shard-and-intermediate-form pair.
 */
public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
  static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);

  /**
   * Get the map output key class.
   * @return the map output key class
   */
  public static Class<? extends WritableComparable> getMapOutputKeyClass() {
    return Shard.class;
  }

  /**
   * Get the map output value class.
   * @return the map output value class
   */
  public static Class<? extends Writable> getMapOutputValueClass() {
    return IntermediateForm.class;
  }

  IndexUpdateConfiguration iconf;
  private Analyzer analyzer;
  private Shard[] shards;
  private IDistributionPolicy distributionPolicy;

  private ILocalAnalysis<K, V> localAnalysis;
  // Last pair handed to tmpCollector by the local analysis. Only meaningful
  // inside a single map() call; map() clears both before and after use.
  private DocumentID tmpKey;
  private DocumentAndOp tmpValue;

  // Captures (overwrites) the pair emitted by localAnalysis.map(); if the
  // analysis emits several pairs for one record, only the last is kept.
  private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
      new OutputCollector<DocumentID, DocumentAndOp>() {
        public void collect(DocumentID key, DocumentAndOp value)
            throws IOException {
          tmpKey = key;
          tmpValue = value;
        }
      };

  /**
   * Map a key-value pair to a shard-and-intermediate form pair. Internally,
   * the local analysis is first applied to map the key-value pair to a
   * document id-and-operation pair, then the docid-and-operation pair is
   * mapped to a shard-intermediate form pair. The intermediate form is of the
   * form of a single-document ram index and/or a single delete term.
   *
   * <p>If the local analysis emits nothing for the record, nothing is
   * emitted here either.
   *
   * @throws IOException if the distribution policy chooses a negative shard
   *     for an insert or update, or if building/collecting a form fails
   */
  public void map(K key, V value,
      OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
      throws IOException {
    synchronized (this) {
      // Reset first: otherwise a record for which the local analysis emits
      // nothing would re-process the previous record's document.
      tmpKey = null;
      tmpValue = null;
      localAnalysis.map(key, value, tmpCollector, reporter);

      DocumentID docId = tmpKey;
      DocumentAndOp doc = tmpValue;
      // Do not retain the analyzed document between calls.
      tmpKey = null;
      tmpValue = null;

      if (docId == null || doc == null) {
        return;
      }

      DocumentAndOp.Op op = doc.getOp();
      if (op == DocumentAndOp.Op.INSERT) {
        mapInsert(docId, doc, output);
      } else if (op == DocumentAndOp.Op.DELETE) {
        mapDelete(docId, doc, output);
      } else { // UPDATE
        mapUpdate(docId, doc, output);
      }
    }
  }

  /** Routes an insert to the single shard chosen by the distribution policy. */
  private void mapInsert(DocumentID docId, DocumentAndOp doc,
      OutputCollector<Shard, IntermediateForm> output) throws IOException {
    int chosenShard = distributionPolicy.chooseShardForInsert(docId);
    if (chosenShard < 0) {
      throw new IOException("Chosen shard for insert must be >= 0");
    }
    // insert into one shard
    output.collect(shards[chosenShard], buildForm(doc));
  }

  /** Routes a delete to the chosen shard, or to every shard if none chosen. */
  private void mapDelete(DocumentID docId, DocumentAndOp doc,
      OutputCollector<Shard, IntermediateForm> output) throws IOException {
    int chosenShard = distributionPolicy.chooseShardForDelete(docId);
    collectToShardOrBroadcast(chosenShard, buildForm(doc), output);
  }

  /**
   * Routes an update. If the insert and delete shards coincide, the update
   * form is sent to that shard as-is. Otherwise it is split into a delete of
   * {@code doc.getTerm()}, which is sent to the delete shard or broadcast,
   * followed by an insert of {@code doc.getDocument()} into the insert shard.
   */
  private void mapUpdate(DocumentID docId, DocumentAndOp doc,
      OutputCollector<Shard, IntermediateForm> output) throws IOException {
    // Both shards are chosen before validation, insert first, because a
    // distribution policy may be stateful.
    int insertToShard = distributionPolicy.chooseShardForInsert(docId);
    int deleteFromShard = distributionPolicy.chooseShardForDelete(docId);
    if (insertToShard < 0) {
      throw new IOException("Chosen shard for insert must be >= 0");
    }

    if (insertToShard == deleteFromShard) {
      // update into one shard
      output.collect(shards[insertToShard], buildForm(doc));
      return;
    }

    IntermediateForm deletionForm = buildForm(
        new DocumentAndOp(DocumentAndOp.Op.DELETE, doc.getTerm()));
    collectToShardOrBroadcast(deleteFromShard, deletionForm, output);

    IntermediateForm insertionForm = buildForm(
        new DocumentAndOp(DocumentAndOp.Op.INSERT, doc.getDocument()));
    // insert into one shard
    output.collect(shards[insertToShard], insertionForm);
  }

  /** Builds and closes an intermediate form containing a single operation. */
  private IntermediateForm buildForm(DocumentAndOp doc) throws IOException {
    IntermediateForm form = new IntermediateForm();
    form.configure(iconf);
    form.process(doc, analyzer);
    form.closeWriter();
    return form;
  }

  /**
   * Collects {@code form} for shard {@code shardIndex} when it is
   * non-negative; otherwise broadcasts it to every shard.
   */
  private void collectToShardOrBroadcast(int shardIndex, IntermediateForm form,
      OutputCollector<Shard, IntermediateForm> output) throws IOException {
    if (shardIndex >= 0) {
      output.collect(shards[shardIndex], form);
      return;
    }
    for (Shard shard : shards) {
      output.collect(shard, form);
    }
  }

  /**
   * Instantiates the analyzer, local analysis and distribution policy named
   * in the job configuration, and loads the index shards.
   *
   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
   */
  @Override
  public void configure(JobConf job) {
    iconf = new IndexUpdateConfiguration(job);
    analyzer =
        (Analyzer) ReflectionUtils.newInstance(
            iconf.getDocumentAnalyzerClass(), job);

    // The configured class is only known at runtime, so the type arguments
    // cannot be checked here.
    @SuppressWarnings("unchecked")
    ILocalAnalysis<K, V> configuredAnalysis =
        (ILocalAnalysis<K, V>) ReflectionUtils.newInstance(
            iconf.getLocalAnalysisClass(), job);
    localAnalysis = configuredAnalysis;
    localAnalysis.configure(job);

    shards = Shard.getIndexShards(iconf);

    distributionPolicy =
        (IDistributionPolicy) ReflectionUtils.newInstance(
            iconf.getDistributionPolicyClass(), job);
    distributionPolicy.init(shards);

    LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
    LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
    LOG.info(shards.length + " shards = " + iconf.getIndexShards());
    LOG.info("sea.distribution.policy = "
        + distributionPolicy.getClass().getName());
  }

  /**
   * Closes the local analysis.
   *
   * @see org.apache.hadoop.mapred.MapReduceBase#close()
   */
  @Override
  public void close() throws IOException {
    localAnalysis.close();
  }
}