blob: f51405935cadd2125bc7fd7b5587f6189ad28455 [file] [log] [blame]
package org.apache.accumulo.testing.performance.tests;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.BiPredicate;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.YieldCallback;
import org.slf4j.LoggerFactory;
public abstract class YieldingFilter implements SortedKeyValueIterator<Key,Value> {
private SortedKeyValueIterator<Key,Value> source;
private BiPredicate<Key,Value> predicate;
private YieldCallback<Key> yield;
private long yieldTime;
protected abstract BiPredicate<Key,Value> createPredicate();
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
this.source = source;
this.predicate = createPredicate();
this.yieldTime = Long.parseLong(options.getOrDefault("yieldTimeMS", "100"));
}
protected void findTop() throws IOException {
long start = System.nanoTime();
while (source.hasTop() && !source.getTopKey().isDeleted()
&& !predicate.test(source.getTopKey(), source.getTopValue())) {
long duration = (System.nanoTime() - start) / 1000000;
if (duration > yieldTime) {
yield.yield(source.getTopKey());
break;
}
source.next();
}
}
@Override
public boolean hasTop() {
return !yield.hasYielded() && source.hasTop();
}
@Override
public void next() throws IOException {
source.next();
findTop();
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
source.seek(range, columnFamilies, inclusive);
findTop();
}
@Override
public Key getTopKey() {
return source.getTopKey();
}
@Override
public Value getTopValue() {
return source.getTopValue();
}
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
@Override
public void enableYielding(YieldCallback<Key> callback) {
this.yield = callback;
}
}