blob: c9ebb8ed59faccf510444b7b70c2bcf582d9eb52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.impl.ScannerOptions;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.iterators.IteratorAdapter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
/**
* A scanner that instantiates iterators on the client side instead of on the tablet server. This
* can be useful for testing iterators or in cases where you don't want iterators affecting the
* performance of tablet servers.
*
* <p>
* Suggested usage:
*
* <pre>
* <code>
* Scanner scanner = client.createScanner(tableName, authorizations);
* scanner = new ClientSideIteratorScanner(scanner);
* </code>
* </pre>
*
* <p>
* Iterators added to this scanner will be run in the client JVM. Separate scan iterators can be run
* on the server side and client side by adding iterators to the source scanner (which will execute
* server side) and to the client side scanner (which will execute client side).
*/
public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {

  // Scan settings are kept locally and only pushed onto the source scanner when iterator() runs.
  private int size;
  private Range range;
  private boolean isolated = false;
  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
  // Sampler configuration offered to iterators when the scanner itself has none configured.
  private SamplerConfiguration iteratorSamplerConfig;

  /**
   * The iterator environment passed to iterators loaded by this scanner. It always reports the
   * {@code scan} scope, never reports a compaction, and throws
   * {@link UnsupportedOperationException} from {@code reserveMapFileReader}, {@code getConfig} and
   * {@code registerSideChannel}.
   */
  private class ClientSideIteratorEnvironment implements IteratorEnvironment {

    private final SamplerConfiguration samplerConfig;
    private final boolean sampleEnabled;

    /**
     * @param sampleEnabled
     *          value reported by {@link #isSamplingEnabled()}
     * @param samplerConfig
     *          value reported by {@link #getSamplerConfiguration()}, may be null
     */
    ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) {
      this.sampleEnabled = sampleEnabled;
      this.samplerConfig = samplerConfig;
    }

    @Override
    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName)
        throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public AccumuloConfiguration getConfig() {
      throw new UnsupportedOperationException();
    }

    @Override
    public IteratorScope getIteratorScope() {
      return IteratorScope.scan;
    }

    @Override
    public boolean isFullMajorCompaction() {
      return false;
    }

    @Override
    public boolean isUserCompaction() {
      return false;
    }

    @Override
    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
      throw new UnsupportedOperationException();
    }

    @Override
    public Authorizations getAuthorizations() {
      // Delegates to the enclosing scanner, which in turn asks the source scanner.
      return ClientSideIteratorScanner.this.getAuthorizations();
    }

    @Override
    public IteratorEnvironment cloneWithSamplingEnabled() {
      // Same sampler configuration, but with the sampling flag forced on.
      return new ClientSideIteratorEnvironment(true, samplerConfig);
    }

    @Override
    public boolean isSamplingEnabled() {
      return sampleEnabled;
    }

    @Override
    public SamplerConfiguration getSamplerConfiguration() {
      return samplerConfig;
    }
  }

  /**
   * A class that wraps a Scanner in a SortedKeyValueIterator so that other accumulo iterators can
   * use it as a source.
   */
  private class ScannerTranslatorImpl implements SortedKeyValueIterator<Key,Value> {

    protected Scanner scanner;
    // Iterator over the current scan; assigned by seek(), so seek() must precede next().
    Iterator<Entry<Key,Value>> iter;
    // Current entry, or null when the scan is exhausted (or seek() has not been called yet).
    Entry<Key,Value> top = null;
    // Not final: the enclosing scanner overwrites this in iterator() before every scan.
    private SamplerConfiguration samplerConfig;

    /**
     * Constructs an accumulo iterator from a scanner.
     *
     * @param scanner
     *          the scanner to iterate over
     * @param samplerConfig
     *          sampler configuration applied to the scanner on each seek; when null the scanner's
     *          sampler configuration is cleared on seek instead
     */
    public ScannerTranslatorImpl(final Scanner scanner, SamplerConfiguration samplerConfig) {
      this.scanner = scanner;
      this.samplerConfig = samplerConfig;
    }

    /**
     * Not supported; this iterator is always the bottom of the stack and has no source of its own.
     *
     * @throws UnsupportedOperationException
     *           always
     */
    @Override
    public void init(final SortedKeyValueIterator<Key,Value> source,
        final Map<String,String> options, final IteratorEnvironment env) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean hasTop() {
      return top != null;
    }

    @Override
    public void next() throws IOException {
      if (iter.hasNext()) {
        top = iter.next();
      } else {
        top = null;
      }
    }

    /**
     * Reconfigures the wrapped scanner with the given range, column families and this iterator's
     * sampler configuration, starts a new scan and positions on its first entry.
     *
     * @throws IllegalArgumentException
     *           if {@code inclusive} is false and {@code columnFamilies} is not empty
     */
    @Override
    public void seek(final Range range, final Collection<ByteSequence> columnFamilies,
        final boolean inclusive) throws IOException {
      // Families are translated into fetchColumnFamily calls below, which can only include.
      if (!inclusive && !columnFamilies.isEmpty()) {
        throw new IllegalArgumentException(
            "Excluding column families is not supported when seeking a scanner");
      }
      scanner.setRange(range);
      scanner.clearColumns();
      for (ByteSequence colf : columnFamilies) {
        scanner.fetchColumnFamily(new Text(colf.toArray()));
      }
      if (samplerConfig == null) {
        scanner.clearSamplerConfiguration();
      } else {
        scanner.setSamplerConfiguration(samplerConfig);
      }
      iter = scanner.iterator();
      next();
    }

    @Override
    public Key getTopKey() {
      return top.getKey();
    }

    @Override
    public Value getTopValue() {
      return top.getValue();
    }

    /**
     * Creates another translator around the same Scanner instance. The copy samples only when the
     * supplied environment has sampling enabled.
     */
    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(final IteratorEnvironment env) {
      return new ScannerTranslatorImpl(scanner,
          env.isSamplingEnabled() ? env.getSamplerConfiguration() : null);
    }
  }

  // Bottom of the client-side iterator stack; wraps the current source scanner.
  private ScannerTranslatorImpl smi;

  /**
   * Constructs a scanner that can execute client-side iterators. The range, batch size, timeout,
   * batch timeout, readahead threshold and sampler configuration are copied from the source
   * scanner.
   *
   * @param scanner
   *          the source scanner
   */
  public ClientSideIteratorScanner(final Scanner scanner) {
    smi = new ScannerTranslatorImpl(scanner, scanner.getSamplerConfiguration());
    this.range = scanner.getRange();
    this.size = scanner.getBatchSize();
    this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
    // Read the batch timeout itself; copying the retry timeout here would later overwrite the
    // source scanner's batch timeout in iterator().
    this.batchTimeOut = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
    this.readaheadThreshold = scanner.getReadaheadThreshold();
    SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration();
    if (samplerConfig != null) {
      setSamplerConfiguration(samplerConfig);
    }
  }

  /**
   * Sets the source Scanner. Only the source and its sampler configuration are picked up; the
   * other settings already held by this scanner (range, batch size, timeouts, ...) are unchanged.
   *
   * @param scanner
   *          the new source scanner
   */
  public void setSource(final Scanner scanner) {
    smi = new ScannerTranslatorImpl(scanner, scanner.getSamplerConfiguration());
  }

  /**
   * Applies this scanner's settings to the source scanner, loads the configured iterators in the
   * client JVM on top of it, seeks the resulting stack to the configured range and returns an
   * iterator over the stack's output.
   *
   * @throws RuntimeException
   *           wrapping any {@link IOException} raised while loading or seeking the iterators
   */
  @Override
  public Iterator<Entry<Key,Value>> iterator() {
    smi.scanner.setBatchSize(size);
    smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
    smi.scanner.setBatchTimeout(batchTimeOut, TimeUnit.MILLISECONDS);
    smi.scanner.setReadaheadThreshold(readaheadThreshold);
    if (isolated) {
      smi.scanner.enableIsolation();
    } else {
      smi.scanner.disableIsolation();
    }
    smi.samplerConfig = getSamplerConfiguration();

    // Order the configured iterators by priority so they are stacked lowest priority first.
    final TreeMap<Integer,IterInfo> tm = new TreeMap<>();
    for (IterInfo iterInfo : serverSideIteratorList) {
      tm.put(iterInfo.getPriority(), iterInfo);
    }

    SortedKeyValueIterator<Key,Value> skvi;
    try {
      skvi = IteratorUtil.loadIterators(smi, tm.values(), serverSideIteratorOptions,
          new ClientSideIteratorEnvironment(getSamplerConfiguration() != null,
              getIteratorSamplerConfigurationInternal()),
          false, null);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    // Only the column family of each fetched column is handed to seek().
    final Set<ByteSequence> colfs = new TreeSet<>();
    for (Column c : this.getFetchedColumns()) {
      colfs.add(new ArrayByteSequence(c.getColumnFamily()));
    }

    try {
      skvi.seek(range, colfs, true);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return new IteratorAdapter(skvi);
  }

  @Override
  public Authorizations getAuthorizations() {
    return smi.scanner.getAuthorizations();
  }

  /**
   * Sets the timeout in seconds; {@code Integer.MAX_VALUE} is translated to
   * {@code Long.MAX_VALUE} milliseconds.
   *
   * @deprecated this method delegates to {@code setTimeout(long, TimeUnit)}; call that directly.
   */
  @Deprecated
  @Override
  public void setTimeOut(int timeOut) {
    if (timeOut == Integer.MAX_VALUE) {
      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } else {
      setTimeout(timeOut, TimeUnit.SECONDS);
    }
  }

  /**
   * Returns the timeout in seconds, capped at {@code Integer.MAX_VALUE}.
   *
   * @deprecated this method delegates to {@code getTimeout(TimeUnit)}; call that directly.
   */
  @Deprecated
  @Override
  public int getTimeOut() {
    long timeout = getTimeout(TimeUnit.SECONDS);
    if (timeout >= Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    return (int) timeout;
  }

  @Override
  public void setRange(final Range range) {
    this.range = range;
  }

  @Override
  public Range getRange() {
    return range;
  }

  @Override
  public void setBatchSize(final int size) {
    this.size = size;
  }

  @Override
  public int getBatchSize() {
    return size;
  }

  @Override
  public void enableIsolation() {
    this.isolated = true;
  }

  @Override
  public void disableIsolation() {
    this.isolated = false;
  }

  @Override
  public long getReadaheadThreshold() {
    return readaheadThreshold;
  }

  /**
   * @throws IllegalArgumentException
   *           if {@code batches} is negative
   */
  @Override
  public void setReadaheadThreshold(long batches) {
    if (0 > batches) {
      throw new IllegalArgumentException(
          "Number of batches before read-ahead must be non-negative");
    }
    this.readaheadThreshold = batches;
  }

  /**
   * Chooses the sampler configuration exposed to client-side iterators: the scanner's own
   * configuration when one is set, otherwise the iterator sampler configuration (possibly null).
   *
   * @throws IllegalStateException
   *           if both are set and they are not equal
   */
  private SamplerConfiguration getIteratorSamplerConfigurationInternal() {
    SamplerConfiguration scannerSamplerConfig = getSamplerConfiguration();
    if (scannerSamplerConfig != null) {
      if (iteratorSamplerConfig != null && !iteratorSamplerConfig.equals(scannerSamplerConfig)) {
        throw new IllegalStateException("Scanner and iterator sampler configuration differ");
      }
      return scannerSamplerConfig;
    }
    return iteratorSamplerConfig;
  }

  /**
   * This is provided for the case where no sampler configuration is set on the scanner, but there
   * is a need to create iterator deep copies that have sampling enabled. If sampler configuration
   * is set on the scanner, then this method does not need to be called in order to create deep
   * copies with sampling.
   *
   * <p>
   * Setting this differently than the scanner's sampler configuration may cause exceptions.
   *
   * @param sc
   *          the sampler configuration to expose to iterators; must not be null
   * @throws NullPointerException
   *           if {@code sc} is null
   * @since 1.8.0
   */
  public void setIteratorSamplerConfiguration(SamplerConfiguration sc) {
    requireNonNull(sc);
    this.iteratorSamplerConfig = sc;
  }

  /**
   * Clear any iterator sampler configuration.
   *
   * @since 1.8.0
   */
  public void clearIteratorSamplerConfiguration() {
    this.iteratorSamplerConfig = null;
  }

  /**
   * @return currently set iterator sampler configuration, or null if none is set.
   *
   * @since 1.8.0
   */
  public SamplerConfiguration getIteratorSamplerConfiguration() {
    return iteratorSamplerConfig;
  }
}