/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.ingest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
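
/**
 * Ingests Wikipedia articles into Accumulo in two phases: an optional partitioner job that turns
 * the raw XML dumps into partitioned {@link Article} sequence files, and an ingest job that turns
 * those articles into mutations, written either live through {@link AccumuloOutputFormat} or as
 * RFiles that are bulk imported afterwards.
 */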
public class WikipediaPartitionedIngester extends Configured implements Tool {
private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
public final static String SPLIT_FILE = "wikipedia.split_file";
public final static String TABLE_NAME = "wikipedia.table";
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WikipediaPartitionedIngester(), args);
System.exit(res);
}
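  /**
   * Creates the shard, index, reverse index, and metadata tables if they do not already exist,
   * attaching the combiners and locality group that each table requires.
   */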
private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
TableExistsException {
    // Derive the index, reverse index, and metadata table names from the base table name
    String indexTableName = tableName + "Index";
    String reverseIndexTableName = tableName + "ReverseIndex";
    String metadataTableName = tableName + "Metadata";
    // Create the shard table
if (!tops.exists(tableName)) {
// Set a text index combiner on the given field names. No combiner is set if the option is not supplied
String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
tops.create(tableName);
if (textIndexFamilies.length() > 0) {
System.out.println("Adding content combiner on the fields: " + textIndexFamilies);
IteratorSetting setting = new IteratorSetting(10, TextIndexCombiner.class);
List<Column> columns = new ArrayList<Column>();
for (String family : StringUtils.split(textIndexFamilies, ',')) {
columns.add(new Column("fi\0" + family));
}
TextIndexCombiner.setColumns(setting, columns);
TextIndexCombiner.setLossyness(setting, true);
tops.attachIterator(tableName, setting, EnumSet.allOf(IteratorScope.class));
}
// Set the locality group for the full content column family
tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
}
if (!tops.exists(indexTableName)) {
tops.create(indexTableName);
// Add the UID combiner
IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class);
GlobalIndexUidCombiner.setCombineAllColumns(setting, true);
GlobalIndexUidCombiner.setLossyness(setting, true);
tops.attachIterator(indexTableName, setting, EnumSet.allOf(IteratorScope.class));
}
if (!tops.exists(reverseIndexTableName)) {
tops.create(reverseIndexTableName);
// Add the UID combiner
IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class);
GlobalIndexUidCombiner.setCombineAllColumns(setting, true);
GlobalIndexUidCombiner.setLossyness(setting, true);
tops.attachIterator(reverseIndexTableName, setting, EnumSet.allOf(IteratorScope.class));
}
if (!tops.exists(metadataTableName)) {
// Add the SummingCombiner with VARLEN encoding for the frequency column
tops.create(metadataTableName);
IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class);
SummingCombiner.setColumns(setting, Collections.singletonList(new Column("f")));
SummingCombiner.setEncodingType(setting, SummingCombiner.Type.VARLEN);
tops.attachIterator(metadataTableName, setting, EnumSet.allOf(IteratorScope.class));
}
}
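  /**
   * Runs the partitioner job if enabled, then the ingest job, and finally bulk loads the generated
   * RFiles when bulk ingest is configured.
   */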
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
    if (WikipediaConfiguration.runPartitioner(conf)) {
      int result = runPartitionerJob();
      if (result != 0)
        return result;
    }
    if (WikipediaConfiguration.runIngest(conf)) {
      int result = runIngestJob();
      if (result != 0)
        return result;
      if (WikipediaConfiguration.bulkIngest(conf))
        return loadBulkFiles();
    }
return 0;
}
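  /**
   * Runs the map-only partitioner job: reads the raw Wikipedia XML inputs and writes
   * (partition, Article) pairs to compressed sequence files under the partitioned articles path.
   */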
  private int runPartitionerJob() throws Exception {
Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
Configuration partitionerConf = partitionerJob.getConfiguration();
partitionerConf.set("mapred.map.tasks.speculative.execution", "false");
configurePartitionerJob(partitionerJob);
List<Path> inputPaths = new ArrayList<Path>();
SortedSet<String> languages = new TreeSet<String>();
FileSystem fs = FileSystem.get(partitionerConf);
Path parent = new Path(partitionerConf.get("wikipedia.input"));
listFiles(parent, fs, inputPaths, languages);
System.out.println("Input files in " + parent + ":" + inputPaths.size());
Path[] inputPathsArray = new Path[inputPaths.size()];
inputPaths.toArray(inputPathsArray);
System.out.println("Languages:" + languages.size());
// setup input format
WikipediaInputFormat.setInputPaths(partitionerJob, inputPathsArray);
partitionerJob.setMapperClass(WikipediaPartitioner.class);
partitionerJob.setNumReduceTasks(0);
// setup output format
partitionerJob.setMapOutputKeyClass(Text.class);
partitionerJob.setMapOutputValueClass(Article.class);
partitionerJob.setOutputKeyClass(Text.class);
partitionerJob.setOutputValueClass(Article.class);
partitionerJob.setOutputFormatClass(SequenceFileOutputFormat.class);
Path outputDir = WikipediaConfiguration.getPartitionedArticlesPath(partitionerConf);
SequenceFileOutputFormat.setOutputPath(partitionerJob, outputDir);
SequenceFileOutputFormat.setCompressOutput(partitionerJob, true);
SequenceFileOutputFormat.setOutputCompressionType(partitionerJob, CompressionType.RECORD);
return partitionerJob.waitForCompletion(true) ? 0 : 1;
}
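  /**
   * Runs the map-only ingest job: reads the partitioned Article sequence files and emits
   * mutations, either directly to Accumulo through AccumuloOutputFormat or to RFiles through
   * SortingRFileOutputFormat when bulk ingest is enabled.
   */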
  private int runIngestJob() throws Exception {
Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
Configuration ingestConf = ingestJob.getConfiguration();
ingestConf.set("mapred.map.tasks.speculative.execution", "false");
configureIngestJob(ingestJob);
String tablename = WikipediaConfiguration.getTableName(ingestConf);
Connector connector = WikipediaConfiguration.getConnector(ingestConf);
TableOperations tops = connector.tableOperations();
createTables(tops, tablename);
ingestJob.setMapperClass(WikipediaPartitionedMapper.class);
ingestJob.setNumReduceTasks(0);
// setup input format
ingestJob.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(ingestJob, WikipediaConfiguration.getPartitionedArticlesPath(ingestConf));
// TODO make split size configurable
SequenceFileInputFormat.setMinInputSplitSize(ingestJob, WikipediaConfiguration.getMinInputSplitSize(ingestConf));
// setup output format
ingestJob.setMapOutputKeyClass(Text.class);
ingestJob.setMapOutputValueClass(Mutation.class);
    if (WikipediaConfiguration.bulkIngest(ingestConf)) {
      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
      String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
      if (bulkIngestDir == null) {
        log.error("Bulk ingest dir not set");
        return 1;
      }
      SortingRFileOutputFormat.setPathName(ingestConf, bulkIngestDir);
} else {
ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
String user = WikipediaConfiguration.getUser(ingestConf);
byte[] password = WikipediaConfiguration.getPassword(ingestConf);
AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
}
return ingestJob.waitForCompletion(true) ? 0 : 1;
}
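  /**
   * Bulk imports the RFiles produced by the ingest job: each subdirectory of the bulk ingest
   * directory is imported into the Accumulo table named after that subdirectory, with a matching
   * failure directory created first.
   */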
  private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
Configuration conf = getConf();
Connector connector = WikipediaConfiguration.getConnector(conf);
FileSystem fs = FileSystem.get(conf);
String directory = WikipediaConfiguration.bulkIngestDir(conf);
String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
    for (FileStatus status : fs.listStatus(new Path(directory))) {
      if (!status.isDir())
        continue;
Path dir = status.getPath();
Path failPath = new Path(failureDirectory+"/"+dir.getName());
fs.mkdirs(failPath);
connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
}
return 0;
}
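  /** Accepts only paths whose names start with "part", i.e. the output files written by MapReduce tasks. */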
public final static PathFilter partFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith("part");
    }
};
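  /**
   * Configures the partitioner job to use WikipediaInputFormat, aggregating records between the
   * {@code <page>} and {@code </page>} tokens of the Wikipedia XML dumps.
   */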
protected void configurePartitionerJob(Job job) {
Configuration conf = job.getConfiguration();
job.setJarByClass(WikipediaPartitionedIngester.class);
job.setInputFormatClass(WikipediaInputFormat.class);
conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
}
protected void configureIngestJob(Job job) {
job.setJarByClass(WikipediaPartitionedIngester.class);
}
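  /** Matches Wikipedia dump XML files (optionally bz2-compressed) and captures the leading language identifier. */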
  protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*\\.xml(\\.bz2)?");
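  /**
   * Recursively collects the input files under the given path that match {@link #filePattern},
   * recording the language identifier of each one.
   */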
protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
for (FileStatus status : fs.listStatus(path)) {
if (status.isDir()) {
listFiles(status.getPath(), fs, files, languages);
} else {
Path p = status.getPath();
Matcher matcher = filePattern.matcher(p.getName());
if (matcher.matches()) {
languages.add(matcher.group(1));
files.add(p);
}
}
}
}
}