blob: bb4356d2d449388c0f602855697754c99eaf3b10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.rfile;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ScannerOptions;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.IterConfigUtil;
import org.apache.accumulo.core.conf.IterLoad;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.iterators.IteratorAdapter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import com.google.common.base.Preconditions;
class RFileScanner extends ScannerOptions implements Scanner {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final Range EMPTY_RANGE = new Range();
private Range range;
private BlockCacheManager blockCacheManager = null;
private BlockCache dataCache = null;
private BlockCache indexCache = null;
private Opts opts;
private int batchSize = 1000;
private long readaheadThreshold = 3;
private AccumuloConfiguration tableConf;
private CryptoService cryptoService;
static class Opts {
InputArgs in;
Authorizations auths = Authorizations.EMPTY;
long dataCacheSize;
long indexCacheSize;
boolean useSystemIterators = true;
public HashMap<String,String> tableConfig;
Range bounds;
}
// This cache exist as a hack to avoid leaking decompressors. When the RFile code is not given a
// cache it reads blocks directly from the decompressor. However if a user does not read all data
// for a scan this can leave a BCFile block open and a decompressor allocated.
//
// By providing a cache to the RFile code it forces each block to be read into memory. When a
// block is accessed the entire thing is read into memory immediately allocating and deallocating
// a decompressor. If the user does not read all data, no decompressors are left allocated.
private static class NoopCache implements BlockCache {
@Override
public CacheEntry cacheBlock(String blockName, byte[] buf) {
return null;
}
@Override
public CacheEntry getBlock(String blockName) {
return null;
}
@Override
public long getMaxHeapSize() {
return getMaxSize();
}
@Override
public long getMaxSize() {
return Integer.MAX_VALUE;
}
@Override
public Stats getStats() {
return new BlockCache.Stats() {
@Override
public long hitCount() {
return 0L;
}
@Override
public long requestCount() {
return 0L;
}
};
}
@Override
public CacheEntry getBlock(String blockName, Loader loader) {
Map<String,Loader> depLoaders = loader.getDependencies();
Map<String,byte[]> depData;
switch (depLoaders.size()) {
case 0:
depData = Collections.emptyMap();
break;
case 1:
Entry<String,Loader> entry = depLoaders.entrySet().iterator().next();
depData = Collections.singletonMap(entry.getKey(),
getBlock(entry.getKey(), entry.getValue()).getBuffer());
break;
default:
depData = new HashMap<>();
depLoaders.forEach((k, v) -> depData.put(k, getBlock(k, v).getBuffer()));
}
byte[] data = loader.load(Integer.MAX_VALUE, depData);
return new CacheEntry() {
@Override
public byte[] getBuffer() {
return data;
}
@Override
public <T extends Weighable> T getIndex(Supplier<T> supplier) {
return null;
}
@Override
public void indexWeightChanged() {}
};
}
}
RFileScanner(Opts opts) {
if (!opts.auths.equals(Authorizations.EMPTY) && !opts.useSystemIterators) {
throw new IllegalArgumentException(
"Set authorizations and specified not to use system iterators");
}
this.opts = opts;
if (opts.tableConfig != null && !opts.tableConfig.isEmpty()) {
ConfigurationCopy tableCC = new ConfigurationCopy(DefaultConfiguration.getInstance());
opts.tableConfig.forEach(tableCC::set);
this.tableConf = tableCC;
} else {
this.tableConf = DefaultConfiguration.getInstance();
}
if (opts.indexCacheSize > 0 || opts.dataCacheSize > 0) {
ConfigurationCopy cc = tableConf instanceof ConfigurationCopy ? (ConfigurationCopy) tableConf
: new ConfigurationCopy(tableConf);
try {
blockCacheManager = BlockCacheManagerFactory.getClientInstance(cc);
if (opts.indexCacheSize > 0) {
cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(opts.indexCacheSize));
}
if (opts.dataCacheSize > 0) {
cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(opts.dataCacheSize));
}
blockCacheManager.start(new BlockCacheConfiguration(cc));
this.indexCache = blockCacheManager.getBlockCache(CacheType.INDEX);
this.dataCache = blockCacheManager.getBlockCache(CacheType.DATA);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (indexCache == null) {
this.indexCache = new NoopCache();
}
if (this.dataCache == null) {
this.dataCache = new NoopCache();
}
this.cryptoService = CryptoServiceFactory.newInstance(tableConf, ClassloaderType.JAVA);
}
@Override
public synchronized void fetchColumnFamily(Text col) {
Preconditions.checkArgument(opts.useSystemIterators,
"Can only fetch columns when using system iterators");
super.fetchColumnFamily(col);
}
@Override
public synchronized void fetchColumn(Text colFam, Text colQual) {
Preconditions.checkArgument(opts.useSystemIterators,
"Can only fetch columns when using system iterators");
super.fetchColumn(colFam, colQual);
}
@Override
public void fetchColumn(IteratorSetting.Column column) {
Preconditions.checkArgument(opts.useSystemIterators,
"Can only fetch columns when using system iterators");
super.fetchColumn(column);
}
@Override
public void setClassLoaderContext(String classLoaderContext) {
throw new UnsupportedOperationException();
}
@Override
public void setRange(Range range) {
this.range = range;
}
@Override
public Range getRange() {
return range;
}
@Override
public void setBatchSize(int size) {
this.batchSize = size;
}
@Override
public int getBatchSize() {
return batchSize;
}
@Override
public void enableIsolation() {}
@Override
public void disableIsolation() {}
@Override
public synchronized void setReadaheadThreshold(long batches) {
Preconditions.checkArgument(batches > 0);
readaheadThreshold = batches;
}
@Override
public synchronized long getReadaheadThreshold() {
return readaheadThreshold;
}
@Override
public Authorizations getAuthorizations() {
return opts.auths;
}
@Override
public void addScanIterator(IteratorSetting cfg) {
super.addScanIterator(cfg);
}
@Override
public void removeScanIterator(String iteratorName) {
super.removeScanIterator(iteratorName);
}
@Override
public void updateScanIteratorOption(String iteratorName, String key, String value) {
super.updateScanIteratorOption(iteratorName, key, value);
}
private class IterEnv implements IteratorEnvironment {
@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
}
@Override
public boolean isFullMajorCompaction() {
return false;
}
@Override
public Authorizations getAuthorizations() {
return opts.auths;
}
@Override
public boolean isSamplingEnabled() {
return RFileScanner.this.getSamplerConfiguration() != null;
}
@Override
public SamplerConfiguration getSamplerConfiguration() {
return RFileScanner.this.getSamplerConfiguration();
}
}
@Override
public Iterator<Entry<Key,Value>> iterator() {
try {
RFileSource[] sources = opts.in.getSources();
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
CacheProvider cacheProvider = new BasicCacheProvider(indexCache, dataCache);
for (int i = 0; i < sources.length; i++) {
// TODO may have been a bug with multiple files and caching in older version...
FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
CachableBuilder cb =
new CachableBuilder().input(inputStream, "source-" + i).length(sources[i].getLength())
.conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService);
readers.add(new RFile.Reader(cb));
}
if (getSamplerConfiguration() != null) {
for (int i = 0; i < readers.size(); i++) {
readers.set(i, ((Reader) readers.get(i))
.getSample(new SamplerConfigurationImpl(getSamplerConfiguration())));
}
}
SortedKeyValueIterator<Key,Value> iterator;
if (opts.bounds != null) {
iterator = new MultiIterator(readers, opts.bounds);
} else {
iterator = new MultiIterator(readers, false);
}
Set<ByteSequence> families = Collections.emptySet();
if (opts.useSystemIterators) {
SortedSet<Column> cols = this.getFetchedColumns();
families = LocalityGroupUtil.families(cols);
iterator = SystemIteratorUtil.setupSystemScanIterators(iterator, cols, getAuthorizations(),
EMPTY_BYTES, tableConf);
}
try {
if (opts.tableConfig != null && !opts.tableConfig.isEmpty()) {
IterLoad il = IterConfigUtil.loadIterConf(IteratorScope.scan, serverSideIteratorList,
serverSideIteratorOptions, tableConf);
iterator = IterConfigUtil.loadIterators(iterator,
il.iterEnv(new IterEnv()).useAccumuloClassLoader(true));
} else {
iterator = IterConfigUtil.loadIterators(iterator,
new IterLoad().iters(serverSideIteratorList).iterOpts(serverSideIteratorOptions)
.iterEnv(new IterEnv()).useAccumuloClassLoader(false));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
iterator.seek(getRange() == null ? EMPTY_RANGE : getRange(), families, !families.isEmpty());
return new IteratorAdapter(iterator);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
try {
for (RFileSource source : opts.in.getSources()) {
source.getInputStream().close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
if (this.blockCacheManager != null) {
this.blockCacheManager.stop();
}
} catch (Exception e1) {
throw new RuntimeException(e1);
}
}
}