blob: 341b6d9321c09d87e72fb5fbf862765a0e33f059 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.summary;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.accumulo.core.client.rfile.RFileSource;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.summary.Gatherer.RowRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.cache.Cache;
public class SummaryReader {
private interface BlockReader {
DataInputStream getMetaBlock(String name) throws IOException;
}
private static class CompositeCache implements BlockCache {
private BlockCache summaryCache;
private BlockCache indexCache;
CompositeCache(BlockCache summaryCache, BlockCache indexCache) {
this.summaryCache = summaryCache;
this.indexCache = indexCache;
}
@Override
public CacheEntry cacheBlock(String blockName, byte[] buf) {
return summaryCache.cacheBlock(blockName, buf);
}
@Override
public CacheEntry getBlock(String blockName) {
CacheEntry ce = summaryCache.getBlock(blockName);
if (ce == null) {
// Its possible the index cache may have this info, so check there. This is an opportunistic
// check.
ce = indexCache.getBlock(blockName);
}
return ce;
}
@Override
public CacheEntry getBlock(String blockName, Loader loader) {
Loader idxLoader = new Loader() {
CacheEntry idxCacheEntry;
@Override
public Map<String,Loader> getDependencies() {
idxCacheEntry = indexCache.getBlock(blockName);
if (idxCacheEntry == null) {
return loader.getDependencies();
} else {
return Collections.emptyMap();
}
}
@Override
public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
if (idxCacheEntry == null) {
return loader.load(maxSize, dependencies);
} else {
return idxCacheEntry.getBuffer();
}
}
};
return summaryCache.getBlock(blockName, idxLoader);
}
@Override
public long getMaxSize() {
return summaryCache.getMaxSize();
}
@Override
public Stats getStats() {
return summaryCache.getStats();
}
@Override
public long getMaxHeapSize() {
return summaryCache.getMaxHeapSize();
}
}
private static List<SummarySerializer> load(BlockReader bcReader,
Predicate<SummarizerConfiguration> summarySelector) throws IOException {
try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) {
List<SummarySerializer> stores = new ArrayList<>();
readHeader(in);
int numSummaries = WritableUtils.readVInt(in);
for (int i = 0; i < numSummaries; i++) {
SummarizerConfiguration conf = readConfig(in);
boolean inline = in.readBoolean();
if (inline) {
if (summarySelector.test(conf)) {
stores.add(SummarySerializer.load(conf, in));
} else {
SummarySerializer.skip(in);
}
} else {
int block = WritableUtils.readVInt(in);
int offset = WritableUtils.readVInt(in);
if (summarySelector.test(conf)) {
try (DataInputStream summaryIn =
bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX + "." + block)) {
long skipped = in.skip(offset);
while (skipped < offset) {
skipped += in.skip(offset - skipped);
}
stores.add(SummarySerializer.load(conf, summaryIn));
} catch (MetaBlockDoesNotExist e) {
// this is unexpected
throw new IOException(e);
}
}
}
}
return stores;
} catch (MetaBlockDoesNotExist e) {
return Collections.emptyList();
}
}
private static SummaryReader load(CachableBlockFile.Reader bcReader,
Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory)
throws IOException {
SummaryReader fileSummaries = new SummaryReader();
fileSummaries.summaryStores = load(bcReader::getMetaBlock, summarySelector);
fileSummaries.factory = factory;
return fileSummaries;
}
public static SummaryReader load(Configuration conf, RFileSource source, String cacheId,
Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory,
CryptoService cryptoService) throws IOException {
CachableBuilder cb = new CachableBuilder().input(source.getInputStream(), cacheId)
.length(source.getLength()).conf(conf).cryptoService(cryptoService);
return load(new CachableBlockFile.Reader(cb), summarySelector, factory);
}
public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFactory factory,
Path file, Predicate<SummarizerConfiguration> summarySelector, BlockCache summaryCache,
BlockCache indexCache, Cache<String,Long> fileLenCache, CryptoService cryptoService) {
CachableBlockFile.Reader bcReader = null;
try {
// the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when
// only summary data is wanted.
CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache)
.cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService);
bcReader = new CachableBlockFile.Reader(cb);
return load(bcReader, summarySelector, factory);
} catch (FileNotFoundException fne) {
return getEmptyReader(factory);
} catch (IOException e) {
try {
if (!fs.exists(file)) {
return getEmptyReader(factory);
}
} catch (IOException e1) {}
throw new UncheckedIOException(e);
} finally {
if (bcReader != null) {
try {
bcReader.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}
private static SummaryReader getEmptyReader(SummarizerFactory factory) {
SummaryReader sr = new SummaryReader();
sr.factory = factory;
sr.summaryStores = Collections.emptyList();
sr.deleted = true;
return sr;
}
public static void print(Reader iter, PrintStream out) throws IOException {
String indent = " ";
out.print("Summary data : \n");
List<SummarySerializer> stores = load(iter::getMetaStore, conf -> true);
int i = 1;
for (SummarySerializer summaryStore : stores) {
out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(),
summaryStore.getSummarizerConfiguration());
i++;
summaryStore.print(indent, indent, out);
}
}
private static SummarizerConfiguration readConfig(DataInputStream in) throws IOException {
// read summarizer configuration
String summarizerClazz = in.readUTF();
String configId = in.readUTF();
org.apache.accumulo.core.client.summary.SummarizerConfiguration.Builder scb =
SummarizerConfiguration.builder(summarizerClazz).setPropertyId(configId);
int numOpts = WritableUtils.readVInt(in);
for (int i = 0; i < numOpts; i++) {
String k = in.readUTF();
String v = in.readUTF();
scb.addOption(k, v);
}
return scb.build();
}
private static byte readHeader(DataInputStream in) throws IOException {
long magic = in.readLong();
if (magic != SummaryWriter.MAGIC) {
throw new IOException("Bad magic : " + String.format("%x", magic));
}
byte ver = in.readByte();
if (ver != SummaryWriter.VER) {
throw new IOException("Unknown version : " + ver);
}
return ver;
}
private List<SummarySerializer> summaryStores;
private SummarizerFactory factory;
private boolean deleted;
public SummaryCollection getSummaries(List<RowRange> ranges) {
List<SummaryCollection.FileSummary> initial = new ArrayList<>();
if (deleted) {
return new SummaryCollection(initial, true);
}
for (SummarySerializer summaryStore : summaryStores) {
if (summaryStore.exceededMaxSize()) {
initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
} else {
Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
boolean exceeded = summaryStore.exceedsRange(ranges);
initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(),
summary, exceeded));
}
}
return new SummaryCollection(initial);
}
}