blob: 1d5728bee4ffd15a6f56b4a2570cd4078f05c94c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.iterators;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
import org.apache.accumulo.core.iterators.system.DeletingIterator;
import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IteratorUtil {
private static final Logger log = LoggerFactory.getLogger(IteratorUtil.class);
/**
 * The scopes for which iterators can be configured.
 *
 * <p>
 * Although this type does not live in a public API package, it is used by methods in the public API. It must therefore be treated as public API, must not
 * reference any non public API types, and can not be moved.
 */
public enum IteratorScope {
  majc,
  minc,
  scan
}
/**
 * Orders {@link IterInfo} instances by ascending {@code priority}. Only the priority is compared, so two entries with the same priority compare as equal
 * regardless of their other fields.
 */
public static class IterInfoComparator implements Comparator<IterInfo>, Serializable {
  private static final long serialVersionUID = 1L;

  @Override
  public int compare(IterInfo o1, IterInfo o2) {
    if (o1.priority < o2.priority) {
      return -1;
    }
    if (o1.priority == o2.priority) {
      return 0;
    }
    return 1;
  }
}
/**
 * Fetch the correct configuration key prefix for the given scope.
 *
 * @param scope
 *          the iterator scope to look up; must not be null
 * @return the table property that is the iterator prefix for {@code scope}
 * @throws NullPointerException
 *           if {@code scope} is null
 * @throws IllegalStateException
 *           if no property exists for the given scope
 */
static Property getProperty(IteratorScope scope) {
  requireNonNull(scope);
  switch (scope) {
    case scan:
      return Property.TABLE_ITERATOR_SCAN_PREFIX;
    case minc:
      return Property.TABLE_ITERATOR_MINC_PREFIX;
    case majc:
      return Property.TABLE_ITERATOR_MAJC_PREFIX;
    default:
      // Only reachable if a scope is added to IteratorScope without a matching case here.
      throw new IllegalStateException("Could not find configuration property for IteratorScope");
  }
}
/**
 * Build the initial (default) properties for a table. The result always contains the {@link DefaultKeySizeConstraint} registered as constraint "1".
 *
 * @param limitVersion
 *          when true, also configure a VersioningIterator named "vers" at priority 20 with maxVersions set to 1 for every {@link IteratorScope}
 * @return a sorted map of table property keys to values
 */
public static Map<String,String> generateInitialTableProperties(boolean limitVersion) {
  final TreeMap<String,String> initialProps = new TreeMap<>();

  if (limitVersion) {
    final String versIterSpec = "20," + VersioningIterator.class.getName();
    for (IteratorScope iterScope : IteratorScope.values()) {
      final String versIterKey = Property.TABLE_ITERATOR_PREFIX + iterScope.name() + ".vers";
      initialProps.put(versIterKey, versIterSpec);
      initialProps.put(versIterKey + ".opt.maxVersions", "1");
    }
  }

  final String constraintKey = Property.TABLE_CONSTRAINT_PREFIX.toString() + "1";
  initialProps.put(constraintKey, DefaultKeySizeConstraint.class.getName());
  return initialProps;
}
/**
 * Combine two iterator configurations into the supplied destination collections.
 *
 * <p>
 * All entries of {@code tableIters} and {@code ssi} are appended to {@code destList}, which is then sorted by priority. Each option map in
 * {@code tableOpts} is copied into {@code destOpts} (a null map is stored as null), after which the options in {@code ssio} are merged on top, so an
 * option present in both ends up with the {@code ssio} value.
 *
 * @param destList
 *          receives the combined, priority-sorted iterators
 * @param destOpts
 *          receives the combined options, keyed by iterator name
 * @param tableIters
 *          first set of iterators
 * @param tableOpts
 *          options for the first set, keyed by iterator name
 * @param ssi
 *          second set of iterators
 * @param ssio
 *          options for the second set, keyed by iterator name
 */
public static void mergeIteratorConfig(List<IterInfo> destList, Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters,
    Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, Map<String,Map<String,String>> ssio) {
  destList.addAll(tableIters);
  destList.addAll(ssi);
  Collections.sort(destList, new IterInfoComparator());

  for (Entry<String,Map<String,String>> tableOpt : tableOpts.entrySet()) {
    final Map<String,String> original = tableOpt.getValue();
    Map<String,String> copy = null;
    if (original != null) {
      // copy so that merging below never writes into the caller's table option maps
      copy = new HashMap<>(original);
    }
    destOpts.put(tableOpt.getKey(), copy);
  }

  mergeOptions(ssio, destOpts);
}
/**
 * Read the iterator configuration for one scope out of a configuration.
 *
 * <p>
 * Every property under the scope's prefix is examined by the part of its key that follows the prefix:
 * <ul>
 * <li>{@code <name>} with a value of the form {@code <priority>,<class>} adds an {@link IterInfo} to {@code iters}. Anything after a second comma in the
 * value is ignored.</li>
 * <li>{@code <name>.opt.<option>} stores the value under {@code <option>} in the option map for {@code <name>} in {@code allOptions}, creating that map if
 * needed.</li>
 * <li>any other shape is logged as a warning and skipped.</li>
 * </ul>
 * Finally {@code iters} (including any entries it already held) is sorted by priority.
 *
 * @param scope
 *          the scope whose properties should be read
 * @param iters
 *          list that receives the parsed iterators
 * @param allOptions
 *          map, keyed by iterator name, that receives the parsed options
 * @param conf
 *          the configuration to read from
 * @throws IllegalArgumentException
 *           if an iterator property value does not contain both a priority and a class name; this includes the {@link NumberFormatException} raised when
 *           the priority is not an integer
 */
public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
  final Property scopeProperty = getProperty(scope);
  final String scopePropertyKey = scopeProperty.getKey();
  for (Entry<String,String> entry : conf.getAllPropertiesWithPrefix(scopeProperty).entrySet()) {
    String suffix = entry.getKey().substring(scopePropertyKey.length());
    // limit of 3 keeps any dots inside the option name intact
    String[] suffixSplit = suffix.split("\\.", 3);
    if (suffixSplit.length == 1) {
      String[] sa = entry.getValue().split(",");
      if (sa.length < 2) {
        // previously surfaced as an ArrayIndexOutOfBoundsException that did not name the offending property
        throw new IllegalArgumentException(
            "Malformed iterator setting " + entry.getKey() + "=" + entry.getValue() + ", expected <priority>,<class>");
      }
      int prio = Integer.parseInt(sa[0]);
      String className = sa[1];
      iters.add(new IterInfo(prio, className, suffixSplit[0]));
    } else if (suffixSplit.length == 3 && suffixSplit[1].equals("opt")) {
      String iterName = suffixSplit[0];
      String optName = suffixSplit[2];
      Map<String,String> options = allOptions.get(iterName);
      if (options == null) {
        options = new HashMap<>();
        allOptions.put(iterName, options);
      }
      options.put(optName, entry.getValue());
    } else {
      log.warn("Unrecognizable option: {}", entry.getKey());
    }
  }
  Collections.sort(iters, new IterInfoComparator());
}
/**
 * Load the iterators configured for {@code scope} in {@code conf} on top of {@code source}, with no additional iterators or options supplied by the
 * caller.
 *
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
    SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, IteratorEnvironment env) throws IOException {
  final List<IterInfo> noExtraIterators = Collections.emptyList();
  final Map<String,Map<String,String>> noExtraOptions = Collections.emptyMap();
  return loadIterators(scope, source, extent, conf, noExtraIterators, noExtraOptions, env);
}
/**
 * Load the iterators configured for {@code scope} in {@code conf}, together with the given {@link IteratorSetting}s, on top of {@code source}.
 *
 * <p>
 * Each setting is converted to an {@link IterInfo} plus an options entry keyed by the setting's name, and the work is delegated to the overload taking
 * those structures with the Accumulo class loader enabled.
 *
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
    SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IteratorSetting> iterators, IteratorEnvironment env)
    throws IOException {
  final List<IterInfo> extraIterators = new ArrayList<>();
  final Map<String,Map<String,String>> extraOptions = new HashMap<>();

  for (IteratorSetting setting : iterators) {
    final IterInfo info = new IterInfo(setting.getPriority(), setting.getIteratorClass(), setting.getName());
    extraIterators.add(info);
    extraOptions.put(setting.getName(), setting.getOptions());
  }

  return loadIterators(scope, source, extent, conf, extraIterators, extraOptions, env, true);
}
/**
 * Load the iterators configured for {@code scope} in {@code conf}, together with the additional iterators in {@code ssiList} and their options in
 * {@code ssio}, on top of {@code source}. Classes are loaded with the Accumulo class loader.
 *
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
    SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
    IteratorEnvironment env) throws IOException {
  final boolean useAccumuloClassLoader = true;
  return loadIterators(scope, source, extent, conf, ssiList, ssio, env, useAccumuloClassLoader);
}
/**
 * Parse the iterators and options configured for {@code scope} in {@code conf} into {@code iters} and {@code allOptions}, then merge the options in
 * {@code ssio} on top of the parsed ones.
 */
private static void parseIteratorConfiguration(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> ssio,
    Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
  // configured options first, so that the caller-supplied options win on conflict
  IteratorUtil.parseIterConf(scope, iters, allOptions, conf);
  IteratorUtil.mergeOptions(ssio, allOptions);
}
/**
 * Merge the per-iterator options in {@code ssio} into {@code allOptions}.
 *
 * <p>
 * Entries of {@code ssio} whose option map is null are skipped. If {@code allOptions} already has options for an iterator name, the {@code ssio} options
 * are added to that map, replacing values for option names present in both. Otherwise a copy of the {@code ssio} option map is stored, so that later
 * changes to {@code allOptions} never write through to the maps owned by the caller of {@code ssio}.
 *
 * @param ssio
 *          options to merge in, keyed by iterator name; not modified
 * @param allOptions
 *          destination options, keyed by iterator name; modified in place
 */
private static void mergeOptions(Map<String,Map<String,String>> ssio, Map<String,Map<String,String>> allOptions) {
  for (Entry<String,Map<String,String>> entry : ssio.entrySet()) {
    if (entry.getValue() == null)
      continue;
    Map<String,String> options = allOptions.get(entry.getKey());
    if (options == null) {
      // defensive copy: storing the caller's map directly would alias it into the destination
      allOptions.put(entry.getKey(), new HashMap<>(entry.getValue()));
    } else {
      options.putAll(entry.getValue());
    }
  }
}
/**
 * Load the iterators configured for {@code scope} in {@code conf}, together with the additional iterators in {@code ssiList} and their options in
 * {@code ssio}, on top of {@code source}. The class loader context is taken from {@link Property#TABLE_CLASSPATH} in {@code conf}.
 *
 * @param useAccumuloClassLoader
 *          passed through to the overload that instantiates the iterators
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
    SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
    IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
  // work on a copy so the caller's list is neither extended nor re-sorted
  final List<IterInfo> combinedIters = new ArrayList<>(ssiList);
  final Map<String,Map<String,String>> combinedOptions = new HashMap<>();
  parseIteratorConfiguration(scope, combinedIters, ssio, combinedOptions, conf);

  final String classLoaderContext = conf.get(Property.TABLE_CLASSPATH);
  return loadIterators(source, combinedIters, combinedOptions, env, useAccumuloClassLoader, classLoaderContext);
}
/**
 * Load the iterators configured for {@code scope} in {@code conf}, together with the additional iterators in {@code ssiList} and their options in
 * {@code ssio}, on top of {@code source}, using an explicitly supplied class loader context.
 *
 * @param useAccumuloClassLoader
 *          passed through to the overload that instantiates the iterators
 * @param classLoaderContext
 *          passed through as the context of the overload that instantiates the iterators
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
    SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
    IteratorEnvironment env, boolean useAccumuloClassLoader, String classLoaderContext) throws IOException {
  // work on a copy so the caller's list is neither extended nor re-sorted
  final List<IterInfo> combinedIters = new ArrayList<>(ssiList);
  final Map<String,Map<String,String>> combinedOptions = new HashMap<>();
  parseIteratorConfiguration(scope, combinedIters, ssio, combinedOptions, conf);

  return loadIterators(source, combinedIters, combinedOptions, env, useAccumuloClassLoader, classLoaderContext);
}
/**
 * Instantiate and stack the given iterators on top of {@code source} without using a class cache.
 *
 * @return the top of the resulting iterator stack
 * @throws IOException
 *           propagated from the overload this method delegates to
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
    Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context)
    throws IOException {
  final Map<String,Class<? extends SortedKeyValueIterator<K,V>>> noClassCache = null;
  return loadIterators(source, iters, iterOpts, env, useAccumuloClassLoader, context, noClassCache);
}
/**
 * Instantiate the given iterators and stack them on top of {@code source}.
 *
 * <p>
 * The source is first wrapped in a {@link SynchronizedIterator}. Then, in the iteration order of {@code iters}, each iterator class is resolved (through
 * {@code classCache} when one is supplied), instantiated with its no-argument constructor, and initialized with the previous iterator as its source and
 * the options registered under its name in {@code iterOpts} (an empty map when there are none).
 *
 * @param classCache
 *          optional cache from class name to loaded class; may be null. Classes loaded by this call are added to it.
 * @return the last iterator that was initialized, or the synchronized wrapper around {@code source} when {@code iters} is empty
 * @throws IOException
 *           if loading a class or initializing an iterator throws it
 * @throws RuntimeException
 *           wrapping the cause if an iterator class can not be found, instantiated, or accessed
 */
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
    Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context,
    Map<String,Class<? extends SortedKeyValueIterator<K,V>>> classCache) throws IOException {
  // wrap the source in a SynchronizedIterator in case any of the additional configured iterators want to use threading
  SortedKeyValueIterator<K,V> top = new SynchronizedIterator<>(source);
  final boolean caching = classCache != null;
  try {
    for (IterInfo iterInfo : iters) {
      log.trace("Attempting to load iterator class {}", iterInfo.className);

      Class<? extends SortedKeyValueIterator<K,V>> iterClass = null;
      if (caching) {
        iterClass = classCache.get(iterInfo.className);
      }
      if (iterClass == null) {
        iterClass = loadClass(useAccumuloClassLoader, context, iterInfo);
        if (caching) {
          classCache.put(iterInfo.className, iterClass);
        }
      }

      SortedKeyValueIterator<K,V> next = iterClass.newInstance();
      Map<String,String> configured = iterOpts.get(iterInfo.iterName);
      Map<String,String> options = configured == null ? Collections.<String,String> emptyMap() : configured;
      next.init(top, options, env);
      top = next;
    }
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    log.error(e.toString());
    throw new RuntimeException(e);
  }
  return top;
}
/**
 * Resolve the class named by {@code iterInfo.className} as a {@link SortedKeyValueIterator} implementation.
 *
 * <p>
 * When {@code useAccumuloClassLoader} is false the class is looked up with {@link Class#forName(String)}. Otherwise it is loaded through the
 * {@link AccumuloVFSClassLoader}: from the context manager when {@code context} is non-null and non-empty, and from the class loader's static
 * {@code loadClass} otherwise.
 */
@SuppressWarnings("unchecked")
private static <K extends WritableComparable<?>,V extends Writable> Class<? extends SortedKeyValueIterator<K,V>> loadClass(boolean useAccumuloClassLoader,
    String context, IterInfo iterInfo) throws ClassNotFoundException, IOException {
  if (!useAccumuloClassLoader) {
    Class<? extends SortedKeyValueIterator<K,V>> fromClasspath = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className)
        .asSubclass(SortedKeyValueIterator.class);
    log.trace("Iterator class {} loaded from classpath", iterInfo.className);
    return fromClasspath;
  }

  final boolean hasContext = context != null && !context.isEmpty();
  if (hasContext) {
    Class<? extends SortedKeyValueIterator<K,V>> fromContext = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.getContextManager()
        .loadClass(context, iterInfo.className, SortedKeyValueIterator.class);
    log.trace("Iterator class {} loaded from context {}, classloader: {}", iterInfo.className, context, fromContext.getClassLoader());
    return fromContext;
  }

  Class<? extends SortedKeyValueIterator<K,V>> fromVfs = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader
      .loadClass(iterInfo.className, SortedKeyValueIterator.class);
  log.trace("Iterator class {} loaded from AccumuloVFSClassLoader: {}", iterInfo.className, fromVfs.getClassLoader());
  return fromVfs;
}
/**
 * Return a range whose start key carries the timestamp {@link Long#MAX_VALUE}.
 *
 * @param range
 *          the range to adjust
 * @return {@code range} itself when it has no start key or the start key's timestamp is already {@code Long.MAX_VALUE}; otherwise a new range with a
 *         copy of the start key set to that timestamp, an inclusive start, and the original end key and end inclusiveness
 */
public static Range maximizeStartKeyTimeStamp(Range range) {
  final boolean alreadyMaximal = range.getStartKey() == null || range.getStartKey().getTimestamp() == Long.MAX_VALUE;
  if (alreadyMaximal) {
    return range;
  }

  final Key adjustedStart = new Key(range.getStartKey());
  adjustedStart.setTimestamp(Long.MAX_VALUE);
  return new Range(adjustedStart, true, range.getEndKey(), range.isEndKeyInclusive());
}
/**
 * Return a range whose end key carries the timestamp {@link Long#MIN_VALUE}.
 *
 * @param range
 *          the range to adjust
 * @return {@code range} itself when it has no end key or the end key's timestamp is already {@code Long.MIN_VALUE}; otherwise a new range with the
 *         original start key and start inclusiveness, a copy of the end key set to that timestamp, and an inclusive end
 */
public static Range minimizeEndKeyTimeStamp(Range range) {
  final boolean alreadyMinimal = range.getEndKey() == null || range.getEndKey().getTimestamp() == Long.MIN_VALUE;
  if (alreadyMinimal) {
    return range;
  }

  final Key adjustedEnd = new Key(range.getEndKey());
  adjustedEnd.setTimestamp(Long.MIN_VALUE);
  return new Range(range.getStartKey(), range.isStartKeyInclusive(), adjustedEnd, true);
}
/**
 * Convert an {@link IteratorSetting} to its {@link TIteratorSetting} form, carrying over priority, name, iterator class and options.
 */
public static TIteratorSetting toTIteratorSetting(IteratorSetting is) {
  final TIteratorSetting converted = new TIteratorSetting(
      is.getPriority(),
      is.getName(),
      is.getIteratorClass(),
      is.getOptions());
  return converted;
}
/**
 * Convert a {@link TIteratorSetting} to an {@link IteratorSetting}, carrying over priority, name, iterator class and properties.
 */
public static IteratorSetting toIteratorSetting(TIteratorSetting tis) {
  final IteratorSetting converted = new IteratorSetting(
      tis.getPriority(),
      tis.getName(),
      tis.getIteratorClass(),
      tis.getProperties());
  return converted;
}
/**
 * Convert a list of {@link IteratorSetting}s into an {@link IteratorConfig} holding the converted settings in the same order.
 */
public static IteratorConfig toIteratorConfig(List<IteratorSetting> iterators) {
  final ArrayList<TIteratorSetting> converted = new ArrayList<>();
  for (IteratorSetting setting : iterators) {
    final TIteratorSetting thriftSetting = toTIteratorSetting(setting);
    converted.add(thriftSetting);
  }
  return new IteratorConfig(converted);
}
/**
 * Convert the iterators held by an {@link IteratorConfig} into a new list of {@link IteratorSetting}s in the same order.
 */
public static List<IteratorSetting> toIteratorSettings(IteratorConfig ic) {
  final List<IteratorSetting> settings = new ArrayList<>();
  for (TIteratorSetting thriftSetting : ic.getIterators()) {
    final IteratorSetting setting = toIteratorSetting(thriftSetting);
    settings.add(setting);
  }
  return settings;
}
/**
 * Serialize an {@link IteratorConfig} to bytes with a {@link TSerializer} using the Thrift binary protocol.
 *
 * @throws RuntimeException
 *           wrapping the {@link TException} if serialization fails
 */
public static byte[] encodeIteratorSettings(IteratorConfig iterators) {
  final TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
  final byte[] encoded;
  try {
    encoded = serializer.serialize(iterators);
  } catch (TException e) {
    throw new RuntimeException(e);
  }
  return encoded;
}
/**
 * Serialize a list of {@link IteratorSetting}s to bytes by converting it to an {@link IteratorConfig} and encoding that.
 */
public static byte[] encodeIteratorSettings(List<IteratorSetting> iterators) {
  final IteratorConfig config = toIteratorConfig(iterators);
  return encodeIteratorSettings(config);
}
/**
 * Deserialize bytes into a list of {@link IteratorSetting}s, reading them as an {@link IteratorConfig} with a {@link TDeserializer} using the Thrift
 * binary protocol.
 *
 * @throws RuntimeException
 *           wrapping the {@link TException} if deserialization fails
 */
public static List<IteratorSetting> decodeIteratorSettings(byte[] enc) {
  final TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
  final IteratorConfig decoded = new IteratorConfig();
  try {
    // fills 'decoded' in place
    deserializer.deserialize(decoded, enc);
  } catch (TException e) {
    throw new RuntimeException(e);
  }
  return toIteratorSettings(decoded);
}
/**
 * Wrap {@code source} in the system scan iterators. From the source outwards the stack is: {@link DeletingIterator} (constructed with {@code false} as its
 * second argument), {@link ColumnFamilySkippingIterator}, {@link ColumnQualifierFilter} over {@code cols}, and {@link VisibilityFilter} using
 * {@code auths} and {@code defaultVisibility}.
 *
 * @return the {@link VisibilityFilter} at the top of the stack
 */
public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(SortedKeyValueIterator<Key,Value> source, Set<Column> cols, Authorizations auths,
    byte[] defaultVisibility) throws IOException {
  final ColumnFamilySkippingIterator familySkipping = new ColumnFamilySkippingIterator(new DeletingIterator(source, false));
  final ColumnQualifierFilter qualifierFilter = new ColumnQualifierFilter(familySkipping, cols);
  final VisibilityFilter visibilityFilter = new VisibilityFilter(qualifierFilter, auths, defaultVisibility);
  return visibilityFilter;
}
}