/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.hostdb;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;

import com.tdunning.math.stats.TDigest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer of {@link UpdateHostDb} which aggregates all CrawlDatum records per
 * host into a single {@link HostDatum}: it counts the CrawlDatum statuses,
 * gathers value counts for configured string metadata fields as well as
 * minimum, maximum, average and percentile statistics for configured numeric
 * metadata fields, and optionally hands hosts to a pool of
 * {@link ResolverThread}s for DNS checks.
 */
public class UpdateHostDbReducer
    extends Reducer<Text, NutchWritable, Text, HostDatum> {

  private static final Logger LOG = LoggerFactory
      .getLogger(MethodHandles.lookup().lookupClass());
  protected ResolverThread resolverThread = null;
  protected Integer numResolverThreads = 10;
  protected static Integer purgeFailedHostsThreshold = -1;
  protected static Integer recheckInterval = 86400000;
  protected static boolean checkFailed = false;
  protected static boolean checkNew = false;
  protected static boolean checkKnown = false;
  protected static boolean force = false;
  protected static long now = new Date().getTime();
  protected static String[] numericFields;
  protected static String[] stringFields;
  protected static int[] percentiles;
  protected static Text[] numericFieldWritables;
  protected static Text[] stringFieldWritables;

  protected BlockingQueue<Runnable> queue = new SynchronousQueue<>();
  protected ThreadPoolExecutor executor = null;

  /**
   * Reads the configuration and sets up the resolver thread pool,
   * prestarting all core threads.
   */
  @Override
  public void setup(Reducer<Text, NutchWritable, Text, HostDatum>.Context context) {
    Configuration conf = context.getConfiguration();
    purgeFailedHostsThreshold = conf.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
    numResolverThreads = conf.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
    recheckInterval = conf.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
    checkFailed = conf.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
    checkNew = conf.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
    checkKnown = conf.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false);
    force = conf.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false);
    numericFields = conf.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
    stringFields = conf.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
    percentiles = conf.getInts(UpdateHostDb.HOSTDB_PERCENTILES);

    // Pre-create Text keys for the metadata fields we need to collect
    if (numericFields != null) {
      numericFieldWritables = new Text[numericFields.length];
      for (int i = 0; i < numericFields.length; i++) {
        numericFieldWritables[i] = new Text(numericFields[i]);
      }
    }

    if (stringFields != null) {
      stringFieldWritables = new Text[stringFields.length];
      for (int i = 0; i < stringFields.length; i++) {
        stringFieldWritables[i] = new Text(stringFields[i]);
      }
    }

    // Initialize the thread pool with our queue
    executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
        5, TimeUnit.SECONDS, queue);

    // Run all threads in the pool
    executor.prestartAllCoreThreads();
  }
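
  /*
   * A note on the pool configured above: a SynchronousQueue has no capacity,
   * so the queue.put() call in reduce() blocks until one of the prestarted
   * core threads is free to take the Runnable, throttling the reducer to at
   * most numResolverThreads concurrent DNS checks. A minimal standalone
   * sketch of the same hand-off pattern (the names below are illustrative
   * only, not part of this class; put() throws InterruptedException):
   *
   *   BlockingQueue<Runnable> queue = new SynchronousQueue<>();
   *   ThreadPoolExecutor pool =
   *       new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, queue);
   *   pool.prestartAllCoreThreads();
   *   queue.put(() -> System.out.println("task ran")); // blocks until a
   *                                                    // worker is idle
   *   pool.shutdown();
   */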

  /**
   * Aggregates all values belonging to a single host key into one
   * {@link HostDatum} and either writes it directly, or queues the host for
   * a DNS check, in which case the datum is written by the
   * {@link ResolverThread}.
   */
  @Override
  public void reduce(Text key, Iterable<NutchWritable> values,
      Context context) throws IOException, InterruptedException {

    Map<String,Map<String,Long>> stringCounts = new HashMap<>();
    Map<String,Float> maximums = new HashMap<>();
    Map<String,Float> sums = new HashMap<>(); // used to calc averages
    Map<String,Long> counts = new HashMap<>(); // used to calc averages
    Map<String,Float> minimums = new HashMap<>();
    Map<String,TDigest> tdigests = new HashMap<>();

    HostDatum hostDatum = new HostDatum();
    float score = 0;

    if (stringFields != null) {
      for (int i = 0; i < stringFields.length; i++) {
        stringCounts.put(stringFields[i], new HashMap<>());
      }
    }

    // Loop through all values: merge an existing HostDatum if we find one,
    // otherwise keep the empty datum created above (a new host for the HostDb)
    for (NutchWritable val : values) {
      final Writable value = val.get(); // unwrap

      // Count CrawlDatum statuses and collect metadata from fields
      if (value instanceof CrawlDatum) {
        CrawlDatum buffer = (CrawlDatum)value;

        // Set the correct status field
        switch (buffer.getStatus()) {
          case CrawlDatum.STATUS_DB_UNFETCHED:
            hostDatum.setUnfetched(hostDatum.getUnfetched() + 1L);
            break;

          case CrawlDatum.STATUS_DB_FETCHED:
            hostDatum.setFetched(hostDatum.getFetched() + 1L);
            break;

          case CrawlDatum.STATUS_DB_GONE:
            hostDatum.setGone(hostDatum.getGone() + 1L);
            break;

          case CrawlDatum.STATUS_DB_REDIR_TEMP:
            hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1L);
            break;

          case CrawlDatum.STATUS_DB_REDIR_PERM:
            hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1L);
            break;

          case CrawlDatum.STATUS_DB_NOTMODIFIED:
            hostDatum.setNotModified(hostDatum.getNotModified() + 1L);
            break;
        }

        // Record connection failures
        if (buffer.getRetriesSinceFetch() != 0) {
          hostDatum.incConnectionFailures();
        }

        // Only gather metadata statistics for successfully fetched pages
        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
          // Deal with the string fields
          if (stringFields != null) {
            for (int i = 0; i < stringFields.length; i++) {
              // Does this field exist?
              if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
                // Get it!
                String metadataValue = null;
                try {
                  metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
                } catch (Exception e) {
                  LOG.error("Metadata field {} could not be read as a string", stringFields[i]);
                }

                // Does the value exist?
                if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
                  // Yes, increment it
                  stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1L);
                } else {
                  // Create it!
                  stringCounts.get(stringFields[i]).put(metadataValue, 1L);
                }
              }
            }
          }
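
          // Worked example of the bookkeeping above (the field name is
          // hypothetical, it depends on the configured string fields): with
          // string field "Content-Type" and three fetched pages carrying
          // "text/html", "text/html" and "text/plain", stringCounts becomes
          // {Content-Type={text/html=2, text/plain=1}} and is later emitted
          // in the metadata section below as the keys
          // "Content-Type.text/html" = 2 and "Content-Type.text/plain" = 1.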

          // Deal with the numeric fields
          if (numericFields != null) {
            for (int i = 0; i < numericFields.length; i++) {
              // Does this field exist?
              if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
                try {
                  // Get it!
                  float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());

                  // Does a t-digest for this field exist?
                  if (tdigests.containsKey(numericFields[i])) {
                    tdigests.get(numericFields[i]).add(metadataValue);
                  } else {
                    // Create it!
                    TDigest tdigest = TDigest.createDigest(100);
                    tdigest.add(metadataValue);
                    tdigests.put(numericFields[i], tdigest);
                  }

                  // Does the minimum value exist?
                  if (minimums.containsKey(numericFields[i])) {
                    // Overwrite if this is lower than the existing value
                    if (metadataValue < minimums.get(numericFields[i])) {
                      minimums.put(numericFields[i], metadataValue);
                    }
                  } else {
                    // Create it!
                    minimums.put(numericFields[i], metadataValue);
                  }

                  // Does the maximum value exist?
                  if (maximums.containsKey(numericFields[i])) {
                    // Overwrite if this is higher than the existing value
                    if (metadataValue > maximums.get(numericFields[i])) {
                      maximums.put(numericFields[i], metadataValue);
                    }
                  } else {
                    // Create it!
                    maximums.put(numericFields[i], metadataValue);
                  }

                  // Sum it up!
                  if (sums.containsKey(numericFields[i])) {
                    // Increment
                    sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
                    counts.put(numericFields[i], counts.get(numericFields[i]) + 1L);
                  } else {
                    // Create it!
                    sums.put(numericFields[i], metadataValue);
                    counts.put(numericFields[i], 1L);
                  }
                } catch (Exception e) {
                  LOG.error("{} when processing values for {}", e.getMessage(), key);
                }
              }
            }
          }
        }
      }

      // Merge an existing HostDatum for this host, i.e. the entry from a
      // previous version of the HostDb
      else if (value instanceof HostDatum) {
        HostDatum buffer = (HostDatum)value;

        // Check homepage URL
        if (buffer.hasHomepageUrl()) {
          hostDatum.setHomepageUrl(buffer.getHomepageUrl());
        }

        // Check lastCheck timestamp
        if (!buffer.isEmpty()) {
          hostDatum.setLastCheck(buffer.getLastCheck());
        }

        // Check and set DNS failures
        if (buffer.getDnsFailures() > 0) {
          hostDatum.setDnsFailures(buffer.getDnsFailures());
        }

        // Check and set connection failures
        if (buffer.getConnectionFailures() > 0) {
          hostDatum.setConnectionFailures(buffer.getConnectionFailures());
        }

        // Check metadata
        if (!buffer.getMetaData().isEmpty()) {
          hostDatum.setMetaData(buffer.getMetaData());
        }

        // Check and set score (score from the Web Graph has precedence)
        if (buffer.getScore() > 0) {
          hostDatum.setScore(buffer.getScore());
        }
      }

      // Check for the score
      else if (value instanceof FloatWritable) {
        FloatWritable buffer = (FloatWritable)value;
        score = buffer.get();
      } else {
        LOG.error("Class {} not handled", value.getClass());
      }
    }

    // Check if the score was set from the Web Graph
    if (score > 0) {
      hostDatum.setScore(score);
    }

    // Set metadata
    for (Map.Entry<String, Map<String,Long>> entry : stringCounts.entrySet()) {
      for (Map.Entry<String,Long> subEntry : entry.getValue().entrySet()) {
        hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new LongWritable(subEntry.getValue()));
      }
    }
    for (Map.Entry<String, Float> entry : maximums.entrySet()) {
      hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
    }
    for (Map.Entry<String, Float> entry : sums.entrySet()) {
      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
    }
    for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
      // Emit all configured percentiles; quantile() expects a fraction in [0,1]
      for (int i = 0; i < percentiles.length; i++) {
        hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(percentiles[i] / 100.0d)));
      }
    }
    for (Map.Entry<String, Float> entry : minimums.entrySet()) {
      hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
    }

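    // Worked example of the numeric statistics (field name hypothetical):
    // for a configured numeric field "fetchDuration" with observed values
    // 100, 200 and 700 and configured percentiles 50 and 95, the datum
    // receives min.fetchDuration = 100.0, max.fetchDuration = 700.0,
    // avg.fetchDuration = 1000 / 3 ~ 333.33, plus pct50.fetchDuration and
    // pct95.fetchDuration estimated from the t-digest.
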
    context.getCounter("UpdateHostDb", "total_hosts").increment(1);

    // See if this record is to be checked
    if (shouldCheck(hostDatum)) {
      // Make an entry
      resolverThread = new ResolverThread(key.toString(), hostDatum, context,
          purgeFailedHostsThreshold);

      // Add the entry to the queue (blocks until a resolver thread is free)
      try {
        queue.put(resolverThread);
      } catch (InterruptedException e) {
        LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
      }

      // Return here: the datum will be written by the resolver thread
      return;
    } else {
      context.getCounter("UpdateHostDb", "skipped_not_eligible").increment(1);
      LOG.info("UpdateHostDb: {}: skipped_not_eligible", key);
    }

    // Write the host datum if it wasn't handed to a resolver thread
    context.write(key, hostDatum);
  }

  /**
   * Determines whether a record should be checked by a resolver thread.
   *
   * @param datum the {@link HostDatum} to inspect
   * @return true if the record should be checked
   */
  protected boolean shouldCheck(HostDatum datum) {
    // Whether a new record is to be checked
    if (checkNew && datum.isEmpty()) {
      return true;
    }

    // Whether existing known hosts should be rechecked
    if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
      return isEligibleForCheck(datum);
    }

    // Whether failed records are to be rechecked
    if (checkFailed && datum.getDnsFailures() > 0) {
      return isEligibleForCheck(datum);
    }

    // This record is not to be checked
    return false;
  }
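
  /*
   * Summary of the combinations handled by shouldCheck() above:
   *
   *   datum state                  queued for a check when
   *   ---------------------------  ------------------------------------
   *   empty (new host)             checkNew
   *   known, dnsFailures == 0      checkKnown && isEligibleForCheck()
   *   failed, dnsFailures > 0      checkFailed && isEligibleForCheck()
   */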

  /**
   * Determines whether a record is eligible for a recheck: either a check
   * is forced, or the recheck interval, scaled up for each recorded DNS
   * failure, has elapsed since the last check.
   *
   * @param datum the {@link HostDatum} to inspect
   * @return true if the record is eligible for a recheck
   */
  protected boolean isEligibleForCheck(HostDatum datum) {
    // Forced, or the (failure-scaled) recheck interval has elapsed since
    // the last check
    if (force || datum.getLastCheck().getTime()
        + ((long)recheckInterval * (datum.getDnsFailures() + 1)) < now) {
      return true;
    }

    return false;
  }
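
  /*
   * Example with the default recheck interval of one day (86400 seconds,
   * converted to milliseconds in setup()): a known host with 0 DNS failures
   * becomes eligible one day after its last check, while a host with 2 DNS
   * failures waits 3 intervals and only becomes eligible 3 days after its
   * last check.
   */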

  /**
   * Shuts down the resolver thread pool and waits for all threads to
   * complete.
   */
  @Override
  public void cleanup(Context context) {
    LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");

    // All keys have been fed at this point, so a shutdown can be issued
    executor.shutdown();

    boolean finished = false;

    // Wait until all resolvers have finished
    while (!finished) {
      try {
        // Wait for the executor to shut down completely
        if (!executor.isTerminated()) {
          LOG.info("UpdateHostDb: resolver threads waiting: {}", executor.getPoolSize());
          Thread.sleep(1000);
        } else {
          // All is well, get out
          finished = true;
        }
      } catch (InterruptedException e) {
        LOG.warn(StringUtils.stringifyException(e));
      }
    }
  }
}