/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.crawl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.TimingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Emulate a continuous crawl for one URL.
*
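 * <p>
 * A minimal usage sketch from a test in the same package (the chosen statuses,
 * interval, and duration are illustrative, not defaults of this class):
 *
 * <pre>
 * ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(
 *     CrawlDatum.STATUS_FETCH_SUCCESS, CrawlDatum.STATUS_DB_FETCHED);
 * crawlUtil.setInterval(FetchSchedule.SECONDS_PER_DAY);      // launch once per day
 * crawlUtil.setDuration(60 * FetchSchedule.SECONDS_PER_DAY); // emulate 60 days
 * assertTrue("unexpected CrawlDatum state", crawlUtil.run(5));
 * </pre>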
*/
public class ContinuousCrawlTestUtil extends TestCase {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
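  /** URL of the single document whose continuous (re)crawl is emulated */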
protected static Text dummyURL = new Text("http://nutch.apache.org/");
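  /** default reducer context, used when none is passed to the constructor */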
protected static Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context defaultContext = CrawlDBTestUtil.createContext();
  protected long interval = FetchSchedule.SECONDS_PER_DAY * 1000; // launch crawler every day (default)
  protected long duration = 2 * 365L * FetchSchedule.SECONDS_PER_DAY * 1000L; // run for two years
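  /** reducer context the CrawlDb update is run in */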
protected Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context;
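  /** fetch schedule, obtained from the context's configuration */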
private FetchSchedule schedule;
/** status a fetched datum should get */
protected byte fetchStatus = CrawlDatum.STATUS_FETCH_SUCCESS;
/** expected status of the resulting Db datum */
protected byte expectedDbStatus = CrawlDatum.STATUS_DB_FETCHED;
/** for signature calculation */
protected Signature signatureImpl;
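  /** dummy document content used for signature calculation, see {@link #changeContent()} */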
protected Content content = new Content();
{
byte[] data = { 'n', 'u', 't', 'c', 'h' };
content.setContent(data);
}
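  /**
   * Create a test instance bound to the given reducer context; the fetch
   * schedule and signature implementation are read from the context's
   * configuration.
   */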
protected ContinuousCrawlTestUtil(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context cont) {
context = cont;
Configuration conf = context.getConfiguration();
schedule = FetchScheduleFactory.getFetchSchedule(conf);
signatureImpl = SignatureFactory.getSignature(conf);
}
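  /**
   * Create a test instance with the status a fetched datum gets and the status
   * expected for the resulting CrawlDb datum.
   */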
protected ContinuousCrawlTestUtil(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context cont, byte fetchStatus,
byte expectedDbStatus) {
this(cont);
this.fetchStatus = fetchStatus;
this.expectedDbStatus = expectedDbStatus;
}
protected ContinuousCrawlTestUtil() {
this(defaultContext);
}
protected ContinuousCrawlTestUtil(byte fetchStatus, byte expectedDbStatus) {
this(defaultContext, fetchStatus, expectedDbStatus);
}
  /** set the interval at which the crawl is relaunched (default: every day) */
protected void setInterval(int seconds) {
interval = seconds * 1000L;
}
  /** set the duration of the continuous crawl (default: 2 years) */
  protected void setDuration(int seconds) {
duration = seconds * 1000L;
}
/**
* default fetch action: set status and time
*
* @param datum
* CrawlDatum to fetch
* @param currentTime
* current time used to set the fetch time via
* {@link CrawlDatum#setFetchTime(long)}
* @return the modified CrawlDatum
*/
protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
datum.setStatus(fetchStatus);
datum.setFetchTime(currentTime);
return datum;
}
  /**
   * calculate the signature of the current content using the configured
   * signature implementation
   */
protected byte[] getSignature() {
return signatureImpl.calculate(content, null);
}
/**
* change content to force a changed signature
*/
protected void changeContent() {
byte[] data = Arrays.copyOf(content.getContent(),
content.getContent().length + 1);
data[content.getContent().length] = '2'; // append one byte
content.setContent(data);
LOG.info("document content changed");
}
/**
* default parse action: add signature if successfully fetched
*
* @param fetchDatum
* fetch datum
* @return list of all datums resulting from parse (status: signature, linked,
* parse_metadata)
*/
protected List<CrawlDatum> parse(CrawlDatum fetchDatum) {
List<CrawlDatum> parseDatums = new ArrayList<CrawlDatum>(0);
if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS) {
CrawlDatum signatureDatum = new CrawlDatum(CrawlDatum.STATUS_SIGNATURE, 0);
signatureDatum.setSignature(getSignature());
parseDatums.add(signatureDatum);
}
return parseDatums;
}
/**
* default implementation to check the result state
*
* @param datum
* the CrawlDatum to be checked
* @return true if the check succeeds
*/
protected boolean check(CrawlDatum datum) {
if (datum.getStatus() != expectedDbStatus)
return false;
return true;
}
  /**
   * Run the continuous crawl.
   * <p>
   * A loop emulates a continuous crawl launched at regular intervals (see
   * {@link #setInterval(int)}) over a longer period ({@link #setDuration(int)}).
   * </p>
   *
   * <ul>
   * <li>every "round" emulates
   * <ul>
   * <li>a fetch (see {@link #fetch(CrawlDatum, long)})</li>
   * <li>{@literal updatedb} which returns a {@link CrawlDatum}</li>
   * </ul>
   * </li>
   * <li>the returned CrawlDatum is used as input for the next round</li>
   * <li>and is checked whether it is correct (see {@link #check(CrawlDatum)})</li>
   * </ul>
*
   * @param maxErrors
   *          (if &gt; 0) continue the crawl even if the checked CrawlDatum is
   *          not correct, but stop after this maximum number of errors
   *
   * @return false if a check of a CrawlDatum failed, true otherwise
   * @throws IOException if the CrawlDb update fails
*/
protected boolean run(int maxErrors) throws IOException {
long now = System.currentTimeMillis();
CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>(
new CrawlDbReducer(), context);
/* start with a db_unfetched */
CrawlDatum dbDatum = new CrawlDatum();
dbDatum.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
schedule.initializeSchedule(dummyURL, dbDatum); // initialize fetchInterval
dbDatum.setFetchTime(now);
LOG.info("Emulate a continuous crawl, launched every "
+ (interval / (FetchSchedule.SECONDS_PER_DAY * 1000)) + " day ("
+ (interval / 1000) + " seconds)");
long maxTime = (now + duration);
long nextTime = now;
long lastFetchTime = -1;
boolean ok = true; // record failure but keep going
CrawlDatum fetchDatum = new CrawlDatum();
/*
* Keep copies because CrawlDbReducer.reduce() and
* FetchSchedule.shouldFetch() may alter the references. Copies are used for
* verbose logging in case of an error.
*/
CrawlDatum copyDbDatum = new CrawlDatum();
CrawlDatum copyFetchDatum = new CrawlDatum();
CrawlDatum afterShouldFetch = new CrawlDatum();
int errorCount = 0;
while (nextTime < maxTime) {
LOG.info("check: " + new Date(nextTime));
fetchDatum.set(dbDatum);
copyDbDatum.set(dbDatum);
if (schedule.shouldFetch(dummyURL, fetchDatum, nextTime)) {
LOG.info("... fetching now (" + new Date(nextTime) + ")");
if (lastFetchTime > -1) {
LOG.info("(last fetch: " + new Date(lastFetchTime) + " = "
+ TimingUtil.elapsedTime(lastFetchTime, nextTime) + " ago)");
}
lastFetchTime = nextTime;
afterShouldFetch.set(fetchDatum);
fetchDatum = fetch(fetchDatum, nextTime);
copyFetchDatum.set(fetchDatum);
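        /*
         * emulate the input of a CrawlDb update: the datum currently stored in
         * the CrawlDb, the datum returned by the fetcher, and the datums
         * created by parsing (here only the signature, see parse())
         */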
List<CrawlDatum> values = new ArrayList<CrawlDatum>();
values.add(dbDatum);
values.add(fetchDatum);
values.addAll(parse(fetchDatum));
List<CrawlDatum> res = updateDb.update(values);
assertNotNull("null returned", res);
assertFalse("no CrawlDatum", 0 == res.size());
assertEquals("more than one CrawlDatum", 1, res.size());
if (!check(res.get(0))) {
LOG.info("previously in CrawlDb: " + copyDbDatum);
LOG.info("after shouldFetch(): " + afterShouldFetch);
LOG.info("fetch: " + fetchDatum);
LOG.warn("wrong result in CrawlDb: " + res.get(0));
if (++errorCount >= maxErrors) {
if (maxErrors > 0) {
LOG.error("Max. number of errors " + maxErrors
+ " reached. Stopping.");
}
return false;
} else {
ok = false; // record failure but keep going
}
}
/* use the returned CrawlDatum for the next fetch */
dbDatum = res.get(0);
}
nextTime += interval;
}
return ok;
}
}