/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.nutch.fetcher;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.crawl.URLWebPage;
import org.apache.nutch.host.HostDb;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.parse.ParseUtil;
import org.apache.nutch.parse.ParserJob;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.protocol.ProtocolStatusCodes;
import org.apache.nutch.protocol.ProtocolStatusUtils;
import org.apache.nutch.protocol.RobotRules;
import org.apache.nutch.storage.Host;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.ProtocolStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;
public class FetcherReducer
extends GoraReducer<IntWritable, FetchEntry, String, WebPage> {
public static final Logger LOG = FetcherJob.LOG;
private final AtomicInteger activeThreads = new AtomicInteger(0);
private final AtomicInteger spinWaiting = new AtomicInteger(0);
private final long start = System.currentTimeMillis(); // start time of fetcher run
private final AtomicLong lastRequestStart = new AtomicLong(start);
private final AtomicLong bytes = new AtomicLong(0); // total bytes fetched
private final AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private final AtomicInteger errors = new AtomicInteger(0); // total pages errored
private QueueFeeder feeder;
private final List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
private FetchItemQueues fetchQueues;
private boolean storingContent;
private boolean parse;
private ParseUtil parseUtil;
private boolean skipTruncated;
/**
* This class describes an item to be fetched.
*/
private static class FetchItem {
WebPage page;
String queueID;
String url;
URL u;
public FetchItem(String url, WebPage page, URL u, String queueID) {
this.page = page;
this.url = url;
this.u = u;
this.queueID = queueID;
}
/** Create an item. The queue ID is derived from the <code>queueMode</code>
* argument as a protocol + hostname, protocol + IP address, or
* protocol + domain pair.
*/
public static FetchItem create(String url, WebPage page, String queueMode) {
String queueID;
URL u = null;
try {
u = new URL(url);
} catch (final Exception e) {
LOG.warn("Cannot parse url: " + url, e);
return null;
}
final String proto = u.getProtocol().toLowerCase();
String host;
if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
try {
final InetAddress addr = InetAddress.getByName(u.getHost());
host = addr.getHostAddress();
} catch (final UnknownHostException e) {
// unable to resolve it, so don't fall back to host name
LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
return null;
}
}
else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){
host = URLUtil.getDomainName(u);
if (host == null) {
LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
host = u.toExternalForm();
}
}
else {
host = u.getHost();
if (host == null) {
LOG.warn("Unknown host for url: " + url + ", using URL string as key");
host = u.toExternalForm();
}
}
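// e.g. in byHost mode, "http://news.example.com/page.html" (a hypothetical
// URL for illustration) yields the queue ID "http://news.example.com"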
queueID = proto + "://" + host.toLowerCase();
return new FetchItem(url, page, u, queueID);
}
@Override
public String toString() {
return "FetchItem [queueID=" + queueID + ", url=" + url + ", u=" + u
+ ", page=" + page + "]";
}
}
/**
* This class handles FetchItems which come from the same host ID (be it
* a protocol/hostname, protocol/IP, or protocol/domain pair). It also
* keeps track of requests in progress and the elapsed time between requests.
*/
private static class FetchItemQueue {
List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
AtomicLong nextFetchTime = new AtomicLong(); // earliest time the next fetch may start
long crawlDelay; // delay between requests when a single thread serves this queue
long minCrawlDelay; // shorter delay applied when multiple threads serve this queue
int maxThreads; // max concurrent requests against this queue
public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
this.maxThreads = maxThreads;
this.crawlDelay = crawlDelay;
this.minCrawlDelay = minCrawlDelay;
// back-date the end time so the first fetch can start immediately
setEndTime(System.currentTimeMillis() - crawlDelay);
}
public int getQueueSize() {
return queue.size();
}
public int getInProgressSize() {
return inProgress.size();
}
public void finishFetchItem(FetchItem it, boolean asap) {
if (it != null) {
inProgress.remove(it);
setEndTime(System.currentTimeMillis(), asap);
}
}
public void addFetchItem(FetchItem it) {
if (it == null) return;
queue.add(it);
}
@SuppressWarnings("unused")
public void addInProgressFetchItem(FetchItem it) {
if (it == null) return;
inProgress.add(it);
}
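/** Returns the next item if this queue is below its thread cap and its
* politeness delay has elapsed, moving it to the in-progress set;
* otherwise returns null. */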
public FetchItem getFetchItem() {
if (inProgress.size() >= maxThreads) return null;
final long now = System.currentTimeMillis();
if (nextFetchTime.get() > now) return null;
FetchItem it = null;
if (queue.size() == 0) return null;
try {
it = queue.remove(0);
inProgress.add(it);
} catch (final Exception e) {
LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
}
return it;
}
public synchronized void dump() {
LOG.info(" maxThreads = " + maxThreads);
LOG.info(" inProgress = " + inProgress.size());
LOG.info(" crawlDelay = " + crawlDelay);
LOG.info(" minCrawlDelay = " + minCrawlDelay);
LOG.info(" nextFetchTime = " + nextFetchTime.get());
LOG.info(" now = " + System.currentTimeMillis());
for (int i = 0; i < queue.size(); i++) {
final FetchItem it = queue.get(i);
LOG.info(" " + i + ". " + it.url);
}
}
private void setEndTime(long endTime) {
setEndTime(endTime, false);
}
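/** Sets the earliest start time for the next fetch: with several threads
* per queue the shorter minCrawlDelay applies, and asap skips the delay
* entirely. */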
private void setEndTime(long endTime, boolean asap) {
if (!asap)
nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
else
nextFetchTime.set(endTime);
}
public synchronized int emptyQueue() {
int presize = queue.size();
queue.clear();
return presize;
}
}
/**
* Convenience class - a collection of queues that keeps track of the total
* number of items, and provides items eligible for fetching from any queue.
*/
private static class FetchItemQueues {
@SuppressWarnings("unused")
public static final String DEFAULT_ID = "default";
Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
AtomicInteger totalSize = new AtomicInteger(0);
int maxThreads;
String queueMode;
long crawlDelay;
long minCrawlDelay;
Configuration conf;
long timelimit = -1;
boolean useHostSettings = false;
HostDb hostDb = null;
public static final String QUEUE_MODE_HOST = "byHost";
public static final String QUEUE_MODE_DOMAIN = "byDomain";
public static final String QUEUE_MODE_IP = "byIP";
public FetchItemQueues(Configuration conf) throws IOException {
this.conf = conf;
this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
// check that the mode is known
if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN)
&& !queueMode.equals(QUEUE_MODE_HOST)) {
LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
queueMode = QUEUE_MODE_HOST;
}
LOG.info("Using queue mode : "+queueMode);
// Optionally enable host specific queue behavior
if (queueMode.equals(QUEUE_MODE_HOST)) {
useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", false);
if (useHostSettings) {
LOG.info("Host specific queue settings enabled.");
// Initialize the HostDb if we need it.
hostDb = new HostDb(conf);
}
}
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); // seconds -> ms
this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); // seconds -> ms
this.timelimit = conf.getLong("fetcher.timelimit", -1);
}
public int getTotalSize() {
return totalSize.get();
}
public int getQueueCount() {
return queues.size();
}
public void addFetchItem(String url, WebPage page) {
final FetchItem it = FetchItem.create(url, page, queueMode);
if (it != null) addFetchItem(it);
}
public synchronized void addFetchItem(FetchItem it) {
final FetchItemQueue fiq = getFetchItemQueue(it.queueID);
fiq.addFetchItem(it);
totalSize.incrementAndGet();
}
public void finishFetchItem(FetchItem it) {
finishFetchItem(it, false);
}
public void finishFetchItem(FetchItem it, boolean asap) {
final FetchItemQueue fiq = queues.get(it.queueID);
if (fiq == null) {
LOG.warn("Attempting to finish item from unknown queue: " + it);
return;
}
fiq.finishFetchItem(it, asap);
}
public synchronized FetchItemQueue getFetchItemQueue(String id) {
FetchItemQueue fiq = queues.get(id);
if (fiq == null) {
// Create a new queue
if (useHostSettings) {
// Use host specific queue settings (if defined in the host table)
try {
// queue IDs look like "proto://host"; strip the scheme to get the host key
String hostname = id.substring(id.indexOf("://")+3);
Host host = hostDb.getByHostName(hostname);
if (host != null) {
fiq = new FetchItemQueue(conf,
host.getInt("q_mt", maxThreads),
host.getLong("q_cd", crawlDelay),
host.getLong("q_mcd", minCrawlDelay));
}
} catch (IOException e) {
LOG.error("Error while trying to access host settings", e);
}
}
if (fiq == null) {
// Use queue defaults
fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
}
queues.put(id, fiq);
}
return fiq;
}
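/** Scans all queues for an eligible item, reaping queues that are empty
* and idle along the way; returns null if nothing is currently eligible. */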
public synchronized FetchItem getFetchItem() {
final Iterator<Map.Entry<String, FetchItemQueue>> it =
queues.entrySet().iterator();
while (it.hasNext()) {
final FetchItemQueue fiq = it.next().getValue();
// reap empty queues
if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
it.remove();
continue;
}
final FetchItem fit = fiq.getFetchItem();
if (fit != null) {
totalSize.decrementAndGet();
return fit;
}
}
return null;
}
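/** Once the configured fetcher.timelimit has passed, empties all queues
* and returns the number of items dropped. */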
public synchronized int checkTimelimit() {
if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
return emptyQueues();
}
return 0;
}
public synchronized void dump() {
for (final String id : queues.keySet()) {
final FetchItemQueue fiq = queues.get(id);
if (fiq.getQueueSize() == 0) continue;
LOG.info("* queue: " + id);
fiq.dump();
}
}
// empties the queues (used by timebomb and throughput threshold)
public synchronized int emptyQueues() {
int count = 0;
// emptying the queues
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
if (fiq.getQueueSize() == 0) continue;
LOG.info("* queue: " + id + " >> dropping! ");
int deleted = fiq.emptyQueue();
for (int i = 0; i < deleted; i++) {
totalSize.decrementAndGet();
}
count += deleted;
}
// totalSize may be non-zero even when no queues remain;
// force it to 0 so that waiting threads are not blocked forever
if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
return count;
}
}
/**
* This class picks items from queues and fetches the pages.
*/
private class FetcherThread extends Thread {
private final URLFilters urlFilters;
private final URLNormalizers normalizers;
private final ProtocolFactory protocolFactory;
private final long maxCrawlDelay;
@SuppressWarnings("unused")
private final boolean byIP;
private final int maxRedirect;
private String reprUrl;
private boolean redirecting;
private int redirectCount;
private final Context context;
public FetcherThread(Context context, int num) {
this.setDaemon(true); // don't hang JVM on exit
this.setName("FetcherThread" + num); // use an informative name
this.context = context;
Configuration conf = context.getConfiguration();
this.urlFilters = new URLFilters(conf);
this.protocolFactory = new ProtocolFactory(conf);
this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000; // seconds -> ms
// backward-compatible default setting
this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
this.maxRedirect = conf.getInt("http.redirect.max", 3);
}
@Override
public void run() {
activeThreads.incrementAndGet(); // count threads
FetchItem fit = null;
try {
while (true) {
fit = fetchQueues.getFetchItem();
if (fit == null) {
if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " fetchQueues.getFetchItem() was null, spin-waiting ...");
}
// spin-wait.
spinWaiting.incrementAndGet();
try {
Thread.sleep(500);
} catch (final Exception e) {}
spinWaiting.decrementAndGet();
continue;
} else {
// all done, finish this thread
return;
}
}
lastRequestStart.set(System.currentTimeMillis());
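// use the representative URL from an earlier redirect when one was recorded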
if (!fit.page.isReadable(WebPage.Field.REPR_URL.getIndex())) {
reprUrl = fit.url;
} else {
reprUrl = TableUtil.toString(fit.page.getReprUrl());
}
try {
LOG.info("fetching " + fit.url);
// fetch the page
redirecting = false;
redirectCount = 0;
do {
if (LOG.isDebugEnabled()) {
LOG.debug("redirectCount=" + redirectCount);
}
redirecting = false;
final Protocol protocol = this.protocolFactory.getProtocol(fit.url);
final RobotRules rules = protocol.getRobotRules(fit.url, fit.page);
if (!rules.isAllowed(fit.u)) {
// unblock
fetchQueues.finishFetchItem(fit, true);
if (LOG.isDebugEnabled()) {
LOG.debug("Denied by robots.txt: " + fit.url);
}
output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED,
CrawlStatus.STATUS_GONE);
continue;
}
if (rules.getCrawlDelay() > 0) {
if (rules.getCrawlDelay() > maxCrawlDelay) {
// unblock
fetchQueues.finishFetchItem(fit, true);
LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, CrawlStatus.STATUS_GONE);
continue;
} else {
final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
fiq.crawlDelay = rules.getCrawlDelay();
}
}
final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.page);
final ProtocolStatus status = output.getStatus();
final Content content = output.getContent();
// unblock queue
fetchQueues.finishFetchItem(fit);
context.getCounter("FetcherStatus", ProtocolStatusUtils.getName(status.getCode())).increment(1);
int length = 0;
if (content != null && content.getContent() != null) length = content.getContent().length;
updateStatus(length);
switch(status.getCode()) {
case ProtocolStatusCodes.WOULDBLOCK:
// retry ?
fetchQueues.addFetchItem(fit);
break;
case ProtocolStatusCodes.SUCCESS: // got a page
output(fit, content, status, CrawlStatus.STATUS_FETCHED);
break;
case ProtocolStatusCodes.MOVED: // redirect
case ProtocolStatusCodes.TEMP_MOVED:
byte code;
boolean temp;
if (status.getCode() == ProtocolStatusCodes.MOVED) {
code = CrawlStatus.STATUS_REDIR_PERM;
temp = false;
} else {
code = CrawlStatus.STATUS_REDIR_TEMP;
temp = true;
}
output(fit, content, status, code);
final String newUrl = ProtocolStatusUtils.getMessage(status);
handleRedirect(fit.url, newUrl, temp, FetcherJob.PROTOCOL_REDIR);
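// the redirect target was written out for a later fetch round,
// so it is not followed within this loop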
redirecting = false;
break;
case ProtocolStatusCodes.EXCEPTION:
logError(fit.url, ProtocolStatusUtils.getMessage(status));
/* FALLTHROUGH */
case ProtocolStatusCodes.RETRY: // retry
case ProtocolStatusCodes.BLOCKED:
output(fit, null, status, CrawlStatus.STATUS_RETRY);
break;
case ProtocolStatusCodes.GONE: // gone
case ProtocolStatusCodes.NOTFOUND:
case ProtocolStatusCodes.ACCESS_DENIED:
case ProtocolStatusCodes.ROBOTS_DENIED:
output(fit, null, status, CrawlStatus.STATUS_GONE);
break;
case ProtocolStatusCodes.NOTMODIFIED:
output(fit, null, status, CrawlStatus.STATUS_NOTMODIFIED);
break;
default:
if (LOG.isWarnEnabled()) {
LOG.warn("Unknown ProtocolStatus: " + status.getCode());
}
output(fit, null, status, CrawlStatus.STATUS_RETRY);
}
if (redirecting && redirectCount > maxRedirect) {
fetchQueues.finishFetchItem(fit);
LOG.info(" - redirect count exceeded " + fit.url);
output(fit, null, ProtocolStatusUtils.STATUS_REDIR_EXCEEDED,
CrawlStatus.STATUS_GONE);
}
} while (redirecting && (redirectCount <= maxRedirect));
} catch (final Throwable t) { // unexpected exception
// unblock
fetchQueues.finishFetchItem(fit);
LOG.error("Unexpected error for " + fit.url, t);
output(fit, null, ProtocolStatusUtils.STATUS_FAILED,
CrawlStatus.STATUS_RETRY);
}
}
} catch (final Throwable e) {
LOG.error("fetcher throwable caught", e);
} finally {
if (fit != null) fetchQueues.finishFetchItem(fit);
activeThreads.decrementAndGet(); // count threads
LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
}
}
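/** Normalizes and filters the redirect target, picks a representative
* URL, and writes a new row marked REDIRECT_DISCOVERED so that the
* target is fetched in a later round. */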
private void handleRedirect(String url, String newUrl,
boolean temp, String redirType)
throws URLFilterException, IOException, InterruptedException {
newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
newUrl = urlFilters.filter(newUrl);
if (newUrl == null || newUrl.equals(url)) {
return;
}
reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
final String reversedNewUrl = TableUtil.reverseUrl(newUrl);
WebPage newWebPage = new WebPage();
if (!reprUrl.equals(url)) {
newWebPage.setReprUrl(new Utf8(reprUrl));
}
newWebPage.putToMetadata(FetcherJob.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
context.write(reversedNewUrl, newWebPage);
if (LOG.isDebugEnabled()) {
LOG.debug(" - " + redirType + " redirect to " +
reprUrl + " (fetching later)");
}
}
private void updateStatus(int bytesInPage) throws IOException {
pages.incrementAndGet();
bytes.addAndGet(bytesInPage);
}
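/** Records the fetch result on the row: status, fetch times, protocol
* status and (when present) content, parses inline if configured, and
* writes the row keyed by its reversed URL. */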
private void output(FetchItem fit, Content content,
ProtocolStatus pstatus, byte status)
throws IOException, InterruptedException {
fit.page.setStatus(status);
final long prevFetchTime = fit.page.getFetchTime();
fit.page.setPrevFetchTime(prevFetchTime);
fit.page.setFetchTime(System.currentTimeMillis());
if (pstatus != null) {
fit.page.setProtocolStatus(pstatus);
}
if (content != null) {
fit.page.setContent(ByteBuffer.wrap(content.getContent()));
fit.page.setContentType(new Utf8(content.getContentType()));
fit.page.setBaseUrl(new Utf8(content.getBaseUrl()));
}
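// propagate the generator mark as a fetch mark so that downstream jobs
// (e.g. ParserJob) can select the rows fetched in this batch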
Mark.FETCH_MARK.putMark(fit.page, Mark.GENERATE_MARK.checkMark(fit.page));
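// rows are keyed by the reversed URL, roughly
// "http://www.example.com/p" -> "com.example.www:http/p" (illustrative)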
String key = TableUtil.reverseUrl(fit.url);
if (parse) {
if (!skipTruncated || !ParserJob.isTruncated(fit.url, fit.page)) {
URLWebPage redirectedPage = parseUtil.process(key, fit.page);
if (redirectedPage != null) {
context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
redirectedPage.getDatum());
}
}
}
// Remove the content if storingContent is false; it was added to fit.page
// above so that ParseUtil could parse it.
if (content != null && !storingContent) {
fit.page.setContent(ByteBuffer.wrap(new byte[0]));
}
context.write(key, fit.page);
}
private void logError(String url, String message) {
LOG.info("fetch of " + url + " failed with: " + message);
errors.incrementAndGet();
}
}
/**
* This class feeds the queues with input items, and refills them as
* items are consumed by FetcherThreads.
*/
private static class QueueFeeder extends Thread {
private final Context context;
private final FetchItemQueues queues;
private final int size;
private Iterator<FetchEntry> currentIter;
boolean hasMore;
private long timelimit = -1;
public QueueFeeder(Context context,
FetchItemQueues queues, int size)
throws IOException, InterruptedException {
this.context = context;
this.queues = queues;
this.size = size;
this.setDaemon(true);
this.setName("QueueFeeder");
hasMore = context.nextKey();
if (hasMore) {
currentIter = context.getValues().iterator();
}
// the value of the time limit is either -1 or the time where it should finish
timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1);
}
@Override
public void run() {
int cnt = 0;
int timelimitcount = 0;
try {
while (hasMore) {
if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
// enough already; simply drain the remaining input entries
// without processing them
while (currentIter.hasNext()) {
currentIter.next();
timelimitcount++;
}
hasMore = context.nextKey();
if (hasMore) {
currentIter = context.getValues().iterator();
}
continue;
}
int feed = size - queues.getTotalSize();
if (feed <= 0) {
// queues are full - spin-wait until they have some free space
try {
Thread.sleep(1000);
} catch (final Exception e) {}
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("-feeding " + feed + " input urls ...");
}
while (feed > 0 && currentIter.hasNext()) {
FetchEntry entry = currentIter.next();
final String url =
TableUtil.unreverseUrl(entry.getKey());
queues.addFetchItem(url, entry.getWebPage());
feed--;
cnt++;
}
if (currentIter.hasNext()) {
continue; // finish items in current list before reading next key
}
hasMore = context.nextKey();
if (hasMore) {
currentIter = context.getValues().iterator();
}
}
} catch (Exception e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
}
LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :"
+ timelimitcount);
context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount);
}
}
private void reportAndLogStatus(Context context, float actualPages,
int actualBytes, int totalSize) throws IOException {
StringBuilder status = new StringBuilder();
long elapsed = (System.currentTimeMillis() - start)/1000;
status.append(spinWaiting).append("/").append(activeThreads).append(" spinwaiting/active, ");
status.append(pages).append(" pages, ").append(errors).append(" errors, ");
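// overall average since start, then the rate over the last reporting interval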
status.append(Math.round((((float)pages.get())*10)/elapsed)/10.0).append(" ");
status.append(Math.round(((float)actualPages)*10)/10.0).append(" pages/s, ");
status.append(Math.round((((float)bytes.get())*8)/1024)/elapsed).append(" ");
status.append(Math.round(((float)actualBytes)*8)/1024).append(" kb/s, ");
status.append(totalSize).append(" URLs in ");
status.append(this.fetchQueues.getQueueCount()).append(" queues");
String toString = status.toString();
context.setStatus(toString);
LOG.info(toString);
}
@Override
public void run(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.fetchQueues = new FetchItemQueues(conf);
int threadCount = conf.getInt("fetcher.threads.fetch", 10);
parse = conf.getBoolean(FetcherJob.PARSE_KEY, false);
storingContent = conf.getBoolean("fetcher.store.content", true);
if (parse) {
skipTruncated = conf.getBoolean(ParserJob.SKIP_TRUNCATED, true);
parseUtil = new ParseUtil(conf);
}
LOG.info("Fetcher: threads: " + threadCount);
int maxFeedPerThread = conf.getInt("fetcher.queue.depth.multiplier", 50);
feeder = new QueueFeeder(context, fetchQueues, threadCount * maxFeedPerThread);
feeder.start();
for (int i = 0; i < threadCount; i++) { // spawn threads
FetcherThread ft = new FetcherThread(context, i);
fetcherThreads.add(ft);
ft.start();
}
// select a timeout that avoids a task timeout
final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2;
// Used for threshold check, holds pages and bytes processed in the last sec
float pagesLastSec;
int bytesLastSec;
int throughputThresholdCurrentSequence = 0;
int throughputThresholdPages = conf.getInt("fetcher.throughput.threshold.pages", -1);
if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
int throughputThresholdSequence = conf.getInt("fetcher.throughput.threshold.sequence", 5);
if (LOG.isInfoEnabled()) {
LOG.info("Fetcher: throughput threshold sequence: " + throughputThresholdSequence);
}
long throughputThresholdTimeLimit = conf.getLong("fetcher.throughput.threshold.check.after", -1);
do { // wait for threads to exit
pagesLastSec = pages.get();
bytesLastSec = (int)bytes.get();
final int secondsToSleep = 5;
try {
Thread.sleep(secondsToSleep * 1000);
} catch (InterruptedException e) {}
pagesLastSec = (pages.get() - pagesLastSec)/secondsToSleep;
bytesLastSec = ((int)bytes.get() - bytesLastSec)/secondsToSleep;
int fetchQueuesTotalSize = fetchQueues.getTotalSize();
reportAndLogStatus(context, pagesLastSec, bytesLastSec, fetchQueuesTotalSize);
boolean feederAlive = feeder.isAlive();
if (!feederAlive && fetchQueuesTotalSize < 5) {
fetchQueues.dump();
}
// check timelimit
if (!feederAlive) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
if (hitByTimeLimit != 0) {
context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
}
}
// if throughput threshold is enabled
if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
// Check if we're dropping below the threshold
if (pagesLastSec < throughputThresholdPages) {
throughputThresholdCurrentSequence++;
LOG.warn(Integer.toString(throughputThresholdCurrentSequence)
+ ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages)
+ " pages per second");
// Quit if we dropped below threshold too many times
if (throughputThresholdCurrentSequence > throughputThresholdSequence) {
LOG.warn("Dropped below threshold too many times in a row, killing!");
// Disable the threshold checker
throughputThresholdPages = -1;
// Empty the queues cleanly and get number of items that were dropped
int hitByThroughputThreshold = fetchQueues.emptyQueues();
if (hitByThroughputThreshold != 0) context.getCounter("FetcherStatus",
"hitByThroughputThreshold").increment(hitByThroughputThreshold);
}
} else {
throughputThresholdCurrentSequence = 0;
}
}
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
LOG.warn("Aborting with " + activeThreads + " hung threads.");
return;
}
} while (activeThreads.get() > 0);
LOG.info("-activeThreads=" + activeThreads);
}
}