blob: cec272b45f18d0e471232744d63de2140b0227d9 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.fetcher;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlDatum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A collection of queues that keeps track of the total number of items, and
 * provides items eligible for fetching from any queue.
 */
public class FetchItemQueues {
// NOTE(review): the initializer below is cut off after "LoggerFactory" in this
// copy of the file - the factory call and the terminating ';' are missing.
private static final Logger LOG = LoggerFactory
// The literal queue ID "default"; not referenced by the code visible here.
public static final String DEFAULT_ID = "default";
// Fetch item queues keyed by queue ID, see getFetchItemQueue(String).
Map<String, FetchItemQueue> queues = new ConcurrentHashMap<>();
// Queue IDs consulted by addFetchItem(FetchItem) when a max. exceptions limit
// is configured. NOTE(review): the statement adding IDs to this set is not
// visible in this copy; presumably done in checkExceptionThreshold - confirm.
private Set<String> queuesMaxExceptions = new HashSet<>();
// Iterator saved by getFetchItem() after it delivered an item so that the next
// call continues with the following queue; null means "start from the beginning".
Iterator<Map.Entry<String, FetchItemQueue>> lastIterator = null;
// Counter reported by getTotalSize().
AtomicInteger totalSize = new AtomicInteger(0);
// Cache of redirect targets used by redirectIsQueuedRecently(Text); stays null
// unless both redirect dedup cache settings are positive (see constructor).
Cache<Text, Optional<String>> redirectDedupCache = null;
// Value of fetcher.threads.per.queue, passed to every newly created queue.
int maxThreads;
// fetcher.server.delay multiplied by 1000 (presumably seconds to milliseconds -
// confirm), passed to every newly created queue.
long crawlDelay;
// fetcher.server.min.delay multiplied by 1000, passed to every newly created queue.
long minCrawlDelay;
// Value of fetcher.timelimit; compared against System.currentTimeMillis() in
// timelimitExceeded(), -1 means no time limit.
long timelimit = -1;
// Value of fetcher.max.exceptions.per.queue; -1 disables the limit.
int maxExceptionsPerQueue = -1;
// fetcher.exceptions.per.queue.delay multiplied by 1000; logged as milliseconds
// in checkExceptionThreshold. Values <= 0 disable the exception-based delay.
long exceptionsPerQueueDelay = -1;
// Read by getFetchItem() and checkExceptionThreshold(); never written in the
// code visible here. NOTE(review): presumably cleared by the queue feeder when
// it has finished - confirm against the caller.
boolean feederAlive = true;
// Configuration handed to the constructor and to every newly created queue.
Configuration conf;
// Accepted values of the property fetcher.queue.mode, see checkQueueMode(String).
public static final String QUEUE_MODE_HOST = "byHost";
public static final String QUEUE_MODE_DOMAIN = "byDomain";
public static final String QUEUE_MODE_IP = "byIP";
// Validated queue mode, passed to FetchItem.create(...) when items are added.
String queueMode;
// NOTE(review): the body of this enum is missing in this copy. The visible code
// references the constants ERROR_CREATE_FETCH_ITEM and SUCCESSFULLY_QUEUED.
enum QueuingStatus {
/**
 * Creates an empty collection of queues and reads its settings (threads per
 * queue, queue mode, delays, time limit, exception limits and redirect
 * deduplication cache) from the given configuration.
 *
 * @param conf
 *          configuration to read the fetcher properties from; it is also kept
 *          and passed on to every queue created later
 */
public FetchItemQueues(Configuration conf) {
this.conf = conf;
this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
// unknown modes are replaced by the default mode, see checkQueueMode(String)
// NOTE(review): the second statement on the next line has lost the start of its
// call (presumably a LOG statement) - only the argument list is left.
queueMode = checkQueueMode(queueMode);"Using queue mode : " + queueMode);
// the delay settings are floats multiplied by 1000 and truncated to long
// (presumably seconds converted to milliseconds - confirm)
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
0.0f) * 1000);
this.timelimit = conf.getLong("fetcher.timelimit", -1);
this.maxExceptionsPerQueue = conf.getInt(
"fetcher.max.exceptions.per.queue", -1);
this.exceptionsPerQueueDelay = (long) (conf
.getFloat("fetcher.exceptions.per.queue.delay", .0f) * 1000);
// NOTE(review): the default-value arguments and closing parentheses of the two
// getInt(...) calls below are missing in this copy.
int dedupRedirMaxTime = conf.getInt("fetcher.redirect.dedupcache.seconds",
int dedupRedirMaxSize = conf.getInt("fetcher.redirect.dedupcache.size",
// the redirect dedup cache is only created if both settings are positive,
// otherwise redirectDedupCache stays null
// NOTE(review): dedupRedirMaxSize is checked but not applied to the builder in
// the visible code - confirm whether a size-limiting call is missing here.
if (dedupRedirMaxTime > 0 && dedupRedirMaxSize > 0) {
redirectDedupCache = CacheBuilder.newBuilder()
.expireAfterWrite(dedupRedirMaxTime, TimeUnit.SECONDS).build();
* Check whether queue mode is valid, fall-back to default mode if not.
* @param queueMode
* queue mode to check
* @return valid queue mode or default
protected static String checkQueueMode(String queueMode) {
// check that the mode is known
if (!queueMode.equals(QUEUE_MODE_IP)
&& !queueMode.equals(QUEUE_MODE_DOMAIN)
&& !queueMode.equals(QUEUE_MODE_HOST)) {
LOG.error("Unknown partition mode : {} - forcing to byHost", queueMode);
queueMode = QUEUE_MODE_HOST;
return queueMode;
public int getTotalSize() {
return totalSize.get();
public int getQueueCount() {
return queues.size();
public int getQueueCountMaxExceptions() {
return queuesMaxExceptions.size();
public QueuingStatus addFetchItem(Text url, CrawlDatum datum) {
FetchItem it = FetchItem.create(url, datum, queueMode);
if (it != null) {
return addFetchItem(it);
return QueuingStatus.ERROR_CREATE_FETCH_ITEM;
/**
 * Adds an already created fetch item to the queue identified by its queue ID.
 *
 * NOTE(review): this copy of the method is incomplete. The branch for queue IDs
 * contained in queuesMaxExceptions has no visible body, and no statement which
 * hands the item over to the queue or updates totalSize is visible - restore
 * the missing lines from the upstream source.
 *
 * @param it
 *          the fetch item to add
 * @return the queuing status; SUCCESSFULLY_QUEUED is the only status visible
 *         in this copy
 */
public synchronized QueuingStatus addFetchItem(FetchItem it) {
// the set of queue IDs is only consulted if a max. exceptions limit is configured
if (maxExceptionsPerQueue != -1
&& queuesMaxExceptions.contains(it.queueID)) {
// look up the queue for this ID, it is created on first use
FetchItemQueue fiq = getFetchItemQueue(it.queueID);
return QueuingStatus.SUCCESSFULLY_QUEUED;
public void finishFetchItem(FetchItem it) {
finishFetchItem(it, false);
public void finishFetchItem(FetchItem it, boolean asap) {
FetchItemQueue fiq = queues.get(it.queueID);
if (fiq == null) {
LOG.warn("Attempting to finish item from unknown queue: " + it);
fiq.finishFetchItem(it, asap);
public synchronized FetchItemQueue getFetchItemQueue(String id) {
FetchItemQueue fiq = queues.get(id);
if (fiq == null) {
// initialize queue
fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
queues.put(id, fiq);
return fiq;
/**
 * Walks over the queues and returns the first fetch item one of them hands
 * out. The iterator is remembered in lastIterator when an item is returned so
 * that the next call continues with the following queue instead of starting
 * from the first queue again.
 *
 * NOTE(review): this copy of the method is incomplete. The right-hand side of
 * the assignment to fiq is missing, and the branches handling empty queues
 * contain only comments - the statements which remove a queue or skip to the
 * next one are not visible. Restore them from the upstream source.
 *
 * @return a fetch item, or null if no queue handed out an item
 */
public synchronized FetchItem getFetchItem() {
// continue where the previous call stopped; start over if there is no saved
// iterator or it has no more entries
Iterator<Map.Entry<String, FetchItemQueue>> it = lastIterator;
if (it == null || !it.hasNext()) {
it = queues.entrySet().iterator();
while (it.hasNext()) {
// NOTE(review): expression missing - presumably the value of the next map
// entry delivered by the iterator; confirm
FetchItemQueue fiq =;
// reap empty queues which do not hold state required to ensure politeness
// (a queue counts as empty if nothing is queued and nothing is in progress)
if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
if (!feederAlive) {
// no more fetch items added
} else if ((maxExceptionsPerQueue > -1 || exceptionsPerQueueDelay > 0)
&& fiq.exceptionCounter.get() > 0) {
// keep queue because the exceptions counter is bound to it
// and is required to skip or delay items on this queue
} else if (fiq.nextFetchTime.get() > System.currentTimeMillis()) {
// keep queue to have it blocked in case new fetch items of this queue
// are added by the QueueFeeder
} else {
// empty queue without state
// ask the queue for an item; a null result moves on to the next queue
FetchItem fit = fiq.getFetchItem();
if (fit != null) {
// remember the position for the next call
lastIterator = it;
return fit;
// all queues visited without getting an item: the next call starts over
lastIterator = null;
return null;
* @return true if the fetcher timelimit is defined and has been exceeded
* ({@code fetcher.timelimit.mins} minutes after fetching started)
public boolean timelimitExceeded() {
return timelimit != -1 && System.currentTimeMillis() >= timelimit;
// called only once the feeder has stopped
public synchronized int checkTimelimit() {
int count = 0;
if (timelimitExceeded()) {
// emptying the queues
count = emptyQueues();
// there might also be a case where totalsize !=0 but number of queues
// == 0
// in which case we simply force it to 0 to avoid blocking
if (totalSize.get() != 0 && queues.size() == 0)
return count;
/**
 * Empties the queues (used by timebomb and throughput threshold).
 *
 * NOTE(review): this copy of the method is incomplete, see the notes below.
 *
 * @return the sum of the counts returned by emptyQueue() over all queues
 *         which had items queued
 */
public synchronized int emptyQueues() {
int count = 0;
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
// queues without queued items are skipped
// NOTE(review): the text after "continue;" is the argument list of a call whose
// start is missing in this copy (presumably a LOG statement) - restore it
if (fiq.getQueueSize() == 0)
continue;"* queue: {} >> dropping!", id);
int deleted = fiq.emptyQueue();
count += deleted;
// NOTE(review): no adjustment of totalSize by the number of dropped items is
// visible here - confirm whether a statement is missing
return count;
/**
 * Increment the exception counter of a queue in case of an exception e.g.
 * timeout; when higher than a given threshold simply empty the queue.
 *
 * The next fetch is delayed if specified by the param {@code delay} or
 * configured by the property {@code fetcher.exceptions.per.queue.delay}.
 *
 * @param queueid
 *          a queue identifier to locate and check
 * @param maxExceptions
 *          custom-defined number of max. exceptions - if negative the value
 *          of the property {@code fetcher.max.exceptions.per.queue} is used.
 * @param delay
 *          a custom-defined time span in milliseconds to delay the next fetch
 *          in addition to the delay defined for the given queue. If a
 *          negative value is passed the delay is chosen by
 *          {@code fetcher.exceptions.per.queue.delay}
 * @return number of purged items
 */
// NOTE(review): this copy of the method is incomplete. The start of several
// calls (presumably LOG statements) is missing so that only their argument
// lists remain, and some statements announced by the comments are not visible.
// Restore the missing lines from the upstream source.
public synchronized int checkExceptionThreshold(String queueid,
int maxExceptions, long delay) {
FetchItemQueue fiq = queues.get(queueid);
// unknown queue: nothing to count and nothing to purge
if (fiq == null) {
return 0;
int excCount = fiq.incrementExceptionCounter();
// an explicitly passed positive delay takes precedence over the configured
// exception delay and is added to the queue's next fetch time
// NOTE(review): the text after "getAndAdd(delay);" is an orphaned argument list
if (delay > 0) {
fiq.nextFetchTime.getAndAdd(delay);"* queue: {} >> delayed next fetch by {} ms", queueid, delay);
} else if (exceptionsPerQueueDelay > 0) {
// NOTE(review): the next four lines are comment text whose delimiters are
// missing in this copy
* Delay the next fetch by a time span growing exponentially with the
* number of observed exceptions. This dynamic delay is added to the
* constant delay. In order to avoid overflows, the exponential backoff is
* capped at 2**31
long exceptionDelay = exceptionsPerQueueDelay;
if (excCount > 1) {
// double the initial delay with every observed exception
// the factor is 2L << min(excCount - 2, 31), i.e. 2^(excCount - 1) and at
// most 2^32 - note that the text above states a cap of 2**31
exceptionDelay *= 2L << Math.min((excCount - 2), 31);
// NOTE(review): orphaned argument list; the statement which applies
// exceptionDelay to the queue's next fetch time is not visible either
"* queue: {} >> delayed next fetch by {} ms after {} exceptions in queue",
queueid, exceptionDelay, excCount);
// the limit is checked only if maxExceptions is not -1; it is reached as soon
// as the counter equals the limit
if (maxExceptions != -1 && excCount >= maxExceptions) {
// too many exceptions for items in this queue - purge it
int deleted = fiq.emptyQueue();
if (deleted > 0) {
// NOTE(review): orphaned argument list; no adjustment of totalSize by the
// number of purged items is visible - confirm
"* queue: {} >> removed {} URLs from queue because {} exceptions occurred",
queueid, deleted, excCount);
// NOTE(review): the call on the next line is cut off at both ends
if (feederAlive) {"* queue: {} >> blocked after {} exceptions", queueid,
// keep queue IDs to ensure that these queues aren't created and filled
// again, see addFetchItem(FetchItem)
// NOTE(review): the statement which stores the queue ID is not visible
return deleted;
// limit not reached: nothing was purged
return 0;
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
* @see #checkExceptionThreshold(String, int, long)
* @param queueid
* queue identifier to locate and check
* @return number of purged items
public int checkExceptionThreshold(String queueid) {
return checkExceptionThreshold(queueid, this.maxExceptionsPerQueue, -1);
* @param redirUrl
* redirect target
* @return true if redirects are deduplicated and redirUrl has been queued
* recently
public boolean redirectIsQueuedRecently(Text redirUrl) {
if (redirectDedupCache != null) {
if (redirectDedupCache.getIfPresent(redirUrl) != null) {
return true;
redirectDedupCache.put(redirUrl, Optional.absent());
return false;
public synchronized void dump() {
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
if (fiq.getQueueSize() == 0)
continue;"* queue: " + id);