blob: 05e4a940c8d2309b78afc07a83a8d0fd2919ab6e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.hostdb;
import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Simple runnable that performs a DNS lookup for a single host, updates the
 * accompanying {@link HostDatum} with the outcome, writes the datum to the
 * reducer context (unless the host is purged) and maintains the HostDB
 * counters.
 */
public class ResolverThread implements Runnable {

  private static final Logger LOG = LoggerFactory
      .getLogger(MethodHandles.lookup().lookupClass());

  protected String host = null;
  protected HostDatum datum = null;
  protected Text hostText = new Text();
  protected Context context;
  protected int purgeFailedHostsThreshold;

  // Counter references looked up once in the constructor so that run() does
  // not have to resolve them by group/name again.
  private Counter newKnownHostCounter;
  private Counter rediscoveredHostCounter;
  private Counter existingKnownHostCounter;
  private Counter newUnknownHostCounter;
  private Counter existingUnknownHostCounter;
  private Counter purgedUnknownHostCounter;
  private Counter checkedHostsCounter;
  private Counter errorsCounter;
  private Counter errorsNetworkCounter;

  /**
   * Creates a resolver for a single host.
   *
   * @param host name of the host to lookup
   * @param datum accompanying host information
   * @param context {@link org.apache.hadoop.mapreduce.Reducer.Context} for
   * writing custom counters and output.
   * @param purgeFailedHostsThreshold int value which marks the maximum failed
   * DNS lookups before a given host is purged from the hostdb; {@code -1}
   * disables purging
   */
  public ResolverThread(String host, HostDatum datum,
      Context context, int purgeFailedHostsThreshold) {
    hostText.set(host);
    this.host = host;
    this.datum = datum;
    this.context = context;
    this.purgeFailedHostsThreshold = purgeFailedHostsThreshold;

    // Initialize cached counters
    initCounters();
  }

  /**
   * Initialize cached counter references to avoid repeated lookups.
   */
  private void initCounters() {
    newKnownHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_NEW_KNOWN_HOST_TOTAL);
    rediscoveredHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_REDISCOVERED_HOST_TOTAL);
    existingKnownHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_EXISTING_KNOWN_HOST_TOTAL);
    newUnknownHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_NEW_UNKNOWN_HOST_TOTAL);
    existingUnknownHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_EXISTING_UNKNOWN_HOST_TOTAL);
    purgedUnknownHostCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_PURGED_UNKNOWN_HOST_TOTAL);
    checkedHostsCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_CHECKED_HOSTS_TOTAL);
    errorsCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.ERROR_TOTAL);
    errorsNetworkCounter = context.getCounter(
        NutchMetrics.GROUP_HOSTDB, NutchMetrics.ERROR_NETWORK_TOTAL);
  }

  /**
   * Resolves the host and acts on the outcome.
   * <ul>
   * <li>If the lookup succeeds, the datum's last check is updated, any DNS
   * failure count is reset to zero and the datum is written to the
   * context.</li>
   * <li>If the lookup fails with {@link UnknownHostException}, the DNS
   * failure count is initialized or incremented. The datum is written
   * unless purging is enabled and the failure count exceeds
   * {@code purgeFailedHostsThreshold}, in which case the host is dropped
   * from the output.</li>
   * <li>Any other exception is logged and counted as an error.</li>
   * </ul>
   * The checked-hosts counter is incremented in every case.
   */
  @Override
  public void run() {
    try {
      // Throws UnknownHostException if the host cannot be resolved; the
      // resolved address itself is not needed.
      InetAddress.getByName(host);

      if (datum.isEmpty()) {
        newKnownHostCounter.increment(1);
        datum.setLastCheck();
        LOG.info("{}: new_known_host {}", host, datum);
      } else if (datum.getDnsFailures() > 0) {
        rediscoveredHostCounter.increment(1);
        datum.setLastCheck();
        datum.setDnsFailures(0L);
        LOG.info("{}: rediscovered_host {}", host, datum);
      } else {
        existingKnownHostCounter.increment(1);
        datum.setLastCheck();
        LOG.info("{}: existing_known_host {}", host, datum);
      }

      // Write the host datum
      context.write(hostText, datum);
    } catch (UnknownHostException unknownHost) {
      try {
        if (datum.isEmpty()) {
          // First time we see this host: start with last check = now and a
          // single DNS failure
          datum.setLastCheck();
          datum.setDnsFailures(1L);
          context.write(hostText, datum);
          newUnknownHostCounter.increment(1);
          LOG.info("{}: new_unknown_host {}", host, datum);
        } else {
          datum.setLastCheck();
          datum.incDnsFailures();

          // Keep the host while it is within the allowed number of failures;
          // once it exceeds the threshold it is not written and is thereby
          // dropped from the output.
          if (isWithinFailureThreshold()) {
            context.write(hostText, datum);
            existingUnknownHostCounter.increment(1);
            LOG.info("{}: existing_unknown_host {}", host, datum);
          } else {
            purgedUnknownHostCounter.increment(1);
            LOG.info("{}: purged_unknown_host {}", host, datum);
          }
        }

        // Dynamic counter based on failure count - can't cache
        context.getCounter(NutchMetrics.GROUP_HOSTDB,
            createFailureCounterLabel(datum)).increment(1);

        // Common error counters for consistency
        errorsCounter.increment(1);
        errorsNetworkCounter.increment(1);
      } catch (Exception failure) {
        recordError(failure);
      }
    } catch (Exception failure) {
      recordError(failure);
    }

    checkedHostsCounter.increment(1);
  }

  /**
   * Tells whether a host whose DNS lookup just failed may stay in the hostdb.
   *
   * @return {@code true} if purging is disabled (threshold is {@code -1}) or
   * the datum's DNS failure count does not exceed the threshold
   */
  private boolean isWithinFailureThreshold() {
    return purgeFailedHostsThreshold == -1
        || datum.getDnsFailures() <= purgeFailedHostsThreshold;
  }

  /**
   * Logs an unexpected exception and increments the total error counter as
   * well as the counter named by {@link ErrorTracker#getCounterName} for the
   * exception.
   *
   * @param failure the exception that was caught
   */
  private void recordError(Exception failure) {
    if (failure instanceof InterruptedException) {
      // Catching the exception cleared the interrupt flag; restore it so code
      // running after this task on the same thread can still observe it.
      Thread.currentThread().interrupt();
    }
    LOG.warn(StringUtils.stringifyException(failure));
    errorsCounter.increment(1);
    context.getCounter(NutchMetrics.GROUP_HOSTDB,
        ErrorTracker.getCounterName(failure)).increment(1);
  }

  /**
   * Builds the name of the per-failure-count counter for the given datum.
   *
   * @param datum host information providing the number of failures
   * @return {@code "<n>_times_failed"} for fewer than four failures,
   * {@code "4-10_times_failed"} for four to ten failures and
   * {@code ">10_times_failed"} otherwise
   */
  private String createFailureCounterLabel(HostDatum datum) {
    // Hadoop will allow no more than 120 distinct counters. If we have a large
    // number of distinct failures, we'll exceed the limit, Hadoop will
    // complain, the job will fail. Let's limit the amount of possibilities by
    // grouping the numFailures in buckets. NUTCH-3096
    long n = datum.numFailures();
    String label;
    if (n < 4) {
      label = Long.toString(n);
    } else if (n < 11) {
      label = "4-10";
    } else {
      label = ">10";
    }
    return label + "_times_failed";
  }
}