/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.contrib.index.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.contrib.index.lucene.LuceneUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * An implementation of the index updater interface that creates a Map/Reduce
 * job configuration and runs the Map/Reduce job to analyze documents and
 * update Lucene index shards in parallel.
 */
public class IndexUpdater implements IIndexUpdater {
  public static final Log LOG = LogFactory.getLog(IndexUpdater.class);

  public IndexUpdater() {
  }
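
  /*
   * A minimal usage sketch with hypothetical paths and task count, assuming
   * Shard.getIndexShards is the read counterpart of the Shard.setIndexShards
   * call in createJob below:
   *
   *   Configuration conf = new Configuration();
   *   Shard[] shards =
   *       Shard.getIndexShards(new IndexUpdateConfiguration(conf));
   *   new IndexUpdater().run(conf, new Path[] { new Path("inputs") },
   *       new Path("outputs"), 2, shards);
   */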

  /* (non-Javadoc)
   * @see org.apache.hadoop.contrib.index.mapred.IIndexUpdater#run(org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path[], org.apache.hadoop.fs.Path, int, org.apache.hadoop.contrib.index.mapred.Shard[])
   */
  public void run(Configuration conf, Path[] inputPaths, Path outputPath,
      int numMapTasks, Shard[] shards) throws IOException {
    JobConf jobConf =
        createJob(conf, inputPaths, outputPath, numMapTasks, shards);
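    // runJob submits the job and blocks until it completes; it throws
    // IOException if the job does not succeed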
    JobClient.runJob(jobConf);
  }

  JobConf createJob(Configuration conf, Path[] inputPaths, Path outputPath,
      int numMapTasks, Shard[] shards) throws IOException {
    // set the starting generation for each shard: when a reduce task
    // fails, its replacement has to know where to restart
    setShardGeneration(conf, shards);

    // IndexUpdateConfiguration wraps conf, so iconf's setters write the
    // properties through to conf
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
    Shard.setIndexShards(iconf, shards);

    // MapTask.MapOutputBuffer uses JobContext.IO_SORT_MB to decide its max
    // buffer size (max buffer size = 1/2 * JobContext.IO_SORT_MB). Here we
    // halve JobContext.IO_SORT_MB because the other half of the memory is
    // used to build an intermediate form/index in the combiner.
    iconf.setIOSortMB(iconf.getIOSortMB() / 2);
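    // e.g. with io.sort.mb = 100, map tasks get a 50 MB sort budget and the
    // combiner can use the remaining memory for its intermediate index
    // (illustrative numbers; the configured value varies per deployment)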

    // create the job configuration
    JobConf jobConf = new JobConf(conf, IndexUpdater.class);
    jobConf.setJobName(this.getClass().getName() + "_"
        + System.currentTimeMillis());

    // provided by the application
    FileInputFormat.setInputPaths(jobConf, inputPaths);
    FileOutputFormat.setOutputPath(jobConf, outputPath);

    jobConf.setNumMapTasks(numMapTasks);

    // one reduce task per shard: each reducer updates exactly one Lucene
    // shard (the shard list was stored in the configuration above)
    jobConf.setNumReduceTasks(shards.length);

    jobConf.setInputFormat(iconf.getIndexInputFormatClass());

    Path[] inputs = FileInputFormat.getInputPaths(jobConf);
    StringBuilder buffer = new StringBuilder(inputs[0].toString());
    for (int i = 1; i < inputs.length; i++) {
      buffer.append(",");
      buffer.append(inputs[i].toString());
    }
    LOG.info("mapred.input.dir = " + buffer.toString());
    LOG.info("mapred.output.dir = " +
        FileOutputFormat.getOutputPath(jobConf).toString());
    LOG.info("mapred.map.tasks = " + jobConf.getNumMapTasks());
    LOG.info("mapred.reduce.tasks = " + jobConf.getNumReduceTasks());
    LOG.info(shards.length + " shards = " + iconf.getIndexShards());
    // note: getInputFormat() instantiates the input format just to log its
    // class name; it would be better to avoid creating the instance here
    LOG.info("mapred.input.format.class = "
        + jobConf.getInputFormat().getClass().getName());

    // the key/value classes are determined by the mapper and reducer
    // implementations, not by the application
    jobConf.setMapOutputKeyClass(IndexUpdateMapper.getMapOutputKeyClass());
    jobConf.setMapOutputValueClass(IndexUpdateMapper.getMapOutputValueClass());
    jobConf.setOutputKeyClass(IndexUpdateReducer.getOutputKeyClass());
    jobConf.setOutputValueClass(IndexUpdateReducer.getOutputValueClass());
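
    // the update pipeline: the mapper analyzes documents into intermediate
    // forms keyed by shard, the partitioner routes each shard's output to a
    // single reducer, the combiner pre-merges intermediate forms, and the
    // reducer applies the merged form to the shard's Lucene index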
    jobConf.setMapperClass(IndexUpdateMapper.class);
    jobConf.setPartitionerClass(IndexUpdatePartitioner.class);
    jobConf.setCombinerClass(IndexUpdateCombiner.class);
    jobConf.setReducerClass(IndexUpdateReducer.class);

    jobConf.setOutputFormat(IndexUpdateOutputFormat.class);

    return jobConf;
  }

  void setShardGeneration(Configuration conf, Shard[] shards)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);

    for (int i = 0; i < shards.length; i++) {
      Path path = new Path(shards[i].getDirectory());
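      // -1 indicates that no index exists yet in this shard's directory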
      long generation = -1;

      if (fs.exists(path)) {
        FileSystemDirectory dir = null;

        try {
          dir = new FileSystemDirectory(fs, path, false, conf);
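          // determine the latest commit generation from the shard's Lucene
          // segments files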
          generation = LuceneUtil.getCurrentSegmentGeneration(dir);
        } finally {
          if (dir != null) {
            dir.close();
          }
        }
      }

      if (generation != shards[i].getGeneration()) {
        // set the starting generation for the shard
        shards[i] =
            new Shard(shards[i].getVersion(), shards[i].getDirectory(),
                generation);
      }
    }
  }
}