| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.crawl; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.commons.jexl3.JexlContext; |
| import org.apache.commons.jexl3.JexlExpression; |
| import org.apache.commons.jexl3.MapContext; |
| import org.apache.hadoop.io.FloatWritable; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.VersionMismatchException; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.io.WritableComparator; |
| import org.apache.nutch.protocol.ProtocolStatus; |
| import org.apache.nutch.util.StringUtil; |
| |
/**
 * The crawl state of a URL: status, fetch and modification times, retry
 * counter, re-fetch interval, score, content signature and metadata.
 */
| public class CrawlDatum implements WritableComparable<CrawlDatum>, Cloneable { |
| |
| public static final String GENERATE_DIR_NAME = "crawl_generate"; |
| public static final String FETCH_DIR_NAME = "crawl_fetch"; |
| public static final String PARSE_DIR_NAME = "crawl_parse"; |
| |
| private static final byte CUR_VERSION = 7; |
| |
| /** Compatibility values for on-the-fly conversion from versions < 5. */ |
| private static final byte OLD_STATUS_SIGNATURE = 0; |
| private static final byte OLD_STATUS_DB_UNFETCHED = 1; |
| private static final byte OLD_STATUS_DB_FETCHED = 2; |
| private static final byte OLD_STATUS_DB_GONE = 3; |
| private static final byte OLD_STATUS_LINKED = 4; |
| private static final byte OLD_STATUS_FETCH_SUCCESS = 5; |
| private static final byte OLD_STATUS_FETCH_RETRY = 6; |
| private static final byte OLD_STATUS_FETCH_GONE = 7; |
| |
| private static HashMap<Byte, Byte> oldToNew = new HashMap<>(); |
| |
| /** Page was not fetched yet. */ |
| public static final byte STATUS_DB_UNFETCHED = 0x01; |
| /** Page was successfully fetched. */ |
| public static final byte STATUS_DB_FETCHED = 0x02; |
| /** Page no longer exists. */ |
| public static final byte STATUS_DB_GONE = 0x03; |
| /** Page temporarily redirects to other page. */ |
| public static final byte STATUS_DB_REDIR_TEMP = 0x04; |
| /** Page permanently redirects to other page. */ |
| public static final byte STATUS_DB_REDIR_PERM = 0x05; |
| /** Page was successfully fetched and found not modified. */ |
| public static final byte STATUS_DB_NOTMODIFIED = 0x06; |
| /** Page was marked as being a duplicate of another page */ |
| public static final byte STATUS_DB_DUPLICATE = 0x07; |
| /** Page was marked as orphan, e.g. has no inlinks anymore */ |
| public static final byte STATUS_DB_ORPHAN = 0x08; |
| |
| /** Maximum value of DB-related status. */ |
| public static final byte STATUS_DB_MAX = 0x1f; |
| |
| /** Fetching was successful. */ |
| public static final byte STATUS_FETCH_SUCCESS = 0x21; |
| /** Fetching unsuccessful, needs to be retried (transient errors). */ |
| public static final byte STATUS_FETCH_RETRY = 0x22; |
| /** Fetching temporarily redirected to other page. */ |
| public static final byte STATUS_FETCH_REDIR_TEMP = 0x23; |
| /** Fetching permanently redirected to other page. */ |
| public static final byte STATUS_FETCH_REDIR_PERM = 0x24; |
| /** Fetching unsuccessful - page is gone. */ |
| public static final byte STATUS_FETCH_GONE = 0x25; |
| /** Fetching successful - page is not modified. */ |
| public static final byte STATUS_FETCH_NOTMODIFIED = 0x26; |
| |
| /** Maximum value of fetch-related status. */ |
| public static final byte STATUS_FETCH_MAX = 0x3f; |
| |
| /** Page signature. */ |
| public static final byte STATUS_SIGNATURE = 0x41; |
| /** Page was newly injected. */ |
| public static final byte STATUS_INJECTED = 0x42; |
| /** Page discovered through a link. */ |
| public static final byte STATUS_LINKED = 0x43; |
| /** Page got metadata from a parser */ |
| public static final byte STATUS_PARSE_META = 0x44; |
| |
| public static final HashMap<Byte, String> statNames = new HashMap<>(); |
| static { |
| statNames.put(STATUS_DB_UNFETCHED, "db_unfetched"); |
| statNames.put(STATUS_DB_FETCHED, "db_fetched"); |
| statNames.put(STATUS_DB_GONE, "db_gone"); |
| statNames.put(STATUS_DB_REDIR_TEMP, "db_redir_temp"); |
| statNames.put(STATUS_DB_REDIR_PERM, "db_redir_perm"); |
| statNames.put(STATUS_DB_NOTMODIFIED, "db_notmodified"); |
| statNames.put(STATUS_DB_DUPLICATE, "db_duplicate"); |
| statNames.put(STATUS_DB_ORPHAN, "db_orphan"); |
| statNames.put(STATUS_SIGNATURE, "signature"); |
| statNames.put(STATUS_INJECTED, "injected"); |
| statNames.put(STATUS_LINKED, "linked"); |
| statNames.put(STATUS_FETCH_SUCCESS, "fetch_success"); |
| statNames.put(STATUS_FETCH_RETRY, "fetch_retry"); |
| statNames.put(STATUS_FETCH_REDIR_TEMP, "fetch_redir_temp"); |
| statNames.put(STATUS_FETCH_REDIR_PERM, "fetch_redir_perm"); |
| statNames.put(STATUS_FETCH_GONE, "fetch_gone"); |
| statNames.put(STATUS_FETCH_NOTMODIFIED, "fetch_notmodified"); |
| statNames.put(STATUS_PARSE_META, "parse_metadata"); |
| |
| oldToNew.put(OLD_STATUS_DB_UNFETCHED, STATUS_DB_UNFETCHED); |
| oldToNew.put(OLD_STATUS_DB_FETCHED, STATUS_DB_FETCHED); |
| oldToNew.put(OLD_STATUS_DB_GONE, STATUS_DB_GONE); |
| oldToNew.put(OLD_STATUS_FETCH_GONE, STATUS_FETCH_GONE); |
| oldToNew.put(OLD_STATUS_FETCH_SUCCESS, STATUS_FETCH_SUCCESS); |
| oldToNew.put(OLD_STATUS_FETCH_RETRY, STATUS_FETCH_RETRY); |
| oldToNew.put(OLD_STATUS_LINKED, STATUS_LINKED); |
| oldToNew.put(OLD_STATUS_SIGNATURE, STATUS_SIGNATURE); |
| } |
| |
| private byte status; |
| private long fetchTime = System.currentTimeMillis(); |
| private byte retries; |
| private int fetchInterval; |
| private float score = 0.0f; |
| private byte[] signature = null; |
| private long modifiedTime; |
| private org.apache.hadoop.io.MapWritable metaData; |
| |
| public static boolean hasDbStatus(CrawlDatum datum) { |
| if (datum.status <= STATUS_DB_MAX) |
| return true; |
| return false; |
| } |
| |
| public static boolean hasFetchStatus(CrawlDatum datum) { |
| if (datum.status > STATUS_DB_MAX && datum.status <= STATUS_FETCH_MAX) |
| return true; |
| return false; |
| } |
| |
| public CrawlDatum() { |
| } |
| |
| public CrawlDatum(int status, int fetchInterval) { |
| this(); |
| this.status = (byte) status; |
| this.fetchInterval = fetchInterval; |
| } |
| |
| public CrawlDatum(int status, int fetchInterval, float score) { |
| this(status, fetchInterval); |
| this.score = score; |
| } |
| |
| // |
| // accessor methods |
| // |
| |
| public byte getStatus() { |
| return status; |
| } |
| |
| public static String getStatusName(byte value) { |
| String res = statNames.get(value); |
| if (res == null) |
| res = "unknown"; |
| return res; |
| } |
| |
| public static byte getStatusByName(String name) { |
| for (Entry<Byte, String> status : statNames.entrySet()) { |
| if (name.equalsIgnoreCase(status.getValue())) { |
| return status.getKey(); |
| } |
| } |
| return -1; |
| } |
| |
| public void setStatus(int status) { |
| this.status = (byte) status; |
| } |
| |
| /** |
| * Get the fetch time. |
| * @return long value indicating either the time of the last |
| * fetch, or the next fetch time, depending on whether Fetcher |
| * or CrawlDbReducer set the time. |
| */ |
| public long getFetchTime() { |
| return fetchTime; |
| } |
| |
| /** |
| * Sets either the time of the last fetch or the next fetch time, depending on |
| * whether Fetcher or CrawlDbReducer set the time. |
| * @param fetchTime the fetch time to set. |
| */ |
| public void setFetchTime(long fetchTime) { |
| this.fetchTime = fetchTime; |
| } |
| |
| public long getModifiedTime() { |
| return modifiedTime; |
| } |
| |
| public void setModifiedTime(long modifiedTime) { |
| this.modifiedTime = modifiedTime; |
| } |
| |
| public byte getRetriesSinceFetch() { |
| return retries; |
| } |
| |
| public void setRetriesSinceFetch(int retries) { |
| this.retries = (byte) retries; |
| } |
| |
| public int getFetchInterval() { |
| return fetchInterval; |
| } |
| |
| public void setFetchInterval(int fetchInterval) { |
| this.fetchInterval = fetchInterval; |
| } |
| |
| public void setFetchInterval(float fetchInterval) { |
| this.fetchInterval = Math.round(fetchInterval); |
| } |
| |
| public float getScore() { |
| return score; |
| } |
| |
| public void setScore(float score) { |
| this.score = score; |
| } |
| |
| public byte[] getSignature() { |
| return signature; |
| } |
| |
| public void setSignature(byte[] signature) { |
| if (signature != null && signature.length > 256) |
| throw new RuntimeException("Max signature length (256) exceeded: " |
| + signature.length); |
| this.signature = signature; |
| } |
| |
| public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) { |
| this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable); |
| } |
| |
| /** |
| * Add all metadata from other CrawlDatum to this CrawlDatum. |
| * |
| * @param other |
| * CrawlDatum |
| */ |
| public void putAllMetaData(CrawlDatum other) { |
| for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) { |
| getMetaData().put(e.getKey(), e.getValue()); |
| } |
| } |
| |
| /** |
| * Get CrawlDatum metadata |
| * @see CrawlDatum#readFields(DataInput) |
| * @return a MapWritable if it was set or read in #readFields(DataInput), |
| * returns empty map in case CrawlDatum was freshly created (lazily |
| * instantiated). |
| */ |
| public org.apache.hadoop.io.MapWritable getMetaData() { |
| if (this.metaData == null) |
| this.metaData = new org.apache.hadoop.io.MapWritable(); |
| return this.metaData; |
| } |
| |
| // |
| // writable methods |
| // |
| |
| public static CrawlDatum read(DataInput in) throws IOException { |
| CrawlDatum result = new CrawlDatum(); |
| result.readFields(in); |
| return result; |
| } |
| |
| public void readFields(DataInput in) throws IOException { |
| byte version = in.readByte(); // read version |
| if (version > CUR_VERSION) // check version |
| throw new VersionMismatchException(CUR_VERSION, version); |
| |
| status = in.readByte(); |
| fetchTime = in.readLong(); |
| retries = in.readByte(); |
| if (version > 5) { |
| fetchInterval = in.readInt(); |
| } else |
| fetchInterval = Math.round(in.readFloat()); |
| score = in.readFloat(); |
| if (version > 2) { |
| modifiedTime = in.readLong(); |
| int cnt = in.readByte(); |
| if (cnt > 0) { |
| signature = new byte[cnt]; |
| in.readFully(signature); |
| } else |
| signature = null; |
| } |
| |
| if (version > 3) { |
| boolean hasMetadata = false; |
| if (version < 7) { |
| org.apache.hadoop.io.MapWritable oldMetaData = new org.apache.hadoop.io.MapWritable(); |
| if (in.readBoolean()) { |
| hasMetadata = true; |
| metaData = new org.apache.hadoop.io.MapWritable(); |
| oldMetaData.readFields(in); |
| } |
| for (Writable key : oldMetaData.keySet()) { |
| metaData.put(key, oldMetaData.get(key)); |
| } |
| } else { |
| if (in.readBoolean()) { |
| hasMetadata = true; |
| metaData = new org.apache.hadoop.io.MapWritable(); |
| metaData.readFields(in); |
| } |
| } |
| if (hasMetadata == false) |
| metaData = null; |
| } |
| // translate status codes |
| if (version < 5) { |
| if (oldToNew.containsKey(status)) |
| status = oldToNew.get(status); |
| else |
| status = STATUS_DB_UNFETCHED; |
| |
| } |
| } |
| |
| /** The number of bytes into a CrawlDatum that the score is stored. */ |
| private static final int SCORE_OFFSET = 15; |
| private static final int SIG_OFFSET = SCORE_OFFSET + 12; |
| |
| public void write(DataOutput out) throws IOException { |
| out.writeByte(CUR_VERSION); // store current version |
| out.writeByte(status); |
| out.writeLong(fetchTime); |
| out.writeByte(retries); |
| out.writeInt(fetchInterval); |
| out.writeFloat(score); |
| out.writeLong(modifiedTime); |
| if (signature == null) { |
| out.writeByte(0); |
| } else { |
| out.writeByte(signature.length); |
| out.write(signature); |
| } |
| if (metaData != null && metaData.size() > 0) { |
| out.writeBoolean(true); |
| metaData.write(out); |
| } else { |
| out.writeBoolean(false); |
| } |
| } |
| |
| /** |
| * Copy the contents of another instance into this instance. |
| * @param that an existing {@link CrawlDatum} |
| */ |
| public void set(CrawlDatum that) { |
| this.status = that.status; |
| this.fetchTime = that.fetchTime; |
| this.retries = that.retries; |
| this.fetchInterval = that.fetchInterval; |
| this.score = that.score; |
| this.modifiedTime = that.modifiedTime; |
| this.signature = that.signature; |
| if (that.metaData != null) { |
| // make a deep copy |
| this.metaData = new org.apache.hadoop.io.MapWritable(that.metaData); |
| } else { |
| this.metaData = null; |
| } |
| } |
| |
| // |
| // compare methods |
| // |
| |
| /** |
| * Sort two {@link CrawlDatum} objects by decreasing score. |
| * @param that an existing {@link CrawlDatum} |
| * @return 1 if any one field (score, status, fetchTime, retries, |
| * fetchInterval or modifiedTime) of the new {@link CrawlDatum} |
| * minus the correspoinding field of the existing {@link CrawlDatum} |
| * is greater than 0, otherwise return -1. |
| */ |
| public int compareTo(CrawlDatum that) { |
| if (that.score != this.score) |
| return (that.score - this.score) > 0 ? 1 : -1; |
| if (that.status != this.status) |
| return this.status - that.status; |
| if (that.fetchTime != this.fetchTime) |
| return (that.fetchTime - this.fetchTime) > 0 ? 1 : -1; |
| if (that.retries != this.retries) |
| return that.retries - this.retries; |
| if (that.fetchInterval != this.fetchInterval) |
| return (that.fetchInterval - this.fetchInterval) > 0 ? 1 : -1; |
| if (that.modifiedTime != this.modifiedTime) |
| return (that.modifiedTime - this.modifiedTime) > 0 ? 1 : -1; |
| return SignatureComparator._compare(this, that); |
| } |
| |
| /** A Comparator optimized for CrawlDatum. */ |
| public static class Comparator extends WritableComparator { |
| public Comparator() { |
| super(CrawlDatum.class); |
| } |
| |
| public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
| float score1 = readFloat(b1, s1 + SCORE_OFFSET); |
| float score2 = readFloat(b2, s2 + SCORE_OFFSET); |
| if (score2 != score1) { |
| return (score2 - score1) > 0 ? 1 : -1; |
| } |
| int status1 = b1[s1 + 1]; |
| int status2 = b2[s2 + 1]; |
| if (status2 != status1) |
| return status1 - status2; |
| long fetchTime1 = readLong(b1, s1 + 2); |
| long fetchTime2 = readLong(b2, s2 + 2); |
| if (fetchTime2 != fetchTime1) |
| return (fetchTime2 - fetchTime1) > 0 ? 1 : -1; |
| int retries1 = b1[s1 + 10]; |
| int retries2 = b2[s2 + 10]; |
| if (retries2 != retries1) |
| return retries2 - retries1; |
| int fetchInterval1 = readInt(b1, s1 + 11); |
| int fetchInterval2 = readInt(b2, s2 + 11); |
| if (fetchInterval2 != fetchInterval1) |
| return (fetchInterval2 - fetchInterval1) > 0 ? 1 : -1; |
| long modifiedTime1 = readLong(b1, s1 + SCORE_OFFSET + 4); |
| long modifiedTime2 = readLong(b2, s2 + SCORE_OFFSET + 4); |
| if (modifiedTime2 != modifiedTime1) |
| return (modifiedTime2 - modifiedTime1) > 0 ? 1 : -1; |
| int sigl1 = b1[s1 + SIG_OFFSET]; |
| int sigl2 = b2[s2 + SIG_OFFSET]; |
| return SignatureComparator._compare(b1, SIG_OFFSET, sigl1, b2, |
| SIG_OFFSET, sigl2); |
| } |
| } |
| |
| static { // register this comparator |
| WritableComparator.define(CrawlDatum.class, new Comparator()); |
| } |
| |
| // |
| // basic methods |
| // |
| |
| public String toString() { |
| StringBuilder buf = new StringBuilder(); |
| buf.append("Version: " + CUR_VERSION + "\n"); |
| buf.append("Status: " + getStatus() + " (" + getStatusName(getStatus()) |
| + ")\n"); |
| buf.append("Fetch time: " + new Date(getFetchTime()) + "\n"); |
| buf.append("Modified time: " + new Date(getModifiedTime()) + "\n"); |
| buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n"); |
| buf.append("Retry interval: " + getFetchInterval() + " seconds (" |
| + (getFetchInterval() / FetchSchedule.SECONDS_PER_DAY) + " days)\n"); |
| buf.append("Score: " + getScore() + "\n"); |
| buf.append("Signature: " + StringUtil.toHexString(getSignature()) + "\n"); |
| buf.append("Metadata: \n "); |
| if (metaData != null) { |
| for (Entry<Writable, Writable> e : metaData.entrySet()) { |
| buf.append("\t"); |
| buf.append(e.getKey()); |
| buf.append("="); |
| buf.append(e.getValue()); |
| buf.append("\n"); |
| } |
| } |
| return buf.toString(); |
| } |
| |
| private boolean metadataEquals(org.apache.hadoop.io.MapWritable otherMetaData) { |
| if (metaData == null || metaData.size() == 0) { |
| return otherMetaData == null || otherMetaData.size() == 0; |
| } |
| if (otherMetaData == null) { |
| // we already know that the current object is not null or empty |
| return false; |
| } |
| HashSet<Entry<Writable, Writable>> set1 = new HashSet<>( |
| metaData.entrySet()); |
| HashSet<Entry<Writable, Writable>> set2 = new HashSet<>( |
| otherMetaData.entrySet()); |
| return set1.equals(set2); |
| } |
| |
| public boolean equals(Object o) { |
| if (!(o instanceof CrawlDatum)) |
| return false; |
| CrawlDatum other = (CrawlDatum) o; |
| boolean res = (this.status == other.status) |
| && (this.fetchTime == other.fetchTime) |
| && (this.modifiedTime == other.modifiedTime) |
| && (this.retries == other.retries) |
| && (this.fetchInterval == other.fetchInterval) |
| && (SignatureComparator._compare(this.signature, other.signature) == 0) |
| && (this.score == other.score); |
| if (!res) |
| return res; |
| return metadataEquals(other.metaData); |
| } |
| |
| public int hashCode() { |
| int res = 0; |
| if (signature != null) { |
| for (int i = 0; i < signature.length / 4; i += 4) { |
| res ^= ((signature[i] << 24) + (signature[i + 1] << 16) |
| + (signature[i + 2] << 8) + signature[i + 3]); |
| } |
| } |
| if (metaData != null) { |
| res ^= metaData.entrySet().hashCode(); |
| } |
| return res ^ status ^ ((int) fetchTime) ^ ((int) modifiedTime) ^ retries |
| ^ fetchInterval ^ Float.floatToIntBits(score); |
| } |
| |
| public Object clone() { |
| try { |
| return super.clone(); |
| } catch (CloneNotSupportedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public boolean evaluate(JexlExpression expr, String url) { |
| if (expr != null && url != null) { |
| // Create a context and add data |
| JexlContext jcontext = new MapContext(); |
| |
| // https://issues.apache.org/jira/browse/NUTCH-2229 |
| jcontext.set("url", url); |
| jcontext.set("status", getStatusName(getStatus())); |
| jcontext.set("fetchTime", (long)(getFetchTime())); |
| jcontext.set("modifiedTime", (long)(getModifiedTime())); |
| jcontext.set("retries", getRetriesSinceFetch()); |
| jcontext.set("interval", Integer.valueOf(getFetchInterval())); |
| jcontext.set("score", getScore()); |
| jcontext.set("signature", StringUtil.toHexString(getSignature())); |
| |
| // Set metadata variables |
| for (Map.Entry<Writable, Writable> entry : getMetaData().entrySet()) { |
| Object value = entry.getValue(); |
| Text tkey = (Text)entry.getKey(); |
| |
| if (value instanceof FloatWritable) { |
| FloatWritable fvalue = (FloatWritable)value; |
| jcontext.set(tkey.toString(), fvalue.get()); |
| } |
| |
| if (value instanceof IntWritable) { |
| IntWritable ivalue = (IntWritable)value; |
| jcontext.set(tkey.toString(), ivalue.get()); |
| } |
| |
| if (value instanceof Text) { |
| Text tvalue = (Text)value; |
| jcontext.set(tkey.toString().replace("-", "_"), tvalue.toString()); |
| } |
| |
| if (value instanceof ProtocolStatus) { |
| ProtocolStatus pvalue = (ProtocolStatus)value; |
| jcontext.set(tkey.toString().replace("-", "_"), pvalue.toString()); |
| } |
| |
| } |
| |
| try { |
| if (Boolean.TRUE.equals(expr.evaluate(jcontext))) { |
| return true; |
| } |
| } catch (Exception e) { |
| // |
| } |
| } |
| |
| return false; |
| } |
| } |