blob: 1ff0e225b958c069c18d61dcba571e13bac4c972 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.file;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import com.google.common.cache.Cache;
public abstract class FileOperations {
private static final HashSet<String> validExtensions = new HashSet<>(
Arrays.asList(Constants.MAPFILE_EXTENSION, RFile.EXTENSION));
public static Set<String> getValidExtensions() {
return validExtensions;
}
public static String getNewFileExtension(AccumuloConfiguration acuconf) {
return acuconf.get(Property.TABLE_FILE_TYPE);
}
public static FileOperations getInstance() {
return new DispatchingFileFactory();
}
//
// Abstract methods (to be implemented by subclasses)
//
protected abstract long getFileSize(FileOptions options) throws IOException;
protected abstract FileSKVWriter openWriter(FileOptions options) throws IOException;
protected abstract FileSKVIterator openIndex(FileOptions options) throws IOException;
protected abstract FileSKVIterator openScanReader(FileOptions options) throws IOException;
protected abstract FileSKVIterator openReader(FileOptions options) throws IOException;
//
// File operations
//
/**
* Construct an operation object allowing one to create a writer for a file. <br>
* Syntax:
*
* <pre>
* FileSKVWriter writer = fileOperations.newWriterBuilder()
* .forFile(...)
* .withTableConfiguration(...)
* .withRateLimiter(...) // optional
* .withCompression(...) // optional
* .build();
* </pre>
*/
public WriterBuilder newWriterBuilder() {
return new WriterBuilder();
}
/**
* Construct an operation object allowing one to create an index iterator for a file. <br>
* Syntax:
*
* <pre>
* FileSKVIterator iterator = fileOperations.newIndexReaderBuilder()
* .forFile(...)
* .withTableConfiguration(...)
* .withRateLimiter(...) // optional
* .withBlockCache(...) // optional
* .build();
* </pre>
*/
public IndexReaderBuilder newIndexReaderBuilder() {
return new IndexReaderBuilder();
}
/**
* Construct an operation object allowing one to create a "scan" reader for a file. Scan readers
* do not have any optimizations for seeking beyond their initial position. This is useful for
* file operations that only need to scan data within a range and do not need to seek. Therefore
* file metadata such as indexes does not need to be kept in memory while the file is scanned.
* Also seek optimizations like bloom filters do not need to be loaded. <br>
* Syntax:
*
* <pre>
* FileSKVIterator scanner = fileOperations.newScanReaderBuilder()
* .forFile(...)
* .withTableConfiguration(...)
* .overRange(...)
* .withRateLimiter(...) // optional
* .withBlockCache(...) // optional
* .build();
* </pre>
*/
public ScanReaderBuilder newScanReaderBuilder() {
return new ScanReaderBuilder();
}
/**
* Construct an operation object allowing one to create a reader for a file. A reader constructed
* in this manner fully supports seeking, and also enables any optimizations related to seeking
* (e.g. Bloom filters). <br>
* Syntax:
*
* <pre>
* FileSKVIterator scanner = fileOperations.newReaderBuilder()
* .forFile(...)
* .withTableConfiguration(...)
* .withRateLimiter(...) // optional
* .withBlockCache(...) // optional
* .seekToBeginning(...) // optional
* .build();
* </pre>
*/
public ReaderBuilder newReaderBuilder() {
return new ReaderBuilder();
}
public class FileOptions {
// objects used by all
public final AccumuloConfiguration tableConfiguration;
public final String filename;
public final FileSystem fs;
public final Configuration fsConf;
public final RateLimiter rateLimiter;
// writer only objects
public final String compression;
public final FSDataOutputStream outputStream;
public final boolean enableAccumuloStart;
// reader only objects
public final BlockCache dataCache;
public final BlockCache indexCache;
public final Cache<String,Long> fileLenCache;
public final boolean seekToBeginning;
public final CryptoService cryptoService;
// scan reader only objects
public final Range range;
public final Set<ByteSequence> columnFamilies;
public final boolean inclusive;
public FileOptions(AccumuloConfiguration tableConfiguration, String filename, FileSystem fs,
Configuration fsConf, RateLimiter rateLimiter, String compression,
FSDataOutputStream outputStream, boolean enableAccumuloStart, BlockCache dataCache,
BlockCache indexCache, Cache<String,Long> fileLenCache, boolean seekToBeginning,
CryptoService cryptoService, Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
this.tableConfiguration = tableConfiguration;
this.filename = filename;
this.fs = fs;
this.fsConf = fsConf;
this.rateLimiter = rateLimiter;
this.compression = compression;
this.outputStream = outputStream;
this.enableAccumuloStart = enableAccumuloStart;
this.dataCache = dataCache;
this.indexCache = indexCache;
this.fileLenCache = fileLenCache;
this.seekToBeginning = seekToBeginning;
this.cryptoService = cryptoService;
this.range = range;
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
}
public AccumuloConfiguration getTableConfiguration() {
return tableConfiguration;
}
public String getFilename() {
return filename;
}
public FileSystem getFileSystem() {
return fs;
}
public Configuration getConfiguration() {
return fsConf;
}
public RateLimiter getRateLimiter() {
return rateLimiter;
}
public String getCompression() {
return compression;
}
public FSDataOutputStream getOutputStream() {
return outputStream;
}
public boolean isAccumuloStartEnabled() {
return enableAccumuloStart;
}
public BlockCache getDataCache() {
return dataCache;
}
public BlockCache getIndexCache() {
return indexCache;
}
public Cache<String,Long> getFileLenCache() {
return fileLenCache;
}
public boolean isSeekToBeginning() {
return seekToBeginning;
}
public CryptoService getCryptoService() {
return cryptoService;
}
public Range getRange() {
return range;
}
public Set<ByteSequence> getColumnFamilies() {
return columnFamilies;
}
public boolean isRangeInclusive() {
return inclusive;
}
}
/**
* Helper class extended by both writers and readers.
*/
public class FileHelper {
protected AccumuloConfiguration tableConfiguration;
protected String filename;
protected FileSystem fs;
protected Configuration fsConf;
protected RateLimiter rateLimiter;
protected CryptoService cryptoService;
protected FileHelper fs(FileSystem fs) {
Objects.requireNonNull(fs);
this.fs = fs;
return this;
}
protected FileHelper fsConf(Configuration fsConf) {
Objects.requireNonNull(fsConf);
this.fsConf = fsConf;
return this;
}
protected FileHelper filename(String filename) {
Objects.requireNonNull(filename);
this.filename = filename;
return this;
}
protected FileHelper tableConfiguration(AccumuloConfiguration tableConfiguration) {
Objects.requireNonNull(tableConfiguration);
this.tableConfiguration = tableConfiguration;
return this;
}
protected FileHelper rateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}
protected FileHelper cryptoService(CryptoService cs) {
this.cryptoService = cs;
return this;
}
protected FileOptions toWriterBuilderOptions(String compression,
FSDataOutputStream outputStream, boolean startEnabled) {
return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, compression,
outputStream, startEnabled, null, null, null, false, cryptoService, null, null, true);
}
protected FileOptions toReaderBuilderOptions(BlockCache dataCache, BlockCache indexCache,
Cache<String,Long> fileLenCache, boolean seekToBeginning) {
return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
false, dataCache, indexCache, fileLenCache, seekToBeginning, cryptoService, null, null,
true);
}
protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
false, null, null, fileLenCache, false, cryptoService, null, null, true);
}
protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
false, null, null, null, false, cryptoService, range, columnFamilies, inclusive);
}
}
/**
* Operation object for constructing a writer.
*/
public class WriterBuilder extends FileHelper implements WriterTableConfiguration {
private String compression;
private FSDataOutputStream outputStream;
private boolean enableAccumuloStart = true;
public WriterTableConfiguration forOutputStream(String extension,
FSDataOutputStream outputStream, Configuration fsConf) {
this.outputStream = outputStream;
filename("foo" + extension).fsConf(fsConf);
return this;
}
public WriterTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
filename(filename).fs(fs).fsConf(fsConf);
return this;
}
@Override
public WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
tableConfiguration(tableConfiguration);
return this;
}
public WriterBuilder withStartDisabled() {
this.enableAccumuloStart = false;
return this;
}
public WriterBuilder withCompression(String compression) {
this.compression = compression;
return this;
}
public WriterBuilder withRateLimiter(RateLimiter rateLimiter) {
rateLimiter(rateLimiter);
return this;
}
public WriterBuilder withCryptoService(CryptoService cs) {
cryptoService(cs);
return this;
}
public FileSKVWriter build() throws IOException {
return openWriter(toWriterBuilderOptions(compression, outputStream, enableAccumuloStart));
}
}
public interface WriterTableConfiguration {
public WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/**
* Options common to all {@code FileOperations} which perform reads.
*/
public class ReaderBuilder extends FileHelper implements ReaderTableConfiguration {
private BlockCache dataCache;
private BlockCache indexCache;
private Cache<String,Long> fileLenCache;
private boolean seekToBeginning = false;
public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
filename(filename).fs(fs).fsConf(fsConf);
return this;
}
@Override
public ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
tableConfiguration(tableConfiguration);
return this;
}
/**
* (Optional) Set the block cache pair to be used to optimize reads within the constructed
* reader.
*/
public ReaderBuilder withBlockCache(BlockCache dataCache, BlockCache indexCache) {
this.dataCache = dataCache;
this.indexCache = indexCache;
return this;
}
/** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
public ReaderBuilder withDataCache(BlockCache dataCache) {
this.dataCache = dataCache;
return this;
}
/**
* (Optional) set the index cache to be used to optimize reads within the constructed reader.
*/
public ReaderBuilder withIndexCache(BlockCache indexCache) {
this.indexCache = indexCache;
return this;
}
public ReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
this.fileLenCache = fileLenCache;
return this;
}
public ReaderBuilder withCryptoService(CryptoService cs) {
cryptoService(cs);
return this;
}
public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) {
rateLimiter(rateLimiter);
return this;
}
/**
* Seek the constructed iterator to the beginning of its domain before returning. Equivalent to
* {@code seekToBeginning(true)}.
*/
public ReaderBuilder seekToBeginning() {
seekToBeginning(true);
return this;
}
/** If true, seek the constructed iterator to the beginning of its domain before returning. */
public ReaderBuilder seekToBeginning(boolean seekToBeginning) {
this.seekToBeginning = seekToBeginning;
return this;
}
/** Execute the operation, constructing the specified file reader. */
public FileSKVIterator build() throws IOException {
/**
* If the table configuration disallows caching, rewrite the options object to not pass the
* caches.
*/
if (!tableConfiguration.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) {
withIndexCache(null);
}
if (!tableConfiguration.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) {
withDataCache(null);
}
return openReader(
toReaderBuilderOptions(dataCache, indexCache, fileLenCache, seekToBeginning));
}
}
public interface ReaderTableConfiguration {
ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/**
* Operation object for opening an index.
*/
public class IndexReaderBuilder extends FileHelper implements IndexReaderTableConfiguration {
private Cache<String,Long> fileLenCache = null;
public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
Configuration fsConf) {
filename(filename).fs(fs).fsConf(fsConf);
return this;
}
@Override
public IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
tableConfiguration(tableConfiguration);
return this;
}
public IndexReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
this.fileLenCache = fileLenCache;
return this;
}
public FileSKVIterator build() throws IOException {
return openIndex(toIndexReaderBuilderOptions(fileLenCache));
}
}
public interface IndexReaderTableConfiguration {
IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/** Operation object for opening a scan reader. */
public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConfiguration {
private Range range;
private Set<ByteSequence> columnFamilies;
private boolean inclusive;
public ScanReaderTableConfiguration forFile(String filename, FileSystem fs,
Configuration fsConf) {
filename(filename).fs(fs).fsConf(fsConf);
return this;
}
@Override
public ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
tableConfiguration(tableConfiguration);
return this;
}
/** Set the range over which the constructed iterator will search. */
public ScanReaderBuilder overRange(Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
Objects.requireNonNull(range);
Objects.requireNonNull(columnFamilies);
this.range = range;
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
return this;
}
/** The range over which this reader should scan. */
public Range getRange() {
return range;
}
/** The column families which this reader should scan. */
public Set<ByteSequence> getColumnFamilies() {
return columnFamilies;
}
/** Execute the operation, constructing a scan iterator. */
public FileSKVIterator build() throws IOException {
return openScanReader(toScanReaderBuilderOptions(range, columnFamilies, inclusive));
}
}
public interface ScanReaderTableConfiguration {
ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
}