/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.fetcher;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class feeds the fetch queues with input items and refills them as
* items are consumed by {@link FetcherThread}s.
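*
* <p>
* A minimal usage sketch, assuming the feeder is driven from the fetcher's
* map task (the variable names, queue capacity, and time limit below are
* illustrative, not the values computed by {@link Fetcher}):
* </p>
*
* <pre>{@code
* QueueFeeder feeder = new QueueFeeder(context, fetchQueues, 50 * numThreads);
* feeder.setTimeLimit(System.currentTimeMillis() + 60 * 60 * 1000L);
* feeder.start(); // runs as a daemon thread
* // ... FetcherThreads consume items from fetchQueues ...
* feeder.join(); // wait until all input records have been read
* }</pre>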
*/
public class QueueFeeder extends Thread {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());

private FetcherRun.Context context;
private FetchItemQueues queues;
/** Target total number of items to keep in the queues. */
private int size;
/** Deadline in milliseconds since the epoch, or -1 for no time limit. */
private long timelimit = -1;
private URLFilters urlFilters = null;
private URLNormalizers urlNormalizers = null;
private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;

public QueueFeeder(FetcherRun.Context context,
FetchItemQueues queues, int size) {
this.context = context;
this.queues = queues;
this.size = size;
this.setDaemon(true);
this.setName("QueueFeeder");
Configuration conf = context.getConfiguration();
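// Optional URL filtering and normalization while feeding (both disabled
// by default, as they are typically applied earlier in the crawl cycle,
// e.g. when the fetch list is generated).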
if (conf.getBoolean("fetcher.filter.urls", false)) {
urlFilters = new URLFilters(conf);
}
if (conf.getBoolean("fetcher.normalize.urls", false)) {
urlNormalizers = new URLNormalizers(conf, urlNormalizerScope);
}
}
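
/**
* Set the time limit as a deadline in milliseconds since the epoch. Once
* the deadline has passed, remaining input records are still read but no
* longer added to the queues. The default of -1 means no time limit.
*/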
public void setTimeLimit(long tl) {
timelimit = tl;
}

/**
* Filter and normalize the URL.
*
* @return the normalized URL, or null if the URL was filtered out or
* normalization failed
*/
private String filterNormalize(String url) {
if (url != null) {
try {
if (urlNormalizers != null)
url = urlNormalizers.normalize(url, urlNormalizerScope); // normalize the url
if (urlFilters != null)
url = urlFilters.filter(url);
} catch (MalformedURLException | URLFilterException e) {
LOG.warn("Skipping {}: {}", url, e);
url = null;
}
}
return url;
}
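
/**
* Main feeder loop: read records from the input and fill the fetch queues,
* sleeping while the queues are full, until the input is exhausted or the
* time limit is hit (after which remaining input is read but not queued).
*/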
@Override
public void run() {
boolean hasMore = true;
int cnt = 0;
int timelimitcount = 0;
while (hasMore) {
if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
// time limit hit: let's simply read the remaining entries from the
// input without queueing them
try {
hasMore = context.nextKeyValue();
timelimitcount++;
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
} catch (InterruptedException e) {
LOG.info("QueueFeeder interrupted, exception:", e);
return;
}
continue;
}
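// number of items the queues can accept before reaching the target size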
int feed = size - queues.getTotalSize();
if (feed <= 0) {
// queues are full - sleep briefly and re-check for free space
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// restore the interrupt status and stop feeding
Thread.currentThread().interrupt();
return;
}
continue;
}
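// queues have free space: read and queue up to 'feed' records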
LOG.debug("-feeding {} input urls ...", feed);
while (feed > 0 && hasMore) {
try {
hasMore = context.nextKeyValue();
if (hasMore) {
Text url = context.getCurrentKey();
/*
* Need to copy the key and value objects because MapReduce will reuse
* the original objects while the copies are stored in the queue.
*/
if (urlFilters != null || urlNormalizers != null) {
String u = filterNormalize(url.toString());
if (u == null) {
// filtered or failed to normalize
context.getCounter("FetcherStatus", "filtered").increment(1);
continue;
}
url = new Text(u);
} else {
url = new Text(url);
}
CrawlDatum datum = new CrawlDatum();
datum.set((CrawlDatum) context.getCurrentValue());
queues.addFetchItem(url, datum);
cnt++;
feed--;
}
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
} catch (InterruptedException e) {
LOG.info("QueueFeeder interrupted, exception:", e);
// stop feeding, consistent with the interrupt handling above
return;
}
}
}
LOG.info("QueueFeeder finished: total {} records hit by time limit : {}",
cnt, timelimitcount);
}
}