/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.fetcher;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLExemptionFilters;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.net.protocols.ProtocolLogUtil;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseImpl;
import org.apache.nutch.parse.ParseOutputFormat;
import org.apache.nutch.parse.ParseResult;
import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.parse.ParseStatus;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.parse.ParseUtil;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.protocol.ProtocolStatus;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.service.NutchServer;
import org.apache.nutch.util.StringUtil;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import crawlercommons.robots.BaseRobotRules;

/**
 * This class picks items from queues and fetches the pages.
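 * Each thread repeatedly takes a {@link FetchItem} from the shared
 * {@link FetchItemQueues}, fetches the URL with the configured protocol
 * plugin (honoring robots.txt rules and crawl delays), optionally parses
 * the content, follows redirects up to http.redirect.max, and writes all
 * results to the job context.
 * <p>
 * A minimal usage sketch (illustrative only; in practice the
 * {@link Fetcher} job creates, starts, and halts these threads):
 * <pre>{@code
 * FetcherThread t = new FetcherThread(conf, activeThreads, fetchQueues,
 *     feeder, spinWaiting, lastRequestStart, context, errors, segmentName,
 *     parsing, storingContent, pages, bytes);
 * t.start();         // runs until the queues are drained
 * // ...
 * t.setHalted(true); // or ask the thread to finish early
 * }</pre>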
 */
public class FetcherThread extends Thread {

  private static final Logger LOG = LoggerFactory
      .getLogger(MethodHandles.lookup().lookupClass());

  private Configuration conf;
  private URLFilters urlFilters;
  private URLExemptionFilters urlExemptionFilters;
  private ScoringFilters scfilters;
  private ParseUtil parseUtil;
  private URLNormalizers normalizers;
  private ProtocolFactory protocolFactory;
  private long maxCrawlDelay;
  private String queueMode;
  private int maxRedirect;
  private boolean maxRedirectExceededSkip = false;
  private String reprUrl;
  private boolean redirecting;
  private int redirectCount;
  private boolean ignoreInternalLinks;
  private boolean ignoreExternalLinks;
  private boolean ignoreAlsoRedirects;
  private String ignoreExternalLinksMode;

  // Used when following outlinks during parsing
  // (fetcher.follow.outlinks.depth)
  private final int maxOutlinks;
  private final int maxOutlinkLength;
  private final int interval;
  private int maxOutlinkDepth;
  private int maxOutlinkDepthNumLinks;
  private boolean outlinksIgnoreExternal;

  URLFilters urlFiltersForOutlinks;
  URLNormalizers normalizersForOutlinks;

  private boolean skipTruncated;

  private boolean halted = false;

  private AtomicInteger activeThreads;

  private FetchItemQueues fetchQueues;

  private QueueFeeder feeder;

  private AtomicInteger spinWaiting;

  private AtomicLong lastRequestStart;

  private AtomicInteger errors;

  private String segmentName;

  private boolean parsing;

  private FetcherRun.Context context;

  private boolean storingContent;

  private boolean signatureWithoutParsing;

  private AtomicInteger pages;

  private AtomicLong bytes;

  private List<Content> robotsTxtContent = null;

  // Used by the REST service
  private FetchNode fetchNode;
  private boolean reportToNutchServer;

  // Used for publishing events
  private FetcherThreadPublisher publisher;
  private boolean activatePublisher;

  private ProtocolLogUtil logUtil = new ProtocolLogUtil();

  public FetcherThread(Configuration conf, AtomicInteger activeThreads,
      FetchItemQueues fetchQueues, QueueFeeder feeder,
      AtomicInteger spinWaiting, AtomicLong lastRequestStart,
      FetcherRun.Context context, AtomicInteger errors, String segmentName,
      boolean parsing, boolean storingContent, AtomicInteger pages,
      AtomicLong bytes) {
    this.setDaemon(true); // don't hang JVM on exit
    this.setName("FetcherThread"); // use an informative name
    this.conf = conf;
    this.urlFilters = new URLFilters(conf);
    this.urlExemptionFilters = new URLExemptionFilters(conf);
    this.scfilters = new ScoringFilters(conf);
    this.parseUtil = new ParseUtil(conf);
    this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
    this.signatureWithoutParsing = conf.getBoolean("fetcher.signature", false);
    this.protocolFactory = new ProtocolFactory(conf);
    this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
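    // fetcher.max.crawl.delay is configured in seconds but stored here in
    // milliseconds; a negative value disables the limit (checked in run())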
    this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
    this.activeThreads = activeThreads;
    this.fetchQueues = fetchQueues;
    this.feeder = feeder;
    this.spinWaiting = spinWaiting;
    this.lastRequestStart = lastRequestStart;
    this.context = context;
    this.errors = errors;
    this.segmentName = segmentName;
    this.parsing = parsing;
    this.storingContent = storingContent;
    this.pages = pages;
    this.bytes = bytes;

    this.logUtil.setConf(conf);

    // NUTCH-2413: when parsing, apply filters and normalizers on outlinks
    // only if configured to do so
    if (parsing) {
      if (conf.getBoolean("parse.filter.urls", true))
        this.urlFiltersForOutlinks = urlFilters;
      if (conf.getBoolean("parse.normalize.urls", true))
        this.normalizersForOutlinks = new URLNormalizers(conf,
            URLNormalizers.SCOPE_OUTLINK);
    }

    activatePublisher = conf.getBoolean("fetcher.publisher", false);
    if (activatePublisher)
      this.publisher = new FetcherThreadPublisher(conf);

    queueMode = conf.get("fetcher.queue.mode",
        FetchItemQueues.QUEUE_MODE_HOST);
    queueMode = FetchItemQueues.checkQueueMode(queueMode);
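    // checkQueueMode() falls back to the default (byHost) if the configured
    // value is unknown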
| LOG.info("{} {} Using queue mode : {}", getName(), |
| Thread.currentThread().getId(), queueMode); |
| |
| this.maxRedirect = conf.getInt("http.redirect.max", 3); |
| this.maxRedirectExceededSkip = conf |
| .getBoolean("http.redirect.max.exceeded.skip", false); |
| |
| int maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100); |
| maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE |
| : maxOutlinksPerPage; |
| int maxOutlinkL = conf.getInt("db.max.outlink.length", 4096); |
| maxOutlinkLength = (maxOutlinkL < 0) ? Integer.MAX_VALUE : maxOutlinkL; |
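    // db.fetch.interval.default: 2592000 seconds = 30 days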
    interval = conf.getInt("db.fetch.interval.default", 2592000);
    ignoreInternalLinks = conf.getBoolean("db.ignore.internal.links", false);
    ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
    ignoreAlsoRedirects = conf.getBoolean("db.ignore.also.redirects", true);
    ignoreExternalLinksMode = conf.get("db.ignore.external.links.mode",
        "byHost");
    maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
    outlinksIgnoreExternal = conf.getBoolean(
        "fetcher.follow.outlinks.ignore.external", false);
    maxOutlinkDepthNumLinks = conf.getInt(
        "fetcher.follow.outlinks.num.links", 4);
    if (conf.getBoolean("fetcher.store.robotstxt", false)) {
      if (storingContent) {
        robotsTxtContent = new LinkedList<>();
      } else {
        LOG.warn(
            "{} {} Ignoring fetcher.store.robotstxt because not storing content (fetcher.store.content)!",
            getName(), Thread.currentThread().getId());
      }
    }
  }

  @SuppressWarnings("fallthrough")
  @Override
  public void run() {
    activeThreads.incrementAndGet(); // count threads

    FetchItem fit = null;
    try {
      // report to the NutchServer REST service only when the server is
      // running and we are parsing (fetcher.parse)
      if (parsing && NutchServer.getInstance().isRunning())
        reportToNutchServer = true;

      while (true) {
        // creating FetchNode for storing in FetchNodeDb
        if (reportToNutchServer)
          this.fetchNode = new FetchNode();
        else
          this.fetchNode = null;

        // check whether this thread must be stopped
        if (isHalted()) {
          LOG.debug("{} set to halted", getName());
          fit = null;
          return;
        }

        fit = fetchQueues.getFetchItem();
        if (fit == null) {
          if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
            LOG.debug("{} spin-waiting ...", getName());
            // spin-wait.
            spinWaiting.incrementAndGet();
            try {
              Thread.sleep(500);
            } catch (Exception e) {
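              // ignore the interrupt: the outer loop re-checks the halt
              // flag and the queue state on the next iteration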
            }
            spinWaiting.decrementAndGet();
            continue;
          } else {
            // all done, finish this thread
            LOG.info("{} {} has no more work available", getName(),
                Thread.currentThread().getId());
            return;
          }
        }
        lastRequestStart.set(System.currentTimeMillis());

        Text reprUrlWritable = (Text) fit.datum.getMetaData()
            .get(Nutch.WRITABLE_REPR_URL_KEY);
        if (reprUrlWritable == null) {
          setReprUrl(fit.url.toString());
        } else {
          setReprUrl(reprUrlWritable.toString());
        }

        try {
          // fetch the page
          redirecting = false;
          redirectCount = 0;

          // publish a START event for this URL
          if (activatePublisher) {
            FetcherThreadEvent startEvent = new FetcherThreadEvent(
                PublishEventType.START, fit.getUrl().toString());
            publisher.publish(startEvent, conf);
          }

          do {
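            // Redirect loop: handleRedirect() sets redirecting=true and
            // queueRedirect() swaps in the redirect target as the new
            // FetchItem, so the next iteration fetches it immediately.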
            if (LOG.isInfoEnabled()) {
              LOG.info("{} {} fetching {} (queue crawl delay={}ms)",
                  getName(), Thread.currentThread().getId(), fit.url,
                  fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay);
            }
            LOG.debug("redirectCount={}", redirectCount);
            redirecting = false;
            Protocol protocol = this.protocolFactory.getProtocol(fit.u);
            BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum,
                robotsTxtContent);
            if (robotsTxtContent != null) {
              outputRobotsTxt(robotsTxtContent);
              robotsTxtContent.clear();
            }
            if (!rules.isAllowed(fit.url.toString())) {
              // unblock
              fetchQueues.finishFetchItem(fit, true);
              LOG.info("Denied by robots.txt: {}", fit.url);
              output(fit.url, fit.datum, null,
                  ProtocolStatus.STATUS_ROBOTS_DENIED,
                  CrawlDatum.STATUS_FETCH_GONE);
              context.getCounter("FetcherStatus", "robots_denied")
                  .increment(1);
              continue;
            }
            if (rules.getCrawlDelay() > 0) {
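              // a negative fetcher.max.crawl.delay disables this check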
              if (rules.getCrawlDelay() > maxCrawlDelay
                  && maxCrawlDelay >= 0) {
                // unblock
                fetchQueues.finishFetchItem(fit, true);
                LOG.info("Crawl-Delay for {} too long ({}), skipping",
                    fit.url, rules.getCrawlDelay());
                output(fit.url, fit.datum, null,
                    ProtocolStatus.STATUS_ROBOTS_DENIED,
                    CrawlDatum.STATUS_FETCH_GONE);
                context.getCounter("FetcherStatus",
                    "robots_denied_maxcrawldelay").increment(1);
                continue;
              } else {
                FetchItemQueue fiq = fetchQueues
                    .getFetchItemQueue(fit.queueID);
                fiq.crawlDelay = rules.getCrawlDelay();
                LOG.debug(
                    "Crawl delay for queue: {} is set to {} as per robots.txt. url: {}",
                    fit.queueID, fiq.crawlDelay, fit.url);
              }
            }
            ProtocolOutput output = protocol.getProtocolOutput(fit.url,
                fit.datum);
            ProtocolStatus status = output.getStatus();
            Content content = output.getContent();
            ParseStatus pstatus = null;
            // unblock queue
            fetchQueues.finishFetchItem(fit);

            // used for FetchNode
            if (fetchNode != null) {
              fetchNode.setStatus(status.getCode());
              fetchNode.setFetchTime(System.currentTimeMillis());
              fetchNode.setUrl(fit.url);
            }

            // publish an END event carrying the final protocol status
            if (activatePublisher) {
              FetcherThreadEvent endEvent = new FetcherThreadEvent(
                  PublishEventType.END, fit.getUrl().toString());
              endEvent.addEventData("status", status.getName());
              publisher.publish(endEvent, conf);
            }
            context.getCounter("FetcherStatus", status.getName()).increment(1);

            switch (status.getCode()) {

            case ProtocolStatus.WOULDBLOCK:
              // retry?
              fetchQueues.addFetchItem(fit);
              break;

            case ProtocolStatus.SUCCESS: // got a page
              pstatus = output(fit.url, fit.datum, content, status,
                  CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
              updateStatus(content.getContent().length);
              if (pstatus != null && pstatus.isSuccess()
                  && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                String newUrl = pstatus.getMessage();
                int refreshTime = Integer.parseInt(pstatus.getArgs()[1]);
                Text redirUrl = handleRedirect(fit, newUrl,
                    refreshTime < Fetcher.PERM_REFRESH_TIME,
                    Fetcher.CONTENT_REDIR);
                if (redirUrl != null) {
                  fit = queueRedirect(redirUrl, fit);
                }
              }
              break;

            case ProtocolStatus.MOVED: // redirect
            case ProtocolStatus.TEMP_MOVED:
              int code;
              boolean temp;
              if (status.getCode() == ProtocolStatus.MOVED) {
                code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
                temp = false;
              } else {
                code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
                temp = true;
              }
              output(fit.url, fit.datum, content, status, code);
              String newUrl = status.getMessage();
              Text redirUrl = handleRedirect(fit, newUrl, temp,
                  Fetcher.PROTOCOL_REDIR);
              if (redirUrl != null) {
                fit = queueRedirect(redirUrl, fit);
              } else {
                // stop redirecting
                redirecting = false;
              }
              break;

            case ProtocolStatus.EXCEPTION:
              logError(fit.url, status.getMessage());
              int killedURLs = fetchQueues
                  .checkExceptionThreshold(fit.getQueueID());
              if (killedURLs != 0)
                context.getCounter("FetcherStatus",
                    "AboveExceptionThresholdInQueue").increment(killedURLs);
              /* FALLTHROUGH */
            case ProtocolStatus.RETRY: // retry
            case ProtocolStatus.BLOCKED:
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_RETRY);
              break;

            case ProtocolStatus.GONE: // gone
            case ProtocolStatus.NOTFOUND:
            case ProtocolStatus.ACCESS_DENIED:
            case ProtocolStatus.ROBOTS_DENIED:
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_GONE);
              break;

            case ProtocolStatus.NOTMODIFIED:
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_NOTMODIFIED);
              break;

            default:
              LOG.warn("{} {} Unknown ProtocolStatus: {}", getName(),
                  Thread.currentThread().getId(), status.getCode());
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_RETRY);
            }

            if (redirecting && redirectCount > maxRedirect) {
              fetchQueues.finishFetchItem(fit);
              LOG.info("{} {} - redirect count exceeded {} ({})", getName(),
                  Thread.currentThread().getId(), fit.url,
                  maxRedirectExceededSkip ? "skipped" : "linked");
              if (!maxRedirectExceededSkip) {
                // not skipping: record the redirect target as a linked page
                // so that it can be fetched in a later cycle
                Text newUrl = new Text(status.getMessage());
                CrawlDatum newDatum = createRedirDatum(newUrl, fit,
                    CrawlDatum.STATUS_LINKED);
                output(newUrl, newDatum, null, null, CrawlDatum.STATUS_LINKED);
              }
            }

          } while (redirecting && (redirectCount <= maxRedirect));

        } catch (Throwable t) { // unexpected exception
          // unblock
          fetchQueues.finishFetchItem(fit);
          String message;
          if (!LOG.isDebugEnabled() && logUtil.logShort(t)) {
            message = t.getClass().getName();
          } else {
            message = StringUtils.stringifyException(t);
          }
          logError(fit.url, message);
          output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
              CrawlDatum.STATUS_FETCH_RETRY);
        }
      }

    } catch (Throwable e) {
      LOG.error("fetcher caught:", e);
    } finally {
      if (fit != null)
        fetchQueues.finishFetchItem(fit);
      activeThreads.decrementAndGet(); // count threads
      LOG.info("{} {} - finishing thread, activeThreads={}", getName(),
          Thread.currentThread().getId(), activeThreads);
    }
  }

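  /**
   * Normalize and filter a redirect target URL, then decide how to handle
   * it: follow it immediately (the target is returned and queued by the
   * caller), record it as a linked page for a later fetch cycle (when
   * http.redirect.max is 0), or drop it (returns null) when it is overlong,
   * filtered away, identical to the source URL, or excluded by the
   * db.ignore.* link settings.
   */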
  private Text handleRedirect(FetchItem fit, String newUrl, boolean temp,
      String redirType)
      throws MalformedURLException, URLFilterException, InterruptedException {
    if (newUrl.length() > maxOutlinkLength) {
      return null;
    }
    newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
    newUrl = urlFilters.filter(newUrl);
    String urlString = fit.url.toString();

    if (newUrl == null || newUrl.equals(urlString)) {
      LOG.debug(" - {} redirect skipped: {}", redirType,
          (newUrl != null ? "to same url" : "filtered"));
      return null;
    }

    if (ignoreAlsoRedirects && (ignoreExternalLinks || ignoreInternalLinks)) {
      try {
        URL origUrl = fit.u;
        URL redirUrl = new URL(newUrl);
        if (ignoreExternalLinks) {
          String origHostOrDomain, newHostOrDomain;
          if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
            origHostOrDomain = URLUtil.getDomainName(origUrl).toLowerCase();
            newHostOrDomain = URLUtil.getDomainName(redirUrl).toLowerCase();
          } else {
            // byHost
            origHostOrDomain = origUrl.getHost().toLowerCase();
            newHostOrDomain = redirUrl.getHost().toLowerCase();
          }
          if (!origHostOrDomain.equals(newHostOrDomain)) {
            LOG.debug(
                " - ignoring redirect {} from {} to {} because external links are ignored",
                redirType, urlString, newUrl);
            return null;
          }
        }

        if (ignoreInternalLinks) {
          String origHost = origUrl.getHost().toLowerCase();
          String newHost = redirUrl.getHost().toLowerCase();
          if (origHost.equals(newHost)) {
            LOG.debug(
                " - ignoring redirect {} from {} to {} because internal links are ignored",
                redirType, urlString, newUrl);
            return null;
          }
        }
      } catch (MalformedURLException e) {
        return null;
      }
    }

    reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
    Text url = new Text(newUrl);
    if (maxRedirect > 0) {
      redirecting = true;
      redirectCount++;
      LOG.debug(" - {} redirect to {} (fetching now)", redirType, url);
      return url;
    } else {
      CrawlDatum newDatum = createRedirDatum(url, fit,
          CrawlDatum.STATUS_LINKED);
      output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
      LOG.debug(" - {} redirect to {} (fetching later)", redirType, url);
      return null;
    }
  }

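  /**
   * Create a CrawlDatum for a redirect target, carrying over the fetch
   * interval, score, and metadata of the source and recording the
   * representative URL.
   */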
  private CrawlDatum createRedirDatum(Text redirUrl, FetchItem fit,
      byte status) {
    CrawlDatum newDatum = new CrawlDatum(status, fit.datum.getFetchInterval(),
        fit.datum.getScore());
    // transfer existing metadata
    newDatum.getMetaData().putAll(fit.datum.getMetaData());
    try {
      scfilters.initialScore(redirUrl, newDatum);
    } catch (ScoringFilterException e) {
      LOG.error("Scoring filtering failed for {}: ", redirUrl, e);
    }
    if (reprUrl != null) {
      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
          new Text(reprUrl));
    }
    return newDatum;
  }

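  /**
   * Wrap a redirect target into a new FetchItem and register it as
   * in-progress in its queue so that this thread fetches it next; returns
   * null (and stops redirecting) when no FetchItem could be created.
   */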
  private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
      throws ScoringFilterException {
    CrawlDatum newDatum = createRedirDatum(redirUrl, fit,
        CrawlDatum.STATUS_DB_UNFETCHED);
    fit = FetchItem.create(redirUrl, newDatum, queueMode);
    if (fit != null) {
      FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
      fiq.addInProgressFetchItem(fit);
    } else {
      // stop redirecting
      redirecting = false;
      context.getCounter("FetcherStatus", "FetchItem.notCreated.redirect")
          .increment(1);
    }
    return fit;
  }

  private void logError(Text url, String message) {
    LOG.info("{} {} fetch of {} failed with: {}", getName(),
        Thread.currentThread().getId(), url, message);
    errors.incrementAndGet();
  }

  private ParseStatus output(Text key, CrawlDatum datum, Content content,
      ProtocolStatus pstatus, int status) throws InterruptedException {
    return output(key, datum, content, pstatus, status, 0);
  }

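  /**
   * Write the fetch result for one URL: update the CrawlDatum, optionally
   * parse the content, calculate the page signature, normalize, filter, and
   * deduplicate outlinks, and emit everything to the job context. Returns
   * the ParseStatus of the original URL (if parsed), which allows the caller
   * to follow meta-refresh redirects, or null otherwise.
   */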
  private ParseStatus output(Text key, CrawlDatum datum, Content content,
      ProtocolStatus pstatus, int status, int outlinkDepth)
      throws InterruptedException {

    datum.setStatus(status);
    datum.setFetchTime(System.currentTimeMillis());
    if (pstatus != null)
      datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);

    ParseResult parseResult = null;
    if (content != null) {
      Metadata metadata = content.getMetadata();

      // store the guessed content type in the crawldatum
      if (content.getContentType() != null)
        datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
            new Text(content.getContentType()));

      // add segment to metadata
      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
      // add score to content metadata so that ParseSegment can pick it up.
      try {
        scfilters.passScoreBeforeParsing(key, datum, content);
      } catch (Exception e) {
        LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
            Thread.currentThread().getId(), key, e);
      }

      if (status == CrawlDatum.STATUS_FETCH_SUCCESS) {
        if (parsing && !(skipTruncated && ParseSegment.isTruncated(content))) {
          try {
            parseResult = this.parseUtil.parse(content);
          } catch (Exception e) {
            LOG.warn("{} {} Error parsing: {}: {}", getName(),
                Thread.currentThread().getId(), key,
                StringUtils.stringifyException(e));
          }
        }

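        // no parse result available: compute the signature from the raw
        // content when parsing here or when fetcher.signature is set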
        if (parseResult == null && (parsing || signatureWithoutParsing)) {
          byte[] signature = SignatureFactory.getSignature(conf)
              .calculate(content, new ParseStatus().getEmptyParse(conf));
          datum.setSignature(signature);
        }
      }

      /*
       * Store the fetch status code in the content metadata so that the
       * parsing job (when run as a separate step) can read it and decide
       * whether to parse.
       */
      content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
          Integer.toString(status));
    }

    try {
      context.write(key, new NutchWritable(datum));
      if (content != null && storingContent)
        context.write(key, new NutchWritable(content));
      if (parseResult != null) {
        for (Entry<Text, Parse> entry : parseResult) {
          Text url = entry.getKey();
          Parse parse = entry.getValue();
          ParseStatus parseStatus = parse.getData().getStatus();
          ParseData parseData = parse.getData();

          if (!parseStatus.isSuccess()) {
            LOG.warn("{} {} Error parsing: {}: {}", getName(),
                Thread.currentThread().getId(), key, parseStatus);
            parse = parseStatus.getEmptyParse(conf);
          }

          // Calculate page signature. For non-parsing fetchers this will
          // be done in ParseSegment
          byte[] signature = SignatureFactory.getSignature(conf)
              .calculate(content, parse);
          // Ensure segment name and score are in parseData metadata
          parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
          parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
              StringUtil.toHexString(signature));
          // Pass fetch time to content meta
          parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
              Long.toString(datum.getFetchTime()));
          if (url.equals(key))
            datum.setSignature(signature);
          try {
            scfilters.passScoreAfterParsing(url, content, parse);
          } catch (Exception e) {
            LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
                Thread.currentThread().getId(), key, e);
          }

          String origin = null;

          // collect outlinks for subsequent db update
          Outlink[] links = parseData.getOutlinks();
          int outlinksToStore = Math.min(maxOutlinks, links.length);
          if (ignoreExternalLinks || ignoreInternalLinks) {
            URL originURL = new URL(url.toString());
            // based on domain?
            if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
              origin = URLUtil.getDomainName(originURL).toLowerCase();
            }
            // use host
            else {
              origin = originURL.getHost().toLowerCase();
            }
          }

          // used by the REST service (FetchNodeDb)
          if (fetchNode != null) {
            fetchNode.setOutlinks(links);
            fetchNode.setTitle(parseData.getTitle());
            FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(),
                fetchNode);
          }
          int validCount = 0;

          // Process all outlinks, normalize, filter and deduplicate
          List<Outlink> outlinkList = new ArrayList<>(outlinksToStore);
          HashSet<String> outlinks = new HashSet<>(outlinksToStore);
          for (int i = 0; i < links.length
              && validCount < outlinksToStore; i++) {
            String toUrl = links[i].getToUrl();

            if (toUrl.length() > maxOutlinkLength) {
              continue;
            }
            toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
                origin, ignoreInternalLinks, ignoreExternalLinks,
                ignoreExternalLinksMode, urlFiltersForOutlinks,
                urlExemptionFilters, normalizersForOutlinks);
            if (toUrl == null) {
              continue;
            }

            validCount++;
            links[i].setUrl(toUrl);
            outlinkList.add(links[i]);
            outlinks.add(toUrl);
          }

          // publish a REPORT event with the outlinks and parse metadata
          if (activatePublisher) {
            FetcherThreadEvent reportEvent = new FetcherThreadEvent(
                PublishEventType.REPORT, url.toString());
            reportEvent.addOutlinksToEventData(outlinkList);
            reportEvent.addEventData(Nutch.FETCH_EVENT_TITLE,
                parseData.getTitle());
            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTTYPE,
                parseData.getContentMeta().get("content-type"));
            reportEvent.addEventData(Nutch.FETCH_EVENT_SCORE,
                datum.getScore());
            reportEvent.addEventData(Nutch.FETCH_EVENT_FETCHTIME,
                datum.getFetchTime());
            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG,
                parseData.getContentMeta().get("content-language"));
            publisher.publish(reportEvent, conf);
          }
          // follow outlinks at fetch time if fetcher.follow.outlinks.depth
          // allows another level
          if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
            FetchItem ft = FetchItem.create(url, null, queueMode);
            FetchItemQueue queue = fetchQueues.getFetchItemQueue(ft.queueID);
            queue.alreadyFetched.add(url.toString().hashCode());

            context.getCounter("FetcherOutlinks", "outlinks_detected")
                .increment(outlinks.size());

            // Counter to limit num outlinks to follow per page
            int outlinkCounter = 0;

            String followUrl;

            // Walk over the outlinks and add as new FetchItem to the queues
            Iterator<String> iter = outlinks.iterator();
            while (iter.hasNext()
                && outlinkCounter < maxOutlinkDepthNumLinks) {
              followUrl = iter.next();

              // Check whether we'll follow external outlinks
              if (outlinksIgnoreExternal) {
                if (!URLUtil.getHost(url.toString())
                    .equals(URLUtil.getHost(followUrl))) {
                  continue;
                }
              }

              // Already followed?
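              // (deduplication via String.hashCode keeps the in-memory set
              // small; a rare hash collision may skip a distinct outlink)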
              int urlHashCode = followUrl.hashCode();
              if (queue.alreadyFetched.contains(urlHashCode)) {
                continue;
              }
              queue.alreadyFetched.add(urlHashCode);

              // Create new FetchItem with depth incremented
              FetchItem fit = FetchItem.create(new Text(followUrl),
                  new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
                  queueMode, outlinkDepth + 1);

              context.getCounter("FetcherOutlinks", "outlinks_following")
                  .increment(1);

              fetchQueues.addFetchItem(fit);

              outlinkCounter++;
            }
          }

          // Overwrite the outlinks in ParseData with the normalized and
          // filtered set
          parseData.setOutlinks(
              outlinkList.toArray(new Outlink[outlinkList.size()]));

          context.write(url, new NutchWritable(new ParseImpl(
              new ParseText(parse.getText()), parseData,
              parse.isCanonical())));
        }
      }
    } catch (IOException e) {
      LOG.error("fetcher caught:", e);
    }

    // return parse status (of the "original" URL if the ParseResult contains
    // multiple parses) which allows Fetcher to follow meta-redirects
    if (parseResult != null && !parseResult.isEmpty()) {
      Parse p = parseResult.get(content.getUrl());
      if (p != null) {
        context.getCounter("ParserStatus", ParseStatus.majorCodes[p.getData()
            .getStatus().getMajorCode()]).increment(1);
        return p.getData().getStatus();
      }
    }
    return null;
  }

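  /**
   * Write fetched robots.txt files to the segment; enabled via
   * fetcher.store.robotstxt and requires fetcher.store.content to be true.
   */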
  private void outputRobotsTxt(List<Content> robotsTxtContent)
      throws InterruptedException {
    for (Content robotsTxt : robotsTxtContent) {
      LOG.debug("fetched and stored robots.txt {}", robotsTxt.getUrl());
      try {
        context.write(new Text(robotsTxt.getUrl()),
            new NutchWritable(robotsTxt));
      } catch (IOException e) {
        LOG.error("fetcher caught:", e);
      }
    }
  }

  private void updateStatus(int bytesInPage) {
    pages.incrementAndGet();
    bytes.addAndGet(bytesInPage);
  }

  public synchronized void setHalted(boolean halted) {
    this.halted = halted;
  }

  public synchronized boolean isHalted() {
    return halted;
  }

  public String getReprUrl() {
    return reprUrl;
  }

  private void setReprUrl(String urlString) {
    this.reprUrl = urlString;
  }

}