blob: 4f71bbf43ec436d9b14c46cce453d05b5029c02d [file] [log] [blame]
/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.system;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import java.util.*;
import java.util.regex.*;
/** This class calculates a document priority given all the required inputs.
* It is not thread safe, but calls classes that are (e.g. QueueTracker).
*/
/** This class calculates a document priority given all the required inputs.
* It is not thread safe, but calls classes that are (e.g. QueueTracker).
*/
public class PriorityCalculator implements IPriorityCalculator
{
  public static final String _rcsid = "@(#)$Id$";

  /** Assumed time, in milliseconds, to fetch one document with no throttling at all ("T" in the
  * math below).  Originally based on 100 documents/second (10ms), but adjusted upward to 50ms
  * as a result of experimentation and testing.
  */
  private final static double minMsPerFetch = 50.0;

  protected final IRepositoryConnection connection;
  protected final String[] binNames;
  protected final IReprioritizationTracker rt;
  /** Per-bin multiplier (1 + 1/(T*F)) applied to the bin count to compensate for that bin's throttle rate. */
  protected final double[] binCountScaleFactors;
  /** Per-bin starting depth: the global minimum depth divided by the bin's scale factor. */
  protected final double[] weightedMinimumDepths;
  /** The priority is computed at most once; getDocumentPriority() caches its result here. */
  protected Double cachedValue = null;

  /** Constructor.
  *@param rt is the reprioritization tracker, consulted for the current global minimum depth.
  *@param connection is the connection, from which the throttles may be obtained.
  *@param documentBins are the global bins to which the document belongs.
  */
  public PriorityCalculator(IReprioritizationTracker rt, IRepositoryConnection connection, String[] documentBins)
    throws ManifoldCFException
  {
    this.connection = connection;
    this.binNames = documentBins;
    this.rt = rt;

    // Precompute the weightedMinimumDepths etc; we'll need them whether we preload or not.
    // For each bin, we will be calculating the bin count scale factor, which is what we multiply the bincount by to adjust for the
    // throttling on that bin.
    binCountScaleFactors = new double[binNames.length];
    weightedMinimumDepths = new double[binNames.length];

    // NOTE: We must be sure to adjust the return value by the factor calculated due to performance; a slower throttle rate
    // should yield a lower priority.  In theory it should be possible to calculate an adjusted priority pretty exactly,
    // on the basis that the fetch rates of two distinct bins should grant priorities such that:
    //
    // (n documents) / (the rate of fetch (docs/millisecond) of the first bin) = milliseconds for the first bin
    //
    // should equal:
    //
    // (m documents) / (the rate of fetch of the second bin) = milliseconds for the second bin
    //
    // ... and then assigning priorities so that after a given number of document priorities are assigned from the first bin, the
    // corresponding (*m/n) number of document priorities would get assigned for the second bin.
    //
    // Suppose the maximum fetch rate for the document is F fetches per millisecond.  If the document priority assigned for the Bth
    // bin member is -log(1/(1+B)) for a document fetched with no throttling whatsoever,
    // then we want the priority to be -log(1/(1+k)) for a throttled bin, where k is chosen so that:
    // k = B * ((T + 1/F)/T) = B * (1 + 1/TF)
    // ... where T is the time taken to fetch a single document that has no throttling at all
    // (minMsPerFetch, above).
    //
    // Basically, for F = 0, k should be infinity, and for F = infinity, k should be B.

    // First, calculate the document's max fetch rate, in fetches per millisecond.  This will be used to adjust the priority, and
    // also when resetting the bin counts.
    double[] maxFetchRates = calculateMaxFetchRates(binNames,connection);

    // Before calculating priority, calculate some factors that will allow us to determine the proper starting value for a bin.
    double currentMinimumDepth = rt.getMinimumDepth();

    // Reset the bin values based on the current minimum.
    for (int i = 0; i < binNames.length; i++)
    {
      // Remember, maxFetchRate is in fetches per ms.
      double maxFetchRate = maxFetchRates[i];

      // Calculate (and save for later) the scale factor for this bin.
      // A zero fetch rate means the bin can never be fetched, so its count scales to infinity.
      double binCountScaleFactor;
      if (maxFetchRate == 0.0)
        binCountScaleFactor = Double.POSITIVE_INFINITY;
      else
        binCountScaleFactor = 1.0 + 1.0 / (minMsPerFetch * maxFetchRate);
      binCountScaleFactors[i] = binCountScaleFactor;
      weightedMinimumDepths[i] = currentMinimumDepth / binCountScaleFactor;
    }
  }

  /** Log a preload request for this priority value.
  */
  public void makePreloadRequest()
  {
    for (int i = 0; i < binNames.length; i++)
    {
      rt.addPreloadRequest(binNames[i], weightedMinimumDepths[i]);
    }
  }

  /** Calculate a document priority value.  Priorities are reversed, and in log space, so that
  * zero (0.0) is considered the highest possible priority, and larger priority values are considered lower in actual priority.
  *@return the priority value, based on recent history.  Also updates statistics atomically.
  */
  @Override
  public double getDocumentPriority()
    throws ManifoldCFException
  {
    // The value is computed at most once per calculator instance.
    if (cachedValue != null)
      return cachedValue.doubleValue();

    // Find the bin with the largest effective count, and use that for the document's priority.
    // (This of course assumes that the slowest throttle is the one that wins.)
    double highestAdjustedCount = 0.0;
    for (int i = 0; i < binNames.length; i++)
    {
      double thisCount = rt.getIncrementBinValue(binNames[i],weightedMinimumDepths[i]);

      // Use the scale factor already calculated in the constructor to yield a priority that is
      // adjusted for the fetch rate.  Guard the infinite case explicitly: infinity * 0.0 would be NaN.
      double adjustedCount;
      if (binCountScaleFactors[i] == Double.POSITIVE_INFINITY)
        adjustedCount = Double.POSITIVE_INFINITY;
      else
        adjustedCount = thisCount * binCountScaleFactors[i];

      if (adjustedCount > highestAdjustedCount)
        highestAdjustedCount = adjustedCount;
    }

    // Calculate the proper log value.
    double returnValue;
    if (highestAdjustedCount == Double.POSITIVE_INFINITY)
      returnValue = Double.POSITIVE_INFINITY;
    else
      returnValue = Math.log(1.0 + highestAdjustedCount);

    if (Logging.scheduling.isDebugEnabled())
    {
      StringBuilder sb = new StringBuilder();
      for (String binName : binNames)
      {
        sb.append(binName).append(" ");
      }
      Logging.scheduling.debug("Document with bins ["+sb.toString()+"] given priority value "+returnValue);
    }

    cachedValue = Double.valueOf(returnValue);
    return returnValue;
  }

  /** Calculate the maximum fetch rate for a given set of bins for a given connection.
  * This is used to adjust the final priority of a document.
  *@param binNames are the bins to compute rates for.
  *@param connection is the connection whose throttle specifications apply.
  *@return one rate (fetches per millisecond) per bin, POSITIVE_INFINITY when unthrottled.
  */
  protected static double[] calculateMaxFetchRates(String[] binNames, IRepositoryConnection connection)
  {
    ThrottleLimits tl = new ThrottleLimits(connection);
    return tl.getMaximumRates(binNames);
  }

  /** This class represents the throttle limits out of the connection specification */
  protected static class ThrottleLimits
  {
    protected List<ThrottleLimitSpec> specs = new ArrayList<ThrottleLimitSpec>();

    public ThrottleLimits(IRepositoryConnection connection)
    {
      String[] throttles = connection.getThrottles();
      int i = 0;
      while (i < throttles.length)
      {
        try
        {
          specs.add(new ThrottleLimitSpec(throttles[i],(double)connection.getThrottleValue(throttles[i])));
        }
        catch (PatternSyntaxException e)
        {
          // A malformed throttle regexp is skipped (as before), but is now logged so the
          // misconfiguration can be diagnosed instead of silently vanishing.
          Logging.scheduling.warn("Ignoring throttle specification with invalid regular expression '"+throttles[i]+"'",e);
        }
        i++;
      }
    }

    /** Compute the effective maximum rate for each bin: the smallest rate among all specs whose
    * regexp matches the bin name, or POSITIVE_INFINITY if no spec matches.
    */
    public double[] getMaximumRates(String[] binNames)
    {
      double[] rval = new double[binNames.length];
      for (int j = 0 ; j < binNames.length ; j++)
      {
        String binName = binNames[j];
        double maxRate = Double.POSITIVE_INFINITY;
        for (ThrottleLimitSpec spec : specs)
        {
          Pattern p = spec.getRegexp();
          Matcher m = p.matcher(binName);
          if (m.find())
          {
            double rate = spec.getMaxRate();
            // The direction of this inequality reflects the fact that the throttling is conservative when more rules are present.
            if (rate < maxRate)
              maxRate = rate;
          }
        }
        rval[j] = maxRate;
      }
      return rval;
    }
  }

  /** This is a class which describes an individual throttle limit, in fetches per millisecond. */
  protected static class ThrottleLimitSpec
  {
    /** Regexp */
    protected final Pattern regexp;
    /** The fetch limit for all bins matching that regexp, in fetches per millisecond */
    protected final double maxRate;

    /** Constructor.
    *@param regexp is the bin-name pattern this limit applies to.
    *@param maxRate is the fetch limit, in fetches per millisecond.
    *@throws PatternSyntaxException if the regexp does not compile.
    */
    public ThrottleLimitSpec(String regexp, double maxRate)
      throws PatternSyntaxException
    {
      this.regexp = Pattern.compile(regexp);
      this.maxRate = maxRate;
    }

    /** Get the regexp. */
    public Pattern getRegexp()
    {
      return regexp;
    }

    /** Get the max rate. */
    public double getMaxRate()
    {
      return maxRate;
    }
  }
}