| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.core.file.blockfile.impl; |
| |
| import java.io.Closeable; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.UncheckedIOException; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Supplier; |
| |
| import org.apache.accumulo.core.file.rfile.bcfile.BCFile; |
| import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader; |
| import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; |
| import org.apache.accumulo.core.file.streams.RateLimitedInputStream; |
| import org.apache.accumulo.core.spi.cache.BlockCache; |
| import org.apache.accumulo.core.spi.cache.BlockCache.Loader; |
| import org.apache.accumulo.core.spi.cache.CacheEntry; |
| import org.apache.accumulo.core.spi.cache.CacheEntry.Weighable; |
| import org.apache.accumulo.core.spi.crypto.CryptoService; |
| import org.apache.accumulo.core.util.ratelimit.RateLimiter; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.Seekable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.cache.Cache; |
| |
/**
 * This is a wrapper class for BCFile that provides independent caches for data blocks and
 * metadata blocks.
 */
| public class CachableBlockFile { |
| |
| private CachableBlockFile() {} |
| |
| private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class); |
| |
| private static interface IoeSupplier<T> { |
| T get() throws IOException; |
| } |
| |
| public static String pathToCacheId(Path p) { |
| return p.toString(); |
| } |
| |
| public static class CachableBuilder { |
| String cacheId = null; |
| IoeSupplier<InputStream> inputSupplier = null; |
| IoeSupplier<Long> lengthSupplier = null; |
| Cache<String,Long> fileLenCache = null; |
| volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER; |
| RateLimiter readLimiter = null; |
| Configuration hadoopConf = null; |
| CryptoService cryptoService = null; |
| |
| public CachableBuilder conf(Configuration hadoopConf) { |
| this.hadoopConf = hadoopConf; |
| return this; |
| } |
| |
| public CachableBuilder fsPath(FileSystem fs, Path dataFile) { |
| this.cacheId = pathToCacheId(dataFile); |
| this.inputSupplier = () -> fs.open(dataFile); |
| this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen(); |
| return this; |
| } |
| |
| public CachableBuilder input(InputStream is, String cacheId) { |
| this.cacheId = cacheId; |
| this.inputSupplier = () -> is; |
| return this; |
| } |
| |
| public CachableBuilder length(long len) { |
| this.lengthSupplier = () -> len; |
| return this; |
| } |
| |
| public CachableBuilder fileLen(Cache<String,Long> cache) { |
| this.fileLenCache = cache; |
| return this; |
| } |
| |
| public CachableBuilder cacheProvider(CacheProvider cacheProvider) { |
| this.cacheProvider = cacheProvider; |
| return this; |
| } |
| |
| public CachableBuilder readLimiter(RateLimiter readLimiter) { |
| this.readLimiter = readLimiter; |
| return this; |
| } |
| |
| public CachableBuilder cryptoService(CryptoService cryptoService) { |
| this.cryptoService = cryptoService; |
| return this; |
| } |
| } |
| |
| /** |
| * Class wraps the BCFile reader. |
| */ |
| public static class Reader implements Closeable { |
| private final RateLimiter readLimiter; |
| // private BCFile.Reader _bc; |
| private final String cacheId; |
| private CacheProvider cacheProvider; |
| private Cache<String,Long> fileLenCache = null; |
| private volatile InputStream fin = null; |
| private boolean closed = false; |
| private final Configuration conf; |
| private final CryptoService cryptoService; |
| |
| private final IoeSupplier<InputStream> inputSupplier; |
| private final IoeSupplier<Long> lengthSupplier; |
| private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>(); |
| |
| private static final String ROOT_BLOCK_NAME = "!RootData"; |
| |
| // ACCUMULO-4716 - Define MAX_ARRAY_SIZE smaller than Integer.MAX_VALUE to prevent possible |
| // OutOfMemory |
| // errors when allocating arrays - described in stackoverflow post: |
| // https://stackoverflow.com/a/8381338 |
| private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; |
| |
| private long getCachedFileLen() throws IOException { |
| try { |
| return fileLenCache.get(cacheId, lengthSupplier::get); |
| } catch (ExecutionException e) { |
| throw new IOException("Failed to get " + cacheId + " len from cache ", e); |
| } |
| } |
| |
    /**
     * Returns the BCFile reader, opening it on first use. Thread-safe via compare-and-set on
     * {@code bcfr}: concurrent callers may each open a reader, but only one wins; losers close
     * their reader and stream and use the winner's.
     *
     * @param serializedMetadata previously cached BCFile metadata, or null to read metadata from
     *        the file itself
     */
    private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {

      BCFile.Reader reader = bcfr.get();
      if (reader == null) {
        RateLimitedInputStream fsIn =
            new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter);
        BCFile.Reader tmpReader = null;
        if (serializedMetadata == null) {
          if (fileLenCache == null) {
            tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, cryptoService);
          } else {
            // The cached length may be stale (e.g. file was rewritten); if the first open fails,
            // invalidate the cached length and retry once with a freshly fetched length.
            long len = getCachedFileLen();
            try {
              tmpReader = new BCFile.Reader(fsIn, len, conf, cryptoService);
            } catch (Exception e) {
              log.debug("Failed to open {}, clearing file length cache and retrying", cacheId, e);
              fileLenCache.invalidate(cacheId);
            }

            if (tmpReader == null) {
              len = getCachedFileLen();
              tmpReader = new BCFile.Reader(fsIn, len, conf, cryptoService);
            }
          }
        } else {
          // Reconstruct the reader from cached metadata, skipping the metadata read from disk.
          tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf, cryptoService);
        }

        if (bcfr.compareAndSet(null, tmpReader)) {
          // This thread won the race; publish the stream so close() can release it.
          fin = fsIn;
          return tmpReader;
        } else {
          // Another thread installed a reader first; discard ours and use theirs.
          fsIn.close();
          tmpReader.close();
          return bcfr.get();
        }
      }

      return reader;
    }
| |
| private BCFile.Reader getBCFile() throws IOException { |
| BlockCache _iCache = cacheProvider.getIndexCache(); |
| if (_iCache != null) { |
| CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader()); |
| if (mce != null) { |
| return getBCFile(mce.getBuffer()); |
| } |
| } |
| |
| return getBCFile(null); |
| } |
| |
| private class BCFileLoader implements Loader { |
| |
| @Override |
| public Map<String,Loader> getDependencies() { |
| return Collections.emptyMap(); |
| } |
| |
| @Override |
| public byte[] load(int maxSize, Map<String,byte[]> dependencies) { |
| try { |
| return getBCFile(null).serializeMetadata(maxSize); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| } |
| |
| private class RawBlockLoader extends BaseBlockLoader { |
| private long offset; |
| private long compressedSize; |
| private long rawSize; |
| |
| private RawBlockLoader(long offset, long compressedSize, long rawSize, boolean loadingMeta) { |
| super(loadingMeta); |
| this.offset = offset; |
| this.compressedSize = compressedSize; |
| this.rawSize = rawSize; |
| } |
| |
| @Override |
| BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException { |
| if (rawSize > Math.min(maxSize, MAX_ARRAY_SIZE)) { |
| return null; |
| } |
| return bcfr.getDataBlock(offset, compressedSize, rawSize); |
| } |
| |
| @Override |
| String getBlockId() { |
| return "raw-(" + offset + "," + compressedSize + "," + rawSize + ")"; |
| } |
| } |
| |
| private class OffsetBlockLoader extends BaseBlockLoader { |
| private int blockIndex; |
| |
| private OffsetBlockLoader(int blockIndex, boolean loadingMeta) { |
| super(loadingMeta); |
| this.blockIndex = blockIndex; |
| } |
| |
| @Override |
| BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException { |
| if (bcfr.getDataBlockRawSize(blockIndex) > Math.min(maxSize, MAX_ARRAY_SIZE)) { |
| return null; |
| } |
| return bcfr.getDataBlock(blockIndex); |
| } |
| |
| @Override |
| String getBlockId() { |
| return "bi-" + blockIndex; |
| } |
| } |
| |
| private class MetaBlockLoader extends BaseBlockLoader { |
| String blockName; |
| |
| MetaBlockLoader(String blockName) { |
| super(true); |
| this.blockName = blockName; |
| } |
| |
| @Override |
| BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException { |
| if (bcfr.getMetaBlockRawSize(blockName) > Math.min(maxSize, MAX_ARRAY_SIZE)) { |
| return null; |
| } |
| return bcfr.getMetaBlock(blockName); |
| } |
| |
| @Override |
| String getBlockId() { |
| return "meta-" + blockName; |
| } |
| } |
| |
| private abstract class BaseBlockLoader implements Loader { |
| |
| abstract BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException; |
| |
| abstract String getBlockId(); |
| |
| private boolean loadingMetaBlock; |
| |
| public BaseBlockLoader(boolean loadingMetaBlock) { |
| super(); |
| this.loadingMetaBlock = loadingMetaBlock; |
| } |
| |
| @Override |
| public Map<String,Loader> getDependencies() { |
| if (bcfr.get() == null && loadingMetaBlock) { |
| String _lookup = cacheId + ROOT_BLOCK_NAME; |
| return Collections.singletonMap(_lookup, new BCFileLoader()); |
| } |
| return Collections.emptyMap(); |
| } |
| |
| @Override |
| public byte[] load(int maxSize, Map<String,byte[]> dependencies) { |
| |
| try { |
| BCFile.Reader reader = bcfr.get(); |
| if (reader == null) { |
| if (loadingMetaBlock) { |
| byte[] serializedMetadata = dependencies.get(cacheId + ROOT_BLOCK_NAME); |
| reader = getBCFile(serializedMetadata); |
| } else { |
| reader = getBCFile(); |
| } |
| } |
| |
| BlockReader _currBlock = getBlockReader(maxSize, reader); |
| if (_currBlock == null) { |
| return null; |
| } |
| |
| byte[] b = null; |
| try { |
| b = new byte[(int) _currBlock.getRawSize()]; |
| _currBlock.readFully(b); |
| } catch (IOException e) { |
| log.debug("Error full blockRead for file " + cacheId + " for block " + getBlockId(), e); |
| throw new UncheckedIOException(e); |
| } finally { |
| _currBlock.close(); |
| } |
| |
| return b; |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| } |
| |
| public Reader(CachableBuilder b) { |
| this.cacheId = Objects.requireNonNull(b.cacheId); |
| this.inputSupplier = b.inputSupplier; |
| this.lengthSupplier = b.lengthSupplier; |
| this.fileLenCache = b.fileLenCache; |
| this.cacheProvider = b.cacheProvider; |
| this.readLimiter = b.readLimiter; |
| this.conf = b.hadoopConf; |
| this.cryptoService = Objects.requireNonNull(b.cryptoService); |
| } |
| |
| /** |
| * It is intended that once the BlockRead object is returned to the caller, that the caller will |
| * read the entire block and then call close on the BlockRead class. |
| */ |
    public CachedBlockRead getMetaBlock(String blockName) throws IOException {
      BlockCache _iCache = cacheProvider.getIndexCache();
      if (_iCache != null) {
        // Meta blocks are cached in the index cache under "<cacheId>M<blockName>".
        String _lookup = this.cacheId + "M" + blockName;
        try {
          CacheEntry ce = _iCache.getBlock(_lookup, new MetaBlockLoader(blockName));
          if (ce != null) {
            return new CachedBlockRead(ce, ce.getBuffer());
          }
        } catch (UncheckedIOException uioe) {
          if (uioe.getCause() instanceof MetaBlockDoesNotExist) {
            // When a block does not exist, it is expected that MetaBlockDoesNotExist is thrown.
            // However, we do not want to throw the cause directly, because stack trace info
            // would be lost. So rewrap and throw in order to preserve the full stack trace.
            throw new MetaBlockDoesNotExist(uioe);
          }
          throw uioe;
        }
      }

      // Cache miss, block too large to cache, or no index cache: read directly from the file.
      BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
      return new CachedBlockRead(_currBlock);
    }
| |
| public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSize) |
| throws IOException { |
| BlockCache _iCache = cacheProvider.getIndexCache(); |
| if (_iCache != null) { |
| String _lookup = this.cacheId + "R" + offset; |
| CacheEntry ce = |
| _iCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, true)); |
| if (ce != null) { |
| return new CachedBlockRead(ce, ce.getBuffer()); |
| } |
| } |
| |
| BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize); |
| return new CachedBlockRead(_currBlock); |
| } |
| |
| /** |
| * It is intended that once the BlockRead object is returned to the caller, that the caller will |
| * read the entire block and then call close on the BlockRead class. |
| * |
| * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is |
| * read from disk and other threads check the cache before it has been inserted. |
| */ |
| |
| public CachedBlockRead getDataBlock(int blockIndex) throws IOException { |
| BlockCache _dCache = cacheProvider.getDataCache(); |
| if (_dCache != null) { |
| String _lookup = this.cacheId + "O" + blockIndex; |
| CacheEntry ce = _dCache.getBlock(_lookup, new OffsetBlockLoader(blockIndex, false)); |
| if (ce != null) { |
| return new CachedBlockRead(ce, ce.getBuffer()); |
| } |
| } |
| |
| BlockReader _currBlock = getBCFile().getDataBlock(blockIndex); |
| return new CachedBlockRead(_currBlock); |
| } |
| |
| public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSize) |
| throws IOException { |
| BlockCache _dCache = cacheProvider.getDataCache(); |
| if (_dCache != null) { |
| String _lookup = this.cacheId + "R" + offset; |
| CacheEntry ce = |
| _dCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, false)); |
| if (ce != null) { |
| return new CachedBlockRead(ce, ce.getBuffer()); |
| } |
| } |
| |
| BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize); |
| return new CachedBlockRead(_currBlock); |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| if (closed) |
| return; |
| |
| closed = true; |
| |
| BCFile.Reader reader = bcfr.get(); |
| if (reader != null) |
| reader.close(); |
| |
| if (fin != null) { |
| // synchronize on the FSDataInputStream to ensure thread safety with the |
| // BoundedRangeFileInputStream |
| synchronized (fin) { |
| fin.close(); |
| } |
| } |
| } |
| |
    /**
     * Swaps the cache provider used by subsequent block reads.
     *
     * NOTE(review): this field is written here without synchronization and read by the
     * getDataBlock/getMetaBlock methods; if those run on other threads, the update may not be
     * promptly visible to them. Confirm whether the field should be volatile — the builder's
     * copy of this field is declared volatile.
     */
    public void setCacheProvider(CacheProvider cacheProvider) {
      this.cacheProvider = cacheProvider;
    }
| |
| } |
| |
| public static class CachedBlockRead extends DataInputStream { |
| private SeekableByteArrayInputStream seekableInput; |
| private final CacheEntry cb; |
| boolean indexable; |
| |
| public CachedBlockRead(InputStream in) { |
| super(in); |
| cb = null; |
| seekableInput = null; |
| indexable = false; |
| } |
| |
| public CachedBlockRead(CacheEntry cb, byte[] buf) { |
| this(new SeekableByteArrayInputStream(buf), cb); |
| } |
| |
| private CachedBlockRead(SeekableByteArrayInputStream seekableInput, CacheEntry cb) { |
| super(seekableInput); |
| this.seekableInput = seekableInput; |
| this.cb = cb; |
| indexable = true; |
| } |
| |
| public void seek(int position) { |
| seekableInput.seek(position); |
| } |
| |
| public int getPosition() { |
| return seekableInput.getPosition(); |
| } |
| |
| public boolean isIndexable() { |
| return indexable; |
| } |
| |
| public byte[] getBuffer() { |
| return seekableInput.getBuffer(); |
| } |
| |
| public <T extends Weighable> T getIndex(Supplier<T> indexSupplier) { |
| return cb.getIndex(indexSupplier); |
| } |
| |
| public void indexWeightChanged() { |
| cb.indexWeightChanged(); |
| } |
| } |
| } |