| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nutch.crawl; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Random; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.nutch.metadata.Nutch; |
| import org.apache.nutch.util.FSUtils; |
| import org.apache.nutch.util.HadoopFSUtil; |
| import org.apache.nutch.util.LockUtil; |
| import org.apache.nutch.util.NutchConfiguration; |
| import org.apache.nutch.util.NutchJob; |
| import org.apache.nutch.util.NutchTool; |
| import org.apache.nutch.util.TimingUtil; |
| |
| /** |
| * This class takes the fetch and (if available) parse output of one or |
| * more segments and updates the CrawlDb accordingly. |
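| * <p> |
| * Command-line usage, as printed by {@link #run(String[])}: |
| * <pre> |
| * CrawlDb &lt;crawldb&gt; (-dir &lt;segments&gt; | &lt;seg1&gt; &lt;seg2&gt; ...) [-force] [-normalize] [-filter] [-noAdditions] |
| * </pre> |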
| */ |
| public class CrawlDb extends NutchTool implements Tool { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| |
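| /** Property name of the switch whether newly discovered URLs may be added to the CrawlDb. */ |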
| public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed"; |
| |
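| /** Property name of the switch to purge entries with a gone (404) status from the CrawlDb. */ |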
| public static final String CRAWLDB_PURGE_404 = "db.update.purge.404"; |
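| /** Property name of the switch to purge orphaned entries from the CrawlDb. */ |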
| public static final String CRAWLDB_PURGE_ORPHANS = "db.update.purge.orphans"; |
| |
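| /** Name of the subdirectory of the CrawlDb holding the current version of the data. */ |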
| public static final String CURRENT_NAME = "current"; |
| |
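| /** Name of the CrawlDb lock file. */ |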
| public static final String LOCK_NAME = ".locked"; |
| |
| public CrawlDb() { |
| } |
| |
| public CrawlDb(Configuration conf) { |
| setConf(conf); |
| } |
| |
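| /** |
| * Update the CrawlDb from the given segments. Whether newly discovered |
| * URLs may be added is read from the configuration property |
| * {@link #CRAWLDB_ADDITIONS_ALLOWED}; an existing lock file is not |
| * overridden (force is false). |
| */ |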
| public void update(Path crawlDb, Path[] segments, boolean normalize, |
| boolean filter) throws IOException, InterruptedException, ClassNotFoundException { |
| boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, |
| true); |
| update(crawlDb, segments, normalize, filter, additionsAllowed, false); |
| } |
| |
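| /** |
| * Update the CrawlDb by merging the fetch and parse output of the given |
| * segments into it. |
| * |
| * @param crawlDb path of the CrawlDb to update |
| * @param segments paths of the segments to update from |
| * @param normalize if true, apply URL normalizers |
| * @param filter if true, apply URL filters |
| * @param additionsAllowed if true, newly discovered URLs may be added |
| * @param force if true, update even if the CrawlDb appears to be locked |
| */ |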
| public void update(Path crawlDb, Path[] segments, boolean normalize, |
| boolean filter, boolean additionsAllowed, boolean force) |
| throws IOException, InterruptedException, ClassNotFoundException { |
| Path lock = lock(getConf(), crawlDb, force); |
| |
| SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| long start = System.currentTimeMillis(); |
| |
| Job job = CrawlDb.createJob(getConf(), crawlDb); |
| Configuration conf = job.getConfiguration(); |
| conf.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed); |
| conf.setBoolean(CrawlDbFilter.URL_FILTERING, filter); |
| conf.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize); |
| |
| boolean url404Purging = conf.getBoolean(CRAWLDB_PURGE_404, false); |
| |
| LOG.info("CrawlDb update: starting at {}", sdf.format(start)); |
| LOG.info("CrawlDb update: db: {}", crawlDb); |
| LOG.info("CrawlDb update: segments: {}", Arrays.asList(segments)); |
| LOG.info("CrawlDb update: additions allowed: {}", additionsAllowed); |
| LOG.info("CrawlDb update: URL normalizing: {}", normalize); |
| LOG.info("CrawlDb update: URL filtering: {}", filter); |
| LOG.info("CrawlDb update: 404 purging: {}", url404Purging); |
| |
| for (int i = 0; i < segments.length; i++) { |
| FileSystem sfs = segments[i].getFileSystem(getConf()); |
| Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME); |
| Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME); |
| if (sfs.exists(fetch)) { |
| FileInputFormat.addInputPath(job, fetch); |
| if (sfs.exists(parse)) { |
| FileInputFormat.addInputPath(job, parse); |
| } else { |
| LOG.info(" - adding fetched but unparsed segment {}", segments[i]); |
| } |
| } else { |
| LOG.info(" - skipping invalid segment {}", segments[i]); |
| } |
| } |
| |
| LOG.info("CrawlDb update: Merging segment data into db."); |
| |
| FileSystem fs = crawlDb.getFileSystem(getConf()); |
| Path outPath = FileOutputFormat.getOutputPath(job); |
| try { |
| boolean success = job.waitForCompletion(true); |
| if (!success) { |
| String message = "CrawlDb update job did not succeed, job status: " |
| + job.getStatus().getState() + ", reason: " |
| + job.getStatus().getFailureInfo(); |
| LOG.error(message); |
| NutchJob.cleanupAfterFailure(outPath, lock, fs); |
| throw new RuntimeException(message); |
| } |
| } catch (IOException | InterruptedException | ClassNotFoundException e) { |
| LOG.error("CrawlDb update job failed: {}", e.getMessage()); |
| NutchJob.cleanupAfterFailure(outPath, lock, fs); |
| throw e; |
| } |
| |
| CrawlDb.install(job, crawlDb); |
| |
| if (filter) { |
| long urlsFiltered = job.getCounters() |
| .findCounter("CrawlDB filter", "URLs filtered").getValue(); |
| LOG.info( |
| "CrawlDb update: Total number of existing URLs in CrawlDb rejected by URL filters: {}", |
| urlsFiltered); |
| } |
| |
| long end = System.currentTimeMillis(); |
| LOG.info("CrawlDb update: finished at {}, elapsed: {}", sdf.format(end), |
| TimingUtil.elapsedTime(start, end)); |
| } |
| |
| /** |
| * Configure a new CrawlDb update job which writes its output to a |
| * temporary folder at <code>crawlDb/&lt;rand&gt;</code>. |
| * |
| * @param config the job {@link Configuration} |
| * @param crawlDb path of the CrawlDb to update |
| * @return the configured {@link Job} |
| * @throws IOException if the existing CrawlDb cannot be accessed |
| */ |
| public static Job createJob(Configuration config, Path crawlDb) |
| throws IOException { |
| Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random() |
| .nextInt(Integer.MAX_VALUE))); |
| |
| Job job = NutchJob.getInstance(config); |
| job.setJobName("crawldb " + crawlDb); |
| |
| Path current = new Path(crawlDb, CURRENT_NAME); |
| if (current.getFileSystem(job.getConfiguration()).exists(current)) { |
| FileInputFormat.addInputPath(job, current); |
| } |
| job.setInputFormatClass(SequenceFileInputFormat.class); |
| |
| job.setMapperClass(CrawlDbFilter.class); |
| job.setReducerClass(CrawlDbReducer.class); |
| job.setJarByClass(CrawlDb.class); |
| |
| FileOutputFormat.setOutputPath(job, newCrawlDb); |
| job.setOutputFormatClass(MapFileOutputFormat.class); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(CrawlDatum.class); |
| |
| // https://issues.apache.org/jira/browse/NUTCH-1110 |
| job.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); |
| |
| return job; |
| } |
| |
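| /** |
| * Lock the CrawlDb for exclusive access by creating a lock file. |
| * |
| * @param job configuration used to access the file system |
| * @param crawlDb directory of the CrawlDb to lock |
| * @param force if true, proceed even if a lock file already exists |
| * @return path of the lock file |
| * @throws IOException if the lock file cannot be created |
| */ |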
| public static Path lock(Configuration job, Path crawlDb, boolean force) throws IOException { |
| Path lock = new Path(crawlDb, LOCK_NAME); |
| LockUtil.createLockFile(job, lock, force); |
| return lock; |
| } |
| |
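| /** |
| * Install a newly generated CrawlDb: move the current CrawlDb to "old" |
| * (kept as a backup unless db.preserve.backup is false), move the |
| * temporary CrawlDb into place as the new "current", and remove the lock |
| * file. |
| */ |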
| private static void install(Configuration conf, Path crawlDb, Path tempCrawlDb) |
| throws IOException { |
| boolean preserveBackup = conf.getBoolean("db.preserve.backup", true); |
| FileSystem fs = crawlDb.getFileSystem(conf); |
| Path old = new Path(crawlDb, "old"); |
| Path current = new Path(crawlDb, CURRENT_NAME); |
| if (fs.exists(current)) { |
| FSUtils.replace(fs, old, current, true); |
| } |
| FSUtils.replace(fs, current, tempCrawlDb, true); |
| Path lock = new Path(crawlDb, LOCK_NAME); |
| LockUtil.removeLockFile(fs, lock); |
| if (!preserveBackup && fs.exists(old)) { |
| fs.delete(old, true); |
| } |
| } |
| |
| |
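| /** |
| * Install the output of the given update job as the new current CrawlDb, |
| * see {@link #install(Configuration, Path, Path)}. |
| * |
| * @param job the completed CrawlDb update job |
| * @param crawlDb directory of the CrawlDb to update |
| * @throws IOException if the new CrawlDb cannot be moved into place |
| */ |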
| public static void install(Job job, Path crawlDb) throws IOException { |
| Configuration conf = job.getConfiguration(); |
| Path tempCrawlDb = FileOutputFormat.getOutputPath(job); |
| install(conf, crawlDb, tempCrawlDb); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args); |
| System.exit(res); |
| } |
| |
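| /** |
| * Parse the command-line arguments and run the CrawlDb update; prints a |
| * usage message and returns -1 if no CrawlDb is given. |
| */ |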
| public int run(String[] args) throws Exception { |
| if (args.length < 1) { |
| System.err |
| .println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]"); |
| System.err.println("\tcrawldb\tCrawlDb to update"); |
| System.err |
| .println("\t-dir segments\tparent directory containing all segments to update from"); |
| System.err |
| .println("\tseg1 seg2 ...\tlist of segment names to update from"); |
| System.err |
| .println("\t-force\tforce update even if CrawlDb appears to be locked (CAUTION advised)"); |
| System.err |
| .println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and segment (usually not needed)"); |
| System.err |
| .println("\t-filter\tuse URLFilters on urls in CrawlDb and segment"); |
| System.err |
| .println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs"); |
| |
| return -1; |
| } |
| boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING, |
| false); |
| boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false); |
| boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, |
| true); |
| boolean force = false; |
| HashSet<Path> dirs = new HashSet<>(); |
| for (int i = 1; i < args.length; i++) { |
| if (args[i].equals("-normalize")) { |
| normalize = true; |
| } else if (args[i].equals("-filter")) { |
| filter = true; |
| } else if (args[i].equals("-force")) { |
| force = true; |
| } else if (args[i].equals("-noAdditions")) { |
| additionsAllowed = false; |
| } else if (args[i].equals("-dir")) { |
| Path dirPath = new Path(args[++i]); |
| FileSystem fs = dirPath.getFileSystem(getConf()); |
| FileStatus[] paths = fs.listStatus(dirPath, |
| HadoopFSUtil.getPassDirectoriesFilter(fs)); |
| dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); |
| } else { |
| dirs.add(new Path(args[i])); |
| } |
| } |
| try { |
| update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize, |
| filter, additionsAllowed, force); |
| return 0; |
| } catch (Exception e) { |
| LOG.error("CrawlDb update: " + StringUtils.stringifyException(e)); |
| return -1; |
| } |
| } |
| |
| /** |
| * Update the CrawlDb, used by the Nutch REST service. |
| * |
| * @param args job arguments: "normalize", "filter", "force", "noAdditions", |
| *          {@link Nutch#ARG_CRAWLDB}, and either {@link Nutch#ARG_SEGMENTDIR} |
| *          or {@link Nutch#ARG_SEGMENTS} |
| * @param crawlId crawl identifier, used to derive default CrawlDb and |
| *          segment paths when they are not passed in <code>args</code> |
| * @return a map containing {@link Nutch#VAL_RESULT}: "0" on success, "-1" |
| *         on failure |
| */ |
| @Override |
| public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { |
| |
| Map<String, Object> results = new HashMap<>(); |
| |
| boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING, |
| false); |
| boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false); |
| boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, |
| true); |
| boolean force = false; |
| HashSet<Path> dirs = new HashSet<>(); |
| |
| if (args.containsKey("normalize")) { |
| normalize = true; |
| } |
| if (args.containsKey("filter")) { |
| filter = true; |
| } |
| if (args.containsKey("force")) { |
| force = true; |
| } |
| if (args.containsKey("noAdditions")) { |
| additionsAllowed = false; |
| } |
| |
| Path crawlDb; |
| if(args.containsKey(Nutch.ARG_CRAWLDB)) { |
| Object crawldbPath = args.get(Nutch.ARG_CRAWLDB); |
| if(crawldbPath instanceof Path) { |
| crawlDb = (Path) crawldbPath; |
| } |
| else { |
| crawlDb = new Path(crawldbPath.toString()); |
| } |
| } |
| else { |
| crawlDb = new Path(crawlId+"/crawldb"); |
| } |
| |
| Path segmentsDir; |
| if(args.containsKey(Nutch.ARG_SEGMENTDIR)) { |
| Object segDir = args.get(Nutch.ARG_SEGMENTDIR); |
| if(segDir instanceof Path) { |
| segmentsDir = (Path) segDir; |
| } |
| else { |
| segmentsDir = new Path(segDir.toString()); |
| } |
| FileSystem fs = segmentsDir.getFileSystem(getConf()); |
| FileStatus[] paths = fs.listStatus(segmentsDir, |
| HadoopFSUtil.getPassDirectoriesFilter(fs)); |
| dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); |
| } |
| else if(args.containsKey(Nutch.ARG_SEGMENTS)) { |
| Object segments = args.get(Nutch.ARG_SEGMENTS); |
| ArrayList<String> segmentList = new ArrayList<>(); |
| if(segments instanceof ArrayList) { |
| segmentList = (ArrayList<String>)segments; |
| } |
| else if(segments instanceof Path){ |
| segmentList.add(segments.toString()); |
| } |
| |
| for(String segment: segmentList) { |
| dirs.add(new Path(segment)); |
| } |
| } |
| else { |
| String segmentDir = crawlId+"/segments"; |
| File dir = new File(segmentDir); |
| File[] segmentsList = dir.listFiles(); |
| // sort by modification time, most recent segment first |
| Arrays.sort(segmentsList, |
| (f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified())); |
| dirs.add(new Path(segmentsList[0].getPath())); |
| } |
| try { |
| update(crawlDb, dirs.toArray(new Path[dirs.size()]), normalize, |
| filter, additionsAllowed, force); |
| results.put(Nutch.VAL_RESULT, Integer.toString(0)); |
| return results; |
| } catch (Exception e) { |
| LOG.error("CrawlDb update: " + StringUtils.stringifyException(e)); |
| results.put(Nutch.VAL_RESULT, Integer.toString(-1)); |
| return results; |
| } |
| } |
| } |