/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.crawl;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles;
import java.net.URLDecoder;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generic deduplicator which groups fetched URLs with the same digest and marks
 * all of them as duplicate except the one with the highest score (based on the
 * score in the crawldb, which is not necessarily the same as the score
 * indexed). If two (or more) documents have the same score, then the document
 * with the latest timestamp is kept. If the documents have the same timestamp
 * then the one with the shortest URL is kept. The documents marked as duplicate
 * can then be deleted with the command CleaningJob.
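 *
 * A minimal usage sketch, following the usage message printed by
 * {@link #run(String[])}. The {@code dedup} sub-command and the
 * {@code crawl/crawldb} path are illustrative and assume the standard
 * {@code bin/nutch} launcher script:
 *
 * <pre>
 * bin/nutch dedup crawl/crawldb -group host -compareOrder score,fetchTime,urlLength
 * </pre>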
 */
public class DeduplicationJob extends NutchTool implements Tool {
  private static final Logger LOG = LoggerFactory
      .getLogger(MethodHandles.lookup().lookupClass());

  private final static Text urlKey = new Text("_URLTEMPKEY_");

  private final static String DEDUPLICATION_GROUP_MODE = "deduplication.group.mode";
  private final static String DEDUPLICATION_COMPARE_ORDER = "deduplication.compare.order";
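
  /**
   * Maps every fetched or not-modified CrawlDb entry to its content signature,
   * optionally combined with the host or domain of the URL (depending on
   * {@code deduplication.group.mode}), so that candidate duplicates end up in
   * the same reduce group. The original URL is carried along in a temporary
   * metadata field.
   */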
  public static class DBFilter extends
      Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> {

    private String groupMode;

    @Override
    public void setup(Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum>.Context context) {
      Configuration conf = context.getConfiguration();
      groupMode = conf.get(DEDUPLICATION_GROUP_MODE);
    }

    @Override
    public void map(Text key, CrawlDatum value, Context context)
        throws IOException, InterruptedException {
      if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED
          || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
        // || value.getStatus() == CrawlDatum.STATUS_DB_GONE){
        byte[] signature = value.getSignature();
        if (signature == null)
          return;
        String url = key.toString();
        BytesWritable sig = null;
        byte[] data;
        switch (groupMode) {
        case "none":
          sig = new BytesWritable(signature);
          break;
        case "host":
          byte[] host = URLUtil.getHost(url).getBytes();
          data = new byte[signature.length + host.length];
          System.arraycopy(signature, 0, data, 0, signature.length);
          System.arraycopy(host, 0, data, signature.length, host.length);
          sig = new BytesWritable(data);
          break;
        case "domain":
          byte[] domain = URLUtil.getDomainName(url).getBytes();
          data = new byte[signature.length + domain.length];
          System.arraycopy(signature, 0, data, 0, signature.length);
          System.arraycopy(domain, 0, data, signature.length, domain.length);
          sig = new BytesWritable(data);
          break;
        }
        // add the URL as a temporary MD
        value.getMetaData().put(urlKey, key);
        // reduce on the signature optionally grouped on host or domain or not at all
        context.write(sig, value);
      }
    }
  }
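
  /**
   * For each signature group emitted by {@link DBFilter}, keeps one CrawlDatum
   * and marks all others as duplicate, choosing the entry to keep according to
   * the configured {@code deduplication.compare.order} (score, fetchTime,
   * httpsOverHttp, urlLength).
   */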
  public static class DedupReducer extends
      Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> {

    private String[] compareOrder;

    @Override
    public void setup(Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum>.Context context) {
      Configuration conf = context.getConfiguration();
      compareOrder = conf.get(DEDUPLICATION_COMPARE_ORDER).split(",");
    }
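
    /**
     * Marks the given CrawlDatum as {@code STATUS_DB_DUPLICATE}, restores its
     * URL from the temporary metadata field as the output key, and increments
     * the "Documents marked as duplicate" counter.
     */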
    private void writeOutAsDuplicate(CrawlDatum datum, Context context)
        throws IOException, InterruptedException {
      datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
      Text key = (Text) datum.getMetaData().remove(urlKey);
      context.getCounter("DeduplicationJobStatus",
          "Documents marked as duplicate").increment(1);
      context.write(key, datum);
    }

    @Override
    public void reduce(BytesWritable key, Iterable<CrawlDatum> values,
        Context context) throws IOException, InterruptedException {
      CrawlDatum existingDoc = null;
      for (CrawlDatum newDoc : values) {
        if (existingDoc == null) {
          existingDoc = new CrawlDatum();
          existingDoc.set(newDoc);
          continue;
        }
        CrawlDatum duplicate = getDuplicate(existingDoc, newDoc);
        if (duplicate != null) {
          writeOutAsDuplicate(duplicate, context);
          if (duplicate == existingDoc) {
            // keep new
            existingDoc.set(newDoc);
          }
        }
      }
    }
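
    /**
     * Compares two CrawlDatums according to the configured criteria and
     * returns the one that should be marked as duplicate, or {@code null} if
     * none of the criteria can decide between them.
     */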
    private CrawlDatum getDuplicate(CrawlDatum existingDoc, CrawlDatum newDoc)
        throws IOException {
      for (int i = 0; i < compareOrder.length; i++) {
        switch (compareOrder[i]) {
        case "score":
          // prefer the document with the higher score
          if (existingDoc.getScore() < newDoc.getScore()) {
            // mark existing one as duplicate
            return existingDoc;
          } else if (existingDoc.getScore() > newDoc.getScore()) {
            // mark new one as duplicate
            return newDoc;
          }
          break;
        case "fetchTime":
          // prefer the document with the most recent fetch time
          if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
            // mark new one as duplicate
            return newDoc;
          } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
            // mark existing one as duplicate
            return existingDoc;
          }
          break;
        case "httpsOverHttp":
          // prefer https:// over http:// if URLs are identical except for the
          // protocol
          String url1 = existingDoc.getMetaData().get(urlKey).toString();
          String url2 = newDoc.getMetaData().get(urlKey).toString();
          if (url1.startsWith("https://") && url2.startsWith("http://")
              && url1.substring(8).equals(url2.substring(7))) {
            // existingDoc with https://, mark newDoc as duplicate
            return newDoc;
          } else if (url2.startsWith("https://") && url1.startsWith("http://")
              && url2.substring(8).equals(url1.substring(7))) {
            // newDoc with https://, mark existingDoc as duplicate
            return existingDoc;
          }
          break;
        case "urlLength":
          // prefer the document with the shortest URL
          String urlExisting;
          String urlnewDoc;
          try {
            urlExisting = URLDecoder.decode(
                existingDoc.getMetaData().get(urlKey).toString(), "UTF8");
            urlnewDoc = URLDecoder
                .decode(newDoc.getMetaData().get(urlKey).toString(), "UTF8");
          } catch (UnsupportedEncodingException e) {
            LOG.error("Error decoding URLs: "
                + existingDoc.getMetaData().get(urlKey) + ", "
                + newDoc.getMetaData().get(urlKey));
            throw new IOException("UnsupportedEncodingException while decoding URLs", e);
          }
          if (urlExisting.length() < urlnewDoc.length()) {
            // mark new one as duplicate
            return newDoc;
          } else if (urlExisting.length() > urlnewDoc.length()) {
            // mark existing one as duplicate
            return existingDoc;
          }
          break;
        }
      }
      return null; // no decision possible
    }
  }

  /** Combine multiple new entries for a url. */
  public static class StatusUpdateReducer extends
      Reducer<Text, CrawlDatum, Text, CrawlDatum> {

    @Override
    public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
    }

    private CrawlDatum old = new CrawlDatum();
    private CrawlDatum duplicate = new CrawlDatum();

    @Override
    public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
        throws IOException, InterruptedException {
      boolean duplicateSet = false;

      for (CrawlDatum val : values) {
        if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
          duplicate.set(val);
          duplicateSet = true;
        } else {
          old.set(val);
        }
      }

      // keep the duplicate if there is one
      if (duplicateSet) {
        context.write(key, duplicate);
        return;
      }
      // no duplicate? keep old one then
      context.write(key, old);
    }
  }

  public int run(String[] args) throws IOException {
    if (args.length < 1) {
      System.err.println(
          "Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<httpsOverHttp>,<urlLength>]");
      return 1;
    }

    String group = "none";
    Path crawlDb = new Path(args[0]);
    String compareOrder = "score,fetchTime,urlLength";

    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-group"))
        group = args[++i];
      if (args[i].equals("-compareOrder")) {
        compareOrder = args[++i];
        if (compareOrder.indexOf("score") == -1
            || compareOrder.indexOf("fetchTime") == -1
            || compareOrder.indexOf("urlLength") == -1) {
          System.err.println(
              "DeduplicationJob: compareOrder must contain score, fetchTime and urlLength.");
          return 1;
        }
      }
    }

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("DeduplicationJob: starting at " + sdf.format(start));

    Path tempDir = new Path(crawlDb, "dedup-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    Job job = NutchJob.getInstance(getConf());
    Configuration conf = job.getConfiguration();
    job.setJobName("Deduplication on " + crawlDb);
    conf.set(DEDUPLICATION_GROUP_MODE, group);
    conf.set(DEDUPLICATION_COMPARE_ORDER, compareOrder);
    job.setJarByClass(DeduplicationJob.class);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormatClass(SequenceFileInputFormat.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setMapOutputKeyClass(BytesWritable.class);
    job.setMapOutputValueClass(CrawlDatum.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    job.setMapperClass(DBFilter.class);
    job.setReducerClass(DedupReducer.class);

    FileSystem fs = tempDir.getFileSystem(getConf());

    try {
      boolean success = job.waitForCompletion(true);
      if (!success) {
        String message = "Deduplication job did not succeed, job status: "
            + job.getStatus().getState() + ", reason: "
            + job.getStatus().getFailureInfo();
        LOG.error(message);
        fs.delete(tempDir, true);
        throw new RuntimeException(message);
      }
      CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus");
      if (g != null) {
        Counter counter = g.findCounter("Documents marked as duplicate");
        long dups = counter.getValue();
        LOG.info("Deduplication: " + dups + " documents marked as duplicates");
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
      fs.delete(tempDir, true);
      return -1;
    }

    // merge with existing crawl db
    if (LOG.isInfoEnabled()) {
      LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");
    }

    Job mergeJob = CrawlDb.createJob(getConf(), crawlDb);
    FileInputFormat.addInputPath(mergeJob, tempDir);
    mergeJob.setReducerClass(StatusUpdateReducer.class);
    mergeJob.setJarByClass(DeduplicationJob.class);

    fs = crawlDb.getFileSystem(getConf());
    // clean up the merge job's own output on failure, not the already-deleted tempDir
    Path outPath = FileOutputFormat.getOutputPath(mergeJob);
    Path lock = CrawlDb.lock(getConf(), crawlDb, false);

    try {
      boolean success = mergeJob.waitForCompletion(true);
      if (!success) {
        String message = "Deduplication merge job did not succeed, job status: "
            + mergeJob.getStatus().getState() + ", reason: "
            + mergeJob.getStatus().getFailureInfo();
        LOG.error(message);
        fs.delete(tempDir, true);
        NutchJob.cleanupAfterFailure(outPath, lock, fs);
        throw new RuntimeException(message);
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
      fs.delete(tempDir, true);
      NutchJob.cleanupAfterFailure(outPath, lock, fs);
      return -1;
    }

    CrawlDb.install(mergeJob, crawlDb);

    // clean up
    fs.delete(tempDir, true);

    long end = System.currentTimeMillis();
    LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(),
        new DeduplicationJob(), args);
    System.exit(result);
  }

  @Override
  public Map<String, Object> run(Map<String, Object> args, String crawlId)
      throws Exception {
    Map<String, Object> results = new HashMap<>();
    String[] arg = new String[1];
    String crawldb;
    if (args.containsKey(Nutch.ARG_CRAWLDB)) {
      crawldb = (String) args.get(Nutch.ARG_CRAWLDB);
    } else {
      crawldb = crawlId + "/crawldb";
    }
    arg[0] = crawldb;
    int res = run(arg);
    results.put(Nutch.VAL_RESULT, Integer.toString(res));
    return results;
  }
}