// blob: e29507332dd7e49b0c1ee3a17f1fff810a2b1375 [file] [log] [blame]
package org.apache.blur.mapreduce.lib.update;
import org.apache.blur.utils.ShardUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
/**
 * Static helpers that record per-shard row id counts in a Hadoop {@link Configuration} and use those
 * counts to decide whether the lookup should be executed on a given shard.
 *
 * <p>
 * Three families of counts are kept per table, each stored as one string array property whose element
 * at index {@code shard} is the count for that shard:
 * <ul>
 * <li>row ids found in the index,</li>
 * <li>row ids found in the new data,</li>
 * <li>row ids in the new data that are updates.</li>
 * </ul>
 */
public class InputSplitPruneUtil {

  // NOTE: unlike the two prefixes below this one has no trailing '.', so the table name is appended
  // directly to "count". It is kept byte-for-byte because it forms the property names that are written
  // into job configurations.
  private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count";
  private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
  private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
  private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
  private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
  private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";

  /** Ratio used when {@code blur.lookup.ratio.per.shard} is not configured. */
  private static final double DEFAULT_LOOKUP_RATIO = 0.5;
  /** Limit used when {@code blur.lookup.max.total.per.shard} is not configured (effectively unlimited). */
  private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;

  /**
   * Decides whether the lookup should be executed on the given shard of the given table.
   *
   * <p>
   * The answer is {@code true} only when the shard's update count does not exceed the configured maximum
   * and the ratio of the update count to the index count is at most the configured lookup ratio. Only the
   * "update from new data" and "from index" counts are consulted, so only those two properties have to be
   * present in the configuration.
   *
   * @param configuration
   *          the configuration holding the counts and the thresholds.
   * @param table
   *          the table name used to build the count property names.
   * @param shard
   *          the index of the shard within the per-shard count arrays.
   * @return {@code true} if the lookup should be executed on the shard.
   * @throws NullPointerException
   *           if one of the consulted count properties has no value.
   * @throws ArrayIndexOutOfBoundsException
   *           if {@code shard} is outside the stored count arrays.
   * @throws NumberFormatException
   *           if a stored count is not a valid long.
   */
  public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
    double lookupRatio = getLookupRatio(configuration);
    long maxLookupCount = getMaxLookupCount(configuration);
    long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
    long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
    return shouldLookupRun(rowIdFromIndexCount, rowIdUpdateFromNewDataCount, lookupRatio, maxLookupCount);
  }

  /**
   * Applies the two thresholds: an absolute cap on the update count and a cap on the update/index ratio.
   */
  private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdUpdateFromNewDataCount,
      double lookupRatio, long maxLookupCount) {
    if (rowIdUpdateFromNewDataCount > maxLookupCount) {
      return false;
    }
    // Floating-point division never throws: with an index count of 0 the result is Infinity or NaN, and
    // for a non-negative update count and a finite ratio neither compares <= lookupRatio, so the lookup
    // is not run against an empty index.
    double updateToIndexRatio = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
    return updateToIndexRatio <= lookupRatio;
  }

  /**
   * Reads the lookup ratio threshold.
   *
   * @param configuration
   *          the configuration to read from.
   * @return the value of {@code blur.lookup.ratio.per.shard}, or {@code 0.5} when it is not set.
   */
  public static double getLookupRatio(Configuration configuration) {
    return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
  }

  /**
   * Reads the maximum update count allowed for a lookup; defaults to {@link Long#MAX_VALUE}.
   */
  private static long getMaxLookupCount(Configuration configuration) {
    return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
  }

  /**
   * Stores the lookup table name in the job's configuration.
   *
   * @param job
   *          the job whose configuration is updated.
   * @param table
   *          the table name.
   */
  public static void setTable(Job job, String table) {
    setTable(job.getConfiguration(), table);
  }

  /**
   * Stores the lookup table name under {@code blur.lookup.table}.
   *
   * @param configuration
   *          the configuration to update.
   * @param table
   *          the table name.
   */
  public static void setTable(Configuration configuration, String table) {
    configuration.set(BLUR_LOOKUP_TABLE, table);
  }

  /**
   * Reads the lookup table name.
   *
   * @param configuration
   *          the configuration to read from.
   * @return the value stored under {@code blur.lookup.table}, as returned by
   *         {@link Configuration#get(String)}.
   */
  public static String getTable(Configuration configuration) {
    return configuration.get(BLUR_LOOKUP_TABLE);
  }

  /**
   * @param table
   *          the table name.
   * @return the property name holding the per-shard "row id from index" counts of the table.
   */
  public static String getBlurLookupRowIdFromIndexCountName(String table) {
    return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
  }

  /**
   * @param table
   *          the table name.
   * @return the property name holding the per-shard "row id from new data" counts of the table.
   */
  public static String getBlurLookupRowIdFromNewDataCountName(String table) {
    return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
  }

  /**
   * @param table
   *          the table name.
   * @return the property name holding the per-shard "row id update from new data" counts of the table.
   */
  public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
    return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
  }

  /**
   * Reads the "row id update from new data" count of one shard.
   *
   * @param configuration
   *          the configuration to read from.
   * @param table
   *          the table name.
   * @param shard
   *          the shard index.
   * @return the stored count for the shard.
   * @throws NullPointerException
   *           if the property has no value.
   * @throws ArrayIndexOutOfBoundsException
   *           if {@code shard} is outside the stored counts.
   * @throws NumberFormatException
   *           if the stored count is not a valid long.
   */
  public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
    return readShardCount(configuration, getBlurLookupRowIdUpdateFromNewDataCountName(table), shard);
  }

  /**
   * Reads the "row id from new data" count of one shard.
   *
   * @param configuration
   *          the configuration to read from.
   * @param table
   *          the table name.
   * @param shard
   *          the shard index.
   * @return the stored count for the shard.
   * @throws NullPointerException
   *           if the property has no value.
   * @throws ArrayIndexOutOfBoundsException
   *           if {@code shard} is outside the stored counts.
   * @throws NumberFormatException
   *           if the stored count is not a valid long.
   */
  public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
    return readShardCount(configuration, getBlurLookupRowIdFromNewDataCountName(table), shard);
  }

  /**
   * Reads the "row id from index" count of one shard.
   *
   * @param configuration
   *          the configuration to read from.
   * @param table
   *          the table name.
   * @param shard
   *          the shard index.
   * @return the stored count for the shard.
   * @throws NullPointerException
   *           if the property has no value.
   * @throws ArrayIndexOutOfBoundsException
   *           if {@code shard} is outside the stored counts.
   * @throws NumberFormatException
   *           if the stored count is not a valid long.
   */
  public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
    return readShardCount(configuration, getBlurLookupRowIdFromIndexCountName(table), shard);
  }

  /**
   * Shared implementation of the three count getters. It fails with a message naming the property when
   * the property has no value, instead of a bare NullPointerException from the array access.
   */
  private static long readShardCount(Configuration configuration, String propertyName, int shard) {
    String[] counts = configuration.getStrings(propertyName);
    if (counts == null) {
      throw new NullPointerException("Configuration property [" + propertyName
          + "] has no value, cannot read the count for shard [" + shard + "]");
    }
    return getCount(counts, shard);
  }

  /**
   * Stores the per-shard "row id from new data" counts of a table in the job's configuration.
   *
   * @param job
   *          the job whose configuration is updated.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
    setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
  }

  /**
   * Stores the per-shard "row id from new data" counts of a table.
   *
   * @param configuration
   *          the configuration to update.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
    configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
  }

  /**
   * Stores the per-shard "row id update from new data" counts of a table in the job's configuration.
   *
   * @param job
   *          the job whose configuration is updated.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
    setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
  }

  /**
   * Stores the per-shard "row id update from new data" counts of a table.
   *
   * @param configuration
   *          the configuration to update.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
    configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
  }

  /**
   * Stores the per-shard "row id from index" counts of a table in the job's configuration.
   *
   * @param job
   *          the job whose configuration is updated.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
    setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
  }

  /**
   * Stores the per-shard "row id from index" counts of a table.
   *
   * @param configuration
   *          the configuration to update.
   * @param table
   *          the table name.
   * @param counts
   *          the counts, indexed by shard.
   */
  public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
    configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
  }

  /**
   * Parses the count stored for one shard.
   *
   * @param strings
   *          the per-shard counts in decimal form, indexed by shard.
   * @param shard
   *          the shard index.
   * @return the parsed count; surrounding whitespace in the entry is ignored.
   * @throws NullPointerException
   *           if {@code strings} is {@code null}.
   * @throws ArrayIndexOutOfBoundsException
   *           if {@code shard} is negative or not smaller than {@code strings.length}.
   * @throws NumberFormatException
   *           if the entry is {@code null} or not a valid long.
   */
  public static long getCount(String[] strings, int shard) {
    if (strings == null) {
      throw new NullPointerException("No per-shard counts available, cannot read the count for shard [" + shard
          + "]");
    }
    if (shard < 0 || shard >= strings.length) {
      throw new ArrayIndexOutOfBoundsException("Shard [" + shard + "] is outside of the [" + strings.length
          + "] per-shard counts available");
    }
    String rawCount = strings[shard];
    // Values are not trimmed when split out of the configuration, so tolerate "1, 2, 3" style input.
    // A null entry is handed to parseLong untouched so it still fails with NumberFormatException.
    return Long.parseLong(rawCount == null ? null : rawCount.trim());
  }

  /**
   * Extracts the shard index from a shard directory path.
   *
   * @param path
   *          the shard directory; only its last path component is used.
   * @return the shard index as computed by {@code ShardUtil.getShardIndex} from the directory name.
   */
  public static int getShardFromDirectoryPath(Path path) {
    return ShardUtil.getShardIndex(path.getName());
  }

  /**
   * Converts counts to their decimal string form, element by element.
   *
   * @param counts
   *          the counts to convert, may be {@code null}.
   * @return a new array of the same length, or {@code null} when {@code counts} is {@code null}.
   */
  public static String[] toStrings(long[] counts) {
    if (counts == null) {
      return null;
    }
    String[] strs = new String[counts.length];
    for (int i = 0; i < counts.length; i++) {
      strs[i] = Long.toString(counts[i]);
    }
    return strs;
  }
}