blob: c943a8e18e5d57c9b7913457f8251cbb8c96712e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.iterators;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iteratorsImpl.conf.ColumnSet;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
/**
* A SortedKeyValueIterator that combines the Values for different versions (timestamp) of a Key
* within a row into a single Value. Combiner will replace one or more versions of a Key and their
* Values with the most recent Key and a Value which is the result of the reduce method. An
* {@link Column} which only specifies a column family will combine all Keys in that column family
* individually. Similarly, a {@link Column} which specifies a column family and column qualifier
* will combine all Keys in column family and qualifier individually. Combination is only ever
* performed on multiple versions and not across column qualifiers or column visibilities.
*
* <p>
* Implementations must provide a reduce method:
* {@code public Value reduce(Key key, Iterator<Value> iter)}.
*
* <p>
* This reduce method will be passed the most recent Key and an iterator over the Values for all
* non-deleted versions of that Key. A combiner will not combine keys that differ by more than the
* timestamp.
*
* <p>
* This class and its implementations do not automatically filter out unwanted columns from those
* being combined, thus it is generally recommended to use a {@link Combiner} implementation with
* the {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text, Text)}
* methods.
*
* <p>
* WARNING : Using deletes with Combiners may not work as intended. See
* {@link #setReduceOnFullCompactionOnly(IteratorSetting, boolean)}
*/
public abstract class Combiner extends WrappingIterator implements OptionDescriber {
// Dedicated logger, named after this class plus ".SawDelete", used by sawDelete() to report a
// delete seen during a major compaction.
static final Logger sawDeleteLog =
LoggerFactory.getLogger(Combiner.class.getName() + ".SawDelete");
// Option key whose value is the comma separated list of encoded columns to combine.
protected static final String COLUMNS_OPTION = "columns";
// Option key that, when it parses to true, applies the combiner to every column.
protected static final String ALL_OPTION = "all";
// Option key that, when it parses to true, disables combining during partial major compactions.
protected static final String REDUCE_ON_FULL_COMPACTION_ONLY_OPTION =
"reduceOnFullCompactionOnly";
// Set in init(): true when the iterator scope reported by the environment is majc.
private boolean isMajorCompaction;
// Set in init() from REDUCE_ON_FULL_COMPACTION_ONLY_OPTION; false when the option is absent.
private boolean reduceOnFullCompactionOnly;
/**
 * Adapts a source SortedKeyValueIterator to a plain Java Iterator that yields the Values of the
 * successive versions of a single Key.
 *
 * <p>
 * Iteration stops as soon as the source is exhausted, the source's top key is a delete, or the
 * source's top key differs from the starting key in row, column family, column qualifier or
 * column visibility.
 */
public static class ValueIterator implements Iterator<Value> {
  Key topKey;
  SortedKeyValueIterator<Key,Value> source;
  boolean hasNext;

  /**
   * Creates an iterator over the Values whose Keys are versions of the key currently at the top
   * of the given source. The top key is copied at construction time.
   *
   * @param source
   *          The {@code SortedKeyValueIterator<Key,Value>} from which to read data.
   */
  public ValueIterator(SortedKeyValueIterator<Key,Value> source) {
    this.source = source;
    this.topKey = new Key(source.getTopKey());
    this.hasNext = sourceHasAnotherVersion();
  }

  /**
   * Tells whether the source is currently positioned on a non-deleted version of the starting
   * key.
   */
  private boolean sourceHasAnotherVersion() {
    if (!source.hasTop()) {
      return false;
    }
    Key current = source.getTopKey();
    if (current.isDeleted()) {
      return false;
    }
    return topKey.equals(current, PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
  }

  @Override
  public boolean hasNext() {
    return hasNext;
  }

  /**
   * Returns a copy of the source's current Value and advances the source by one entry.
   *
   * @throws NoSuchElementException
   *           if there are no further versions
   * @throws RuntimeException
   *           wrapping any IOException raised while advancing the source
   */
  @Override
  public Value next() {
    if (!hasNext) {
      throw new NoSuchElementException();
    }
    // take the copy before the source is advanced
    Value current = new Value(source.getTopValue());
    try {
      source.next();
      hasNext = sourceHasAnotherVersion();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return current;
  }

  /**
   * Removal is not supported by this iterator.
   *
   * @throws UnsupportedOperationException
   *           when called
   */
  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}
// The combined entry currently exposed by this iterator. While topKey is null, getTopKey(),
// getTopValue() and hasTop() delegate to the wrapped source instead.
Key topKey;
Value topValue;
/**
 * Returns the key of the combined entry when one is pending, otherwise the top key of the
 * wrapped source.
 */
@Override
public Key getTopKey() {
  return topKey != null ? topKey : super.getTopKey();
}
/**
 * Returns the combined value when a combined entry is pending (signalled by a non-null topKey),
 * otherwise the top value of the wrapped source.
 */
@Override
public Value getTopValue() {
  return topKey != null ? topValue : super.getTopValue();
}
/**
 * Reports whether an entry is available: either a pending combined entry or a top entry in the
 * wrapped source.
 */
@Override
public boolean hasTop() {
  if (topKey != null) {
    return true;
  }
  return super.hasTop();
}
/**
 * Moves to the next entry and combines it if its column is configured for combining.
 *
 * <p>
 * When a combined entry is current, the source was already advanced past that entry's versions
 * while it was being reduced, so only the combined entry is discarded. Otherwise the source
 * itself is advanced.
 */
@Override
public void next() throws IOException {
  if (topKey == null) {
    super.next();
  } else {
    topKey = null;
    topValue = null;
  }
  findTop();
}
// Scratch key that findTop() fills from the source's top key via Key.set(); it is also the
// object topKey refers to while a combined entry is exposed.
private Key workKey = new Key();
// Remembers, per Combiner class name, that the "saw a delete" error was already logged so
// sawDelete() does not repeat it. Entries expire one hour after being written and the cache
// holds at most 10000 entries.
@VisibleForTesting
static final Cache<String,Boolean> loggedMsgCache =
CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();
/**
 * Logs an error the first time a delete is encountered during a major compaction by a combiner
 * that is not restricted to full compactions. The message is recorded in loggedMsgCache under
 * the concrete class name, so it is not repeated while that cache entry is present.
 */
private void sawDelete() {
  if (!isMajorCompaction || reduceOnFullCompactionOnly) {
    return;
  }
  String cacheKey = this.getClass().getName();
  try {
    // the loader only runs when the class name is not cached, which is what limits the logging
    loggedMsgCache.get(cacheKey, () -> {
      sawDeleteLog.error(
          "Combiner of type {} saw a delete during a"
              + " partial compaction. This could cause undesired results. See"
              + " ACCUMULO-2232. Will not log subsequent occurences for at least" + " 1 hour.",
          Combiner.this.getClass().getSimpleName());
      // the cached value itself is never read
      return Boolean.TRUE;
    });
  } catch (ExecutionException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Builds the combined entry for the source's current position, if there is one to build.
 *
 * <p>
 * When the source has a top key whose column is combined (or all columns are combined) and that
 * key is not a delete, topKey becomes that key and topValue becomes the result of
 * {@link #reduce(Key, Iterator)} over its versions. In every other case topKey and topValue are
 * left untouched; this method never clears them, so callers are expected to have reset them
 * beforehand.
 */
private void findTop() throws IOException {
  if (!super.hasTop()) {
    return;
  }
  workKey.set(super.getTopKey());
  boolean columnIsCombined = combineAllColumns || combiners.contains(workKey);
  if (!columnIsCombined) {
    return;
  }
  if (workKey.isDeleted()) {
    // deletes are passed through uncombined
    sawDelete();
    return;
  }
  topKey = workKey;
  Iterator<Value> versions = new ValueIterator(getSource());
  topValue = reduce(topKey, versions);
  // skip whatever versions reduce() left unread so the source moves past this key's versions
  while (versions.hasNext()) {
    versions.next();
  }
}
/**
 * Positions this iterator at the first entry of the given range, combining versions as needed.
 *
 * <p>
 * The source is sought with the start key's timestamp maximized so that all versions of the
 * start key are seen and combined together. Entries that then fall before the requested start
 * key are skipped.
 *
 * @param range
 *          the range of keys to iterate over
 * @param columnFamilies
 *          the column families passed through to the source
 * @param inclusive
 *          passed through to the source together with columnFamilies
 * @throws IOException
 *           if the source fails while seeking or advancing
 */
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
    throws IOException {
  // Drop any combined entry left over from the previous position. findTop() only assigns these
  // fields and never clears them, so without this reset a re-seek to a position with nothing to
  // combine would keep exposing the stale entry.
  topKey = null;
  topValue = null;

  // do not want to seek to the middle of a value that should be combined...
  Range seekRange = IteratorUtil.maximizeStartKeyTimeStamp(range);
  super.seek(seekRange, columnFamilies, inclusive);
  findTop();

  Key startKey = range.getStartKey();
  if (startKey != null) {
    // the entry is a version of the start key with a more recent timestamp, so pass it up
    while (hasTop() && getTopKey().equals(startKey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)
        && getTopKey().getTimestamp() > startKey.getTimestamp()) {
      next();
    }
    // skip anything else that still sorts before the requested start
    while (hasTop() && range.beforeStartKey(getTopKey())) {
      next();
    }
  }
}
/**
 * Reduces the Values of the versions of a Key into a single Value.
 *
 * <p>
 * This iterator calls the method once for each key it combines. Any Values left unread in
 * {@code iter} when the method returns are skipped by the caller.
 *
 * @param key
 * The most recent version of the Key being reduced.
 *
 * @param iter
 * An iterator over the Values for different versions of the key. Each Value it returns is
 * a copy of the source's Value.
 *
 * @return The combined Value.
 */
public abstract Value reduce(Key key, Iterator<Value> iter);
// Columns to combine. init() builds it from COLUMNS_OPTION when not combining all columns, and
// replaces it with an empty set for a partial major compaction when reduceOnFullCompactionOnly
// is set. init() does not assign it otherwise.
private ColumnSet combiners;
// True when every column is combined; findTop() then does not consult the column set.
private boolean combineAllColumns;
/**
 * Initializes the wrapped source and reads this combiner's configuration from the options.
 *
 * <p>
 * Either {@code all} must parse to true or {@code columns} must hold a non-empty list of encoded
 * columns. When {@code reduceOnFullCompactionOnly} is true and the environment reports a major
 * compaction that is not a full one, the configuration is replaced so that nothing is combined.
 *
 * @throws IllegalArgumentException
 *           if not all columns are combined and the columns option is missing or empty
 */
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
    IteratorEnvironment env) throws IOException {
  super.init(source, options, env);

  // Boolean.parseBoolean(null) is false, so an absent option reads as false
  combineAllColumns = Boolean.parseBoolean(options.get(ALL_OPTION));

  if (!combineAllColumns) {
    if (!options.containsKey(COLUMNS_OPTION)) {
      throw new IllegalArgumentException("Must specify " + COLUMNS_OPTION + " option");
    }
    String columnSpec = options.get(COLUMNS_OPTION);
    if (columnSpec.isEmpty()) {
      throw new IllegalArgumentException("The " + COLUMNS_OPTION + " must not be empty");
    }
    Iterable<String> encoded = Splitter.on(",").split(columnSpec);
    combiners = new ColumnSet(Lists.newArrayList(encoded));
  }

  isMajorCompaction = env.getIteratorScope() == IteratorScope.majc;
  reduceOnFullCompactionOnly =
      Boolean.parseBoolean(options.get(REDUCE_ON_FULL_COMPACTION_ONLY_OPTION));

  // isFullMajorCompaction() is only consulted once the scope is known to be majc
  boolean skipCombining =
      reduceOnFullCompactionOnly && isMajorCompaction && !env.isFullMajorCompaction();
  if (skipCombining) {
    // adjust configuration so that no columns are combined for a partial major compaction
    combineAllColumns = false;
    combiners = new ColumnSet();
  }
}
/**
 * Creates a new instance of the concrete combiner class through its no-argument constructor,
 * gives it a deep copy of this iterator's source and copies this combiner's configuration onto
 * it. The column set is shared by reference, not duplicated.
 *
 * @throws RuntimeException
 *           wrapping any exception raised while instantiating the concrete class
 */
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
  // TODO test
  Combiner copy;
  try {
    copy = getClass().getDeclaredConstructor().newInstance();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  copy.setSource(getSource().deepCopy(env));
  copy.combineAllColumns = this.combineAllColumns;
  copy.combiners = this.combiners;
  copy.reduceOnFullCompactionOnly = this.reduceOnFullCompactionOnly;
  copy.isMajorCompaction = this.isMajorCompaction;
  return copy;
}
/**
 * Describes the three options understood by every Combiner: {@code all}, {@code columns} and
 * {@code reduceOnFullCompactionOnly}.
 */
@Override
public IteratorOptions describeOptions() {
  String description =
      "Combiners apply reduce functions to multiple versions of values with otherwise equal keys";
  IteratorOptions described = new IteratorOptions("comb", description, null, null);

  String allHelp = "set to true to apply Combiner to every column, otherwise leave blank. if true, "
      + COLUMNS_OPTION + " option will be ignored.";
  described.addNamedOption(ALL_OPTION, allHelp);

  String columnsHelp =
      "<col fam>[:<col qual>]{,<col fam>[:<col qual>]} escape non-alphanum chars using %<hex>.";
  described.addNamedOption(COLUMNS_OPTION, columnsHelp);

  String fullOnlyHelp = "If true, only reduce on full major compactions. Defaults to false. ";
  described.addNamedOption(REDUCE_ON_FULL_COMPACTION_ONLY_OPTION, fullOnlyHelp);

  return described;
}
/**
 * Checks that the options form a usable Combiner configuration without changing this
 * iterator's state.
 *
 * <p>
 * The options are valid when {@code all} parses to true, or when {@code columns} holds a
 * non-empty comma separated list in which every entry is a valid column encoding.
 *
 * @param options
 *          the iterator options to validate
 * @return true when the options are valid
 * @throws IllegalArgumentException
 *           if neither all columns nor a valid, non-empty column list is specified
 */
@Override
public boolean validateOptions(Map<String,String> options) {
  // Boolean.parseBoolean never throws: anything other than "true" (ignoring case), including
  // null, is false. A local is used so that validation does not alter the running configuration.
  boolean allColumns = Boolean.parseBoolean(options.get(ALL_OPTION));
  if (allColumns) {
    return true;
  }

  if (!options.containsKey(COLUMNS_OPTION)) {
    throw new IllegalArgumentException(
        "options must include " + ALL_OPTION + " or " + COLUMNS_OPTION);
  }

  String encodedColumns = options.get(COLUMNS_OPTION);
  // a key mapped to null is treated like an empty list instead of failing with an NPE
  if (encodedColumns == null || encodedColumns.isEmpty()) {
    throw new IllegalArgumentException("empty columns specified in option " + COLUMNS_OPTION);
  }

  for (String column : Splitter.on(",").split(encodedColumns)) {
    if (!ColumnSet.isValidEncoding(column)) {
      throw new IllegalArgumentException("invalid column encoding " + encodedColumns);
    }
  }
  return true;
}
/**
 * Convenience method that configures which columns a combiner is applied to. For every column
 * given, all versions of a Key matching that {@link IteratorSetting.Column} are combined
 * individually within each row. Typically used together with
 * {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text,Text)}.
 *
 * @param is
 *          iterator settings object to configure
 * @param columns
 *          a list of columns to encode as the value for the combiner column configuration
 */
public static void setColumns(IteratorSetting is, List<IteratorSetting.Column> columns) {
  StringBuilder encoded = new StringBuilder();
  Iterator<Column> remaining = columns.iterator();
  while (remaining.hasNext()) {
    Column column = remaining.next();
    encoded.append(ColumnSet.encodeColumns(column.getFirst(), column.getSecond()));
    // comma between entries only, never trailing
    if (remaining.hasNext()) {
      encoded.append(",");
    }
  }
  is.addOption(COLUMNS_OPTION, encoded.toString());
}
/**
 * Convenience method that sets the "all columns" option on a Combiner, which makes it combine
 * every column individually within each row.
 *
 * @param is
 *          iterator settings object to configure
 * @param combineAllColumns
 *          if true, the columns option is ignored and the Combiner will be applied to all columns
 */
public static void setCombineAllColumns(IteratorSetting is, boolean combineAllColumns) {
  String flag = String.valueOf(combineAllColumns);
  is.addOption(ALL_OPTION, flag);
}
/**
 * Controls whether a Combiner reduces only during full major compactions.
 *
 * <p>
 * Combiners may not work correctly with deletes. When Accumulo compacts the files of a tablet it
 * sometimes compacts only a subset of them. If a delete marker lives in a file that is not part
 * of the compaction, data that should have been deleted may be combined. See
 * <a href="https://issues.apache.org/jira/browse/ACCUMULO-2232">ACCUMULO-2232</a> for details.
 * For correctness, either do not use deletes on combined columns or set this option to true.
 *
 * <p>
 * When the option is true, partial major compactions pass all data through without reducing.
 * Reducing then happens only at scan time and during full major compactions, where deletes can
 * be handled correctly. This may hurt performance because a lot of work is left for scan time.
 *
 * <p>
 * When the option is false, combiners log an error if a delete is seen during any compaction.
 * The error can be suppressed through the logging configuration and is not logged more than
 * once an hour per Combiner, no matter how many deletes are seen.
 *
 * <p>
 * Older 1.6 and 1.7 releases do not have this method, so do not call it if your code must also
 * run against them. When the option is not set it defaults to false in order to maintain
 * compatibility.
 *
 * @param is
 *          iterator settings object to configure
 * @param reduceOnFullCompactionOnly
 *          if true, reduce only on full major compactions
 * @since 1.6.5 1.7.1 1.8.0
 */
public static void setReduceOnFullCompactionOnly(IteratorSetting is,
    boolean reduceOnFullCompactionOnly) {
  String flag = String.valueOf(reduceOnFullCompactionOnly);
  is.addOption(REDUCE_ON_FULL_COMPACTION_ONLY_OPTION, flag);
}
}