/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.fetcher;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TimingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A queue-based fetcher.
*
* <p>
* This fetcher uses a well-known model of one producer (a QueueFeeder) and many
* consumers (FetcherThread-s).
*
* <p>
* QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
* which hold FetchItem-s that describe the items to be fetched. There are as
* many queues as there are unique hosts, but at any given time the total number
* of fetch items in all queues is less than a fixed number (currently set to a
* multiple of the number of threads).
*
* <p>
* As items are consumed from the queues, the QueueFeeder continues to add new
* input items, so that their total count stays fixed (FetcherThread-s may also
* add new items to the queues, e.g. as a result of redirection) - until all
* input items are exhausted, at which point the number of items in the queues
* begins to decrease. When this number reaches 0, the fetcher finishes.
*
* <p>
* This fetcher implementation handles per-host blocking itself, instead of
* delegating this work to protocol-specific plugins. Each per-host queue
* handles its own "politeness" settings, such as the maximum number of
* concurrent requests and crawl delay between consecutive requests - and also a
* list of requests in progress, and the time the last request was finished. As
* FetcherThread-s ask for new items to be fetched, queues may return eligible
* items or null if for "politeness" reasons this host's queue is not yet ready.
*
* <p>
* If there are still unfetched items in the queues, but none of the items are
* ready, FetcherThread-s will spin-wait until either some items become
* available, or a timeout is reached (at which point the Fetcher will abort,
* assuming the task is hung).
*
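* <p>
* A minimal programmatic usage sketch (the segment path and agent name are
* placeholders; a fetch list generated by the Generator is assumed to exist
* in the segment):
*
* <pre>{@code
* Configuration conf = NutchConfiguration.create();
* conf.set("http.agent.name", "my-test-crawler");
* Fetcher fetcher = new Fetcher(conf);
* // fetch the segment with 10 fetcher threads
* fetcher.fetch(new Path("crawl/segments/20240101000000"), 10);
* }</pre>
*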
* @author Andrzej Bialecki
*/
public class Fetcher extends NutchTool implements Tool {
public static final int PERM_REFRESH_TIME = 5;
public static final String CONTENT_REDIR = "content";
public static final String PROTOCOL_REDIR = "protocol";
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
public static class InputFormat
extends SequenceFileInputFormat<Text, CrawlDatum> {
/**
* Don't split inputs to keep things polite - a single fetch list must be
* processed in one fetcher task, never split and assigned to multiple
* parallel tasks.
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<FileStatus> files = listStatus(job);
List<InputSplit> splits = new ArrayList<>();
for (FileStatus cur : files) {
splits.add(
new FileSplit(cur.getPath(), 0, cur.getLen(), (String[]) null));
}
return splits;
}
}
public Fetcher() {
super(null);
}
public Fetcher(Configuration conf) {
super(conf);
}
public static boolean isParsing(Configuration conf) {
return conf.getBoolean("fetcher.parse", true);
}
public static boolean isStoringContent(Configuration conf) {
return conf.getBoolean("fetcher.store.content", true);
}
public static class FetcherRun extends
Mapper<Text, CrawlDatum, Text, NutchWritable> {
private String segmentName;
private AtomicInteger activeThreads = new AtomicInteger(0);
private AtomicInteger spinWaiting = new AtomicInteger(0);
private long start = System.currentTimeMillis();
private AtomicLong lastRequestStart = new AtomicLong(start);
private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private AtomicInteger errors = new AtomicInteger(0); // total pages errored
private boolean storingContent;
private boolean parsing;
private AtomicInteger getActiveThreads() {
return activeThreads;
}
private void reportStatus(Context context, FetchItemQueues fetchQueues, int pagesLastSec, int bytesLastSec)
throws IOException {
StringBuilder status = new StringBuilder();
long elapsed = Math.max(1, (System.currentTimeMillis() - start) / 1000);
float avgPagesSec = (float) pages.get() / elapsed;
// bytes / 128 = kbits (8 bits per byte, 1024 bits per kbit)
long avgBytesSec = (bytes.get() / 128L) / elapsed;
status.append(activeThreads).append(" threads (").append(spinWaiting.get())
.append(" waiting), ");
status.append(fetchQueues.getQueueCount()).append(" queues, ");
status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
status.append(pages).append(" pages, ").append(errors).append(" errors, ");
status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
status.append(pagesLastSec).append(" last sec), ");
status.append(avgBytesSec).append(" kbits/s (")
.append((bytesLastSec / 128)).append(" last sec)");
context.setStatus(status.toString());
}
@Override
public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
Configuration conf = context.getConfiguration();
segmentName = conf.get(Nutch.SEGMENT_NAME_KEY);
storingContent = isStoringContent(conf);
parsing = isParsing(conf);
}
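/**
* Main fetch loop: starts the QueueFeeder and the FetcherThreads, then
* polls once per second to report progress, enforce the throughput
* threshold, adapt the thread count to the target bandwidth, and abort
* when the remaining requests appear to hang.
*/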
@Override
public void run(Context innerContext) throws IOException {
setup(innerContext);
Configuration conf = innerContext.getConfiguration();
LinkedList<FetcherThread> fetcherThreads = new LinkedList<>();
FetchItemQueues fetchQueues = new FetchItemQueues(conf);
QueueFeeder feeder;
int threadCount = conf.getInt("fetcher.threads.fetch", 10);
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: threads: {}", threadCount);
}
int timeoutDivisor = conf.getInt("fetcher.threads.timeout.divisor", 2);
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
}
int queueDepthMultiplier = conf.getInt(
"fetcher.queue.depth.multiplier", 50);
feeder = new QueueFeeder(innerContext, fetchQueues, threadCount
* queueDepthMultiplier);
// fetcher.timelimit is either -1 or the absolute time (epoch millis)
// at which fetching must stop
long timelimit = conf.getLong("fetcher.timelimit", -1);
if (timelimit != -1)
feeder.setTimeLimit(timelimit);
feeder.start();
for (int i = 0; i < threadCount; i++) { // spawn threads
FetcherThread t = new FetcherThread(conf, getActiveThreads(), fetchQueues,
feeder, spinWaiting, lastRequestStart, innerContext, errors, segmentName,
parsing, storingContent, pages, bytes);
fetcherThreads.add(t);
t.start();
}
// select a timeout that avoids a task timeout
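// e.g. with the default 10 min task timeout and divisor 2, the fetcher
// aborts if no request has been started for 5 minutes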
long timeout = conf.getInt("mapreduce.task.timeout", 10 * 60 * 1000)
/ timeoutDivisor;
// Used for threshold check, holds pages and bytes processed in the last
// second
int pagesLastSec;
int bytesLastSec;
int throughputThresholdNumRetries = 0;
int throughputThresholdPages = conf.getInt(
"fetcher.throughput.threshold.pages", -1);
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
}
int throughputThresholdMaxRetries = conf.getInt(
"fetcher.throughput.threshold.retries", 5);
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: throughput threshold retries: {}",
throughputThresholdMaxRetries);
}
long throughputThresholdTimeLimit = conf.getLong(
"fetcher.throughput.threshold.check.after", -1);
int targetBandwidth = conf.getInt("fetcher.bandwidth.target", -1) * 1000;
int maxNumThreads = conf.getInt("fetcher.maxNum.threads", threadCount);
if (maxNumThreads < threadCount) {
LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead",
threadCount, threadCount);
maxNumThreads = threadCount;
}
int bandwidthTargetCheckEveryNSecs = conf.getInt(
"fetcher.bandwidth.target.check.everyNSecs", 30);
if (bandwidthTargetCheckEveryNSecs < 1) {
LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
bandwidthTargetCheckEveryNSecs = 1;
}
int maxThreadsPerQueue = conf.getInt("fetcher.threads.per.queue", 1);
int bandwidthTargetCheckCounter = 0;
long bytesAtLastBWTCheck = 0l;
do { // wait for threads to exit
pagesLastSec = pages.get();
bytesLastSec = (int) bytes.get();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignored - the loop re-checks the fetcher state on every iteration
}
pagesLastSec = pages.get() - pagesLastSec;
bytesLastSec = (int) bytes.get() - bytesLastSec;
innerContext.getCounter("FetcherStatus", "bytes_downloaded").increment(bytesLastSec);
reportStatus(innerContext, fetchQueues, pagesLastSec, bytesLastSec);
LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+ spinWaiting.get() + ", fetchQueues.totalSize="
+ fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+ fetchQueues.getQueueCount());
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
// if the throughput threshold check is enabled and its grace period has passed
if (throughputThresholdTimeLimit < System.currentTimeMillis()
&& throughputThresholdPages != -1) {
// Check if we're dropping below the threshold
if (pagesLastSec < throughputThresholdPages) {
throughputThresholdNumRetries++;
LOG.warn("{}: dropping below configured threshold of {} pages per second",
Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages));
// Quit if we dropped below threshold too many times
if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
LOG.warn("Dropped below threshold too many times, killing!");
// Disable the threshold checker
throughputThresholdPages = -1;
// Empty the queues cleanly and get number of items that were
// dropped
int hitByThroughputThreshold = fetchQueues.emptyQueues();
if (hitByThroughputThreshold != 0)
innerContext.getCounter("FetcherStatus", "hitByThroughputThreshold").increment(
hitByThroughputThreshold);
}
}
}
// adjust the number of threads if a target bandwidth has been set
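// Worked example (hypothetical numbers): with a 1,000,000 bps target,
// 600,000 bps measured over the check interval and 10 active threads,
// averageBdwPerThread is 60,000 bps, so up to round(400,000 / 60,000) = 7
// additional threads may be started, capped by fetcher.maxNum.threads.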
if (targetBandwidth > 0) {
if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
bandwidthTargetCheckCounter++;
else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
/ bandwidthTargetCheckEveryNSecs;
bytesAtLastBWTCheck = bytes.get();
bandwidthTargetCheckCounter = 0;
int averageBdwPerThread = 0;
if (activeThreads.get() > 0)
// float division, so the average is rounded rather than truncated
averageBdwPerThread = Math.round((float) bpsSinceLastCheck
/ activeThreads.get());
LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000));
if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
// check whether it is worth doing e.g. more queues than threads
if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads
.get()) {
long remainingBdw = targetBandwidth - bpsSinceLastCheck;
int additionalThreads = Math.round((float) remainingBdw
/ averageBdwPerThread);
int availableThreads = maxNumThreads - activeThreads.get();
// never start more threads than allowed by fetcher.maxNum.threads
additionalThreads = Math.min(availableThreads, additionalThreads);
LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
new Object[]{(bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads});
// activate new threads
for (int i = 0; i < additionalThreads; i++) {
FetcherThread thread = new FetcherThread(conf, getActiveThreads(), fetchQueues,
feeder, spinWaiting, lastRequestStart, innerContext, errors, segmentName, parsing,
storingContent, pages, bytes);
fetcherThreads.add(thread);
thread.start();
}
}
} else if (bpsSinceLastCheck > targetBandwidth
&& averageBdwPerThread > 0) {
// if the bandwidth we're using is greater than the target
// bandwidth, we have to stop some threads
long excessBdw = bpsSinceLastCheck - targetBandwidth;
int excessThreads = Math.round((float) excessBdw / averageBdwPerThread);
LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
new Object[]{bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads});
// keep at least one
if (excessThreads >= fetcherThreads.size())
excessThreads = 0;
// deactivate excess threads
for (int i = 0; i < excessThreads; i++) {
FetcherThread thread = fetcherThreads.removeLast();
thread.setHalted(true);
}
}
}
}
// check timelimit
if (!feeder.isAlive()) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
if (hitByTimeLimit != 0)
innerContext.getCounter("FetcherStatus", "hitByTimeLimit").increment(
hitByTimeLimit);
}
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
if (LOG.isWarnEnabled()) {
LOG.warn("Aborting with {} hung threads.", activeThreads);
for (int i = 0; i < fetcherThreads.size(); i++) {
FetcherThread thread = fetcherThreads.get(i);
if (thread.isAlive()) {
LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl());
if (LOG.isDebugEnabled()) {
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder sb = new StringBuilder();
sb.append("Stack of thread #").append(i).append(":\n");
for (StackTraceElement s : stack) {
sb.append(s.toString()).append('\n');
}
LOG.debug(sb.toString());
}
}
}
}
return;
}
} while (activeThreads.get() > 0);
LOG.info("-activeThreads={}", activeThreads);
}
}
public void fetch(Path segment, int threads) throws IOException,
InterruptedException, ClassNotFoundException {
checkConfiguration();
long start = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: starting at {}", TimingUtil.logDateMillis(start));
LOG.info("Fetcher: segment: {}", segment);
}
// set the absolute end time of the time limit relative to the start of
// the whole job rather than of an individual task; otherwise the limit
// would be reset whenever a failed task is retried
long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
if (timelimit != -1) {
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
LOG.info("Fetcher Timelimit set for : {} ({})", timelimit,
TimingUtil.logDateMillis(timelimit));
getConf().setLong("fetcher.timelimit", timelimit);
}
// Set the time limit after which the throughput threshold feature is
// enabled
timelimit = getConf().getLong("fetcher.throughput.threshold.check.after",
10);
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
if (maxOutlinkDepth > 0) {
LOG.info("Fetcher: following outlinks up to depth: {}",
Integer.toString(maxOutlinkDepth));
int maxOutlinkDepthNumLinks = getConf().getInt(
"fetcher.follow.outlinks.num.links", 4);
int outlinksDepthDivisor = getConf().getInt(
"fetcher.follow.outlinks.depth.divisor", 2);
int totalOutlinksToFollow = 0;
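// e.g. with depth=2, divisor=2 and num.links=4 this sums
// floor(2/1*4) + floor(2/2*4) = 8 + 4 = 12 outlinks in total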
for (int i = 0; i < maxOutlinkDepth; i++) {
// float division, otherwise the divisor term is truncated to an int
totalOutlinksToFollow += (int) Math.floor((float) outlinksDepthDivisor
/ (i + 1) * maxOutlinkDepthNumLinks);
}
LOG.info("Fetcher: maximum outlinks to follow: {}",
Integer.toString(totalOutlinksToFollow));
}
Job job = NutchJob.getInstance(getConf());
job.setJobName("FetchData");
Configuration conf = job.getConfiguration();
conf.setInt("fetcher.threads.fetch", threads);
conf.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
// for politeness, don't permit parallel execution of a single task
conf.set("mapreduce.map.speculative","false");
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormatClass(InputFormat.class);
job.setJarByClass(Fetcher.class);
job.setMapperClass(Fetcher.FetcherRun.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormatClass(FetcherOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = "Fetcher job did not succeed, job status: "
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
LOG.error(message);
throw new RuntimeException(message);
}
} catch (InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
long end = System.currentTimeMillis();
LOG.info("Fetcher: finished at {}, elapsed: {}",
TimingUtil.logDateMillis(end), TimingUtil.elapsedTime(start, end));
}
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
String usage = "Usage: Fetcher <segment> [-threads n]";
if (args.length < 1) {
System.err.println(usage);
return -1;
}
Path segment = new Path(args[0]);
int threads = getConf().getInt("fetcher.threads.fetch", 10);
for (int i = 1; i < args.length; i++) { // parse command line
if (args[i].equals("-threads")) { // found -threads option
threads = Integer.parseInt(args[++i]);
}
}
getConf().setInt("fetcher.threads.fetch", threads);
try {
fetch(segment, threads);
return 0;
} catch (Exception e) {
LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
return -1;
}
}
private void checkConfiguration() {
// ensure that a value has been set for the agent name
String agentName = getConf().get("http.agent.name");
if (agentName == null || agentName.trim().length() == 0) {
String message = "Fetcher: No agents listed in 'http.agent.name'"
+ " property.";
if (LOG.isErrorEnabled()) {
LOG.error(message);
}
throw new IllegalArgumentException(message);
}
}
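/**
* Entry point for the {@link NutchTool} interface (used e.g. by the Nutch
* REST API): reads the segment(s) to fetch from the argument map, or falls
* back to the most recently modified segment under {@code <crawlId>/segments}.
*/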
@Override
public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
Map<String, Object> results = new HashMap<>();
Path segment = null;
if(args.containsKey(Nutch.ARG_SEGMENTS)) {
Object seg = args.get(Nutch.ARG_SEGMENTS);
if(seg instanceof Path) {
segment = (Path) seg;
}
else if(seg instanceof String){
segment = new Path(seg.toString());
}
else if(seg instanceof List) {
// accept any List of segment names and use the first element
List<?> segments = (List<?>) seg;
segment = new Path(segments.get(0).toString());
if(segments.size() > 1){
LOG.warn("Only the first segment of the segments list is used.");
}
}
}
else {
String segmentDir = crawlId+"/segments";
File segmentsDir = new File(segmentDir);
File[] segmentsList = segmentsDir.listFiles();
if (segmentsList == null || segmentsList.length == 0) {
throw new IllegalArgumentException("No segments found in " + segmentDir);
}
// use the most recently modified segment
Arrays.sort(segmentsList,
(f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()));
segment = new Path(segmentsList[0].getPath());
}
int threads = getConf().getInt("fetcher.threads.fetch", 10);
// parse command line
if (args.containsKey("threads")) { // "threads" argument given
threads = Integer.parseInt((String)args.get("threads"));
}
getConf().setInt("fetcher.threads.fetch", threads);
try {
fetch(segment, threads);
results.put(Nutch.VAL_RESULT, Integer.toString(0));
return results;
} catch (Exception e) {
LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
results.put(Nutch.VAL_RESULT, Integer.toString(-1));
return results;
}
}
}