/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader;
import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
import com.gemstone.gemfire.internal.cache.persistence.soplog.ReversingSerializedComparator;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.util.Bytes;
import com.gemstone.gemfire.internal.util.Hex;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.internal.logging.LogService;
/**
* Provides a sorted oplog (soplog) backed by an HBase HFile.
*
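* <p>A minimal usage sketch (illustrative only; assumes an already configured
* {@code SortedOplogConfiguration} named {@code config} and a hypothetical
* file name):
* <pre>
* SortedOplog soplog = new HFileSortedOplog(new File("bucket.soplog"), config);
* SortedOplogWriter wtr = soplog.createWriter();
* wtr.append(key, value); // keys must be appended in sorted order
* wtr.close(null);        // attaches the bloom filter and file metadata
*
* SortedOplogReader rdr = soplog.createReader();
* ByteBuffer found = rdr.read(key); // null if the key is absent
* rdr.close();
* </pre>
*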
* @author bakera
*/
public class HFileSortedOplog implements SortedOplog {
public static final byte[] MAGIC = new byte[] { 0x53, 0x4F, 0x50 }; // "SOP" in ASCII
public static final byte[] VERSION_1 = new byte[] { 0x1 };
// these key names are defined in HFile.FileInfo, which is not visible here,
// so they are duplicated
private static final byte[] AVG_KEY_LEN = "hfile.AVG_KEY_LEN".getBytes();
private static final byte[] AVG_VALUE_LEN = "hfile.AVG_VALUE_LEN".getBytes();
/** a default bloom filter */
private static final BloomFilter DUMMY_BLOOM = new BloomFilter() {
@Override
public boolean mightContain(byte[] key) {
return true;
}
};
static final Configuration hconf;
private static final FileSystem fs;
static {
// Leave these HBase properties set to defaults for now
//
// hfile.block.cache.size (25% of heap)
// hbase.hash.type (murmur)
// hfile.block.index.cacheonwrite (false)
// hfile.index.block.max.size (128k)
// hfile.format.version (2)
// io.storefile.bloom.block.size (128k)
// hfile.block.bloom.cacheonwrite (false)
// hbase.rs.cacheblocksonwrite (false)
// hbase.offheapcache.minblocksize (64k)
// hbase.offheapcache.percentage (0)
hconf = new Configuration();
hconf.setBoolean("hbase.metrics.showTableName", true);
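// a tuning override for any of the properties listed above, if one were ever
// needed, would go here so that SchemaMetrics and the FileSystem pick it up,
// e.g. (illustrative only):
// hconf.setInt("hfile.index.block.max.size", 128 * 1024);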
SchemaMetrics.configureGlobally(hconf);
try {
fs = FileSystem.get(hconf);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
private enum InternalMetadata {
/** identifies the soplog as a gemfire file, required */
GEMFIRE_MAGIC,
/** identifies the soplog version, required */
VERSION,
/** identifies the statistics data */
STATISTICS,
/** identifies the names of embedded comparators */
COMPARATORS;
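/** returns the file info key for this constant, e.g. "gemfire.VERSION" */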
public byte[] bytes() {
return ("gemfire." + name()).getBytes();
}
}
/** the logger */
private static final Logger logger = LogService.getLogger();
protected final String logPrefix;
/** the configuration */
private final SortedOplogConfiguration sopConfig;
/** the hfile cache config */
private final CacheConfig hcache;
/** the hfile location */
private Path path;
public HFileSortedOplog(File hfile, SortedOplogConfiguration sopConfig) throws IOException {
assert hfile != null;
assert sopConfig != null;
this.sopConfig = sopConfig;
path = fs.makeQualified(new Path(hfile.toString()));
// hcache = new CacheConfig(hconf, sopConfig.getCacheDataBlocksOnRead(), sopConfig.getBlockCache(),
// HFileSortedOplogFactory.convertStatistics(sopConfig.getStatistics(), sopConfig.getStoreStatistics()));
hcache = new CacheConfig(hconf);
this.logPrefix = "<" + sopConfig.getName() + "> ";
}
@Override
public SortedOplogReader createReader() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Creating an HFile reader on " + path, logPrefix);
}
return new HFileSortedOplogReader();
}
@Override
public SortedOplogWriter createWriter() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Creating an HFile writer on " + path, logPrefix);
}
return new HFileSortedOplogWriter();
}
SortedOplogConfiguration getConfiguration() {
return sopConfig;
}
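/**
* Reads keys and values from the underlying HFile and exposes its embedded
* bloom filter, statistics, and file metadata.
*/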
private class HFileSortedOplogReader extends AbstractSortedReader implements SortedOplogReader {
private final Reader reader;
private final BloomFilter bloom;
private final SortedStatistics stats;
private volatile boolean closed;
public HFileSortedOplogReader() throws IOException {
reader = HFile.createReader(fs, path, hcache);
validate();
stats = new HFileSortedStatistics(reader);
closed = false;
if (reader.getComparator() instanceof DelegatingSerializedComparator) {
loadComparators((DelegatingSerializedComparator) reader.getComparator());
}
DataInput bin = reader.getGeneralBloomFilterMetadata();
if (bin != null) {
final org.apache.hadoop.hbase.util.BloomFilter hbloom = BloomFilterFactory.createFromMeta(bin, reader);
if (reader.getComparator() instanceof DelegatingSerializedComparator) {
loadComparators((DelegatingSerializedComparator) hbloom.getComparator());
}
bloom = new BloomFilter() {
@Override
public boolean mightContain(byte[] key) {
assert key != null;
long start = sopConfig.getStatistics().getBloom().begin();
boolean foundKey = hbloom.contains(key, 0, key.length, null);
sopConfig.getStatistics().getBloom().end(start);
if (logger.isTraceEnabled()) {
logger.trace(String.format("{}Bloom check on %s for key %s: %b",
path, Hex.toHex(key), foundKey), logPrefix);
}
return foundKey;
}
};
} else {
bloom = DUMMY_BLOOM;
}
}
@Override
public boolean mightContain(byte[] key) {
return getBloomFilter().mightContain(key);
}
@Override
public ByteBuffer read(byte[] key) throws IOException {
assert key != null;
if (logger.isTraceEnabled()) {
logger.trace(String.format("{}Reading key %s from %s", Hex.toHex(key), path), logPrefix);
}
long start = sopConfig.getStatistics().getRead().begin();
try {
HFileScanner seek = reader.getScanner(true, true);
if (seek.seekTo(key) == 0) {
ByteBuffer val = seek.getValue();
sopConfig.getStatistics().getRead().end(val.remaining(), start);
return val;
}
sopConfig.getStatistics().getRead().end(start);
// the key is absent, so a preceding bloom filter hit was a false positive
sopConfig.getStatistics().getBloom().falsePositive();
return null;
} catch (IOException e) {
sopConfig.getStatistics().getRead().error(start);
throw (IOException) e.fillInStackTrace();
}
}
@Override
public SortedIterator<ByteBuffer> scan(
byte[] from, boolean fromInclusive,
byte[] to, boolean toInclusive,
boolean ascending,
MetadataFilter filter) throws IOException {
if (filter == null || filter.accept(getMetadata(filter.getName()))) {
SerializedComparator tmp = (SerializedComparator) reader.getComparator();
tmp = ascending ? tmp : ReversingSerializedComparator.reverse(tmp);
// HFileScanner scan = reader.getScanner(true, false, ascending, false);
HFileScanner scan = reader.getScanner(true, false, false);
return new HFileSortedIterator(scan, tmp, from, fromInclusive, to, toInclusive);
}
// the metadata filter rejected this file, so return an empty iterator
return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
}
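// Illustrative use of scan(): iterate ascending over keys in [from, to);
// process() is hypothetical.
//
//   SortedIterator<ByteBuffer> it = scan(from, true, to, false, true, null);
//   try {
//     while (it.hasNext()) {
//       it.next();
//       process(it.key(), it.value());
//     }
//   } finally {
//     it.close();
//   }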
@Override
public SerializedComparator getComparator() {
return (SerializedComparator) reader.getComparator();
}
@Override
public SortedStatistics getStatistics() {
return stats;
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public void close() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Closing reader on " + path, logPrefix);
}
reader.close();
closed = true;
}
@Override
public BloomFilter getBloomFilter() {
return bloom;
}
@Override
public byte[] getMetadata(Metadata name) throws IOException {
assert name != null;
return reader.loadFileInfo().get(name.bytes());
}
@Override
public File getFile() {
return new File(path.toUri());
}
@Override
public String getFileName() {
return path.getName();
}
@Override
public long getModificationTimeStamp() throws IOException {
FileStatus[] stats = FSUtils.listStatus(fs, path, null);
if (stats != null && stats.length == 1) {
return stats[0].getModificationTime();
} else {
return 0;
}
}
@Override
public void rename(String name) throws IOException {
Path parent = path.getParent();
Path newPath = new Path(parent, name);
fs.rename(path, newPath);
// update path to point to the new path
path = newPath;
}
@Override
public void delete() throws IOException {
fs.delete(path, false);
}
@Override
public String toString() {
return path.toString();
}
private byte[] getMetadata(InternalMetadata name) throws IOException {
return reader.loadFileInfo().get(name.bytes());
}
private void validate() throws IOException {
// check magic
byte[] magic = getMetadata(InternalMetadata.GEMFIRE_MAGIC);
if (!Arrays.equals(magic, MAGIC)) {
throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
}
// check version compatibility
byte[] ver = getMetadata(InternalMetadata.VERSION);
if (logger.isDebugEnabled()) {
logger.debug("{}Soplog version is " + Hex.toHex(ver), logPrefix);
}
if (!Arrays.equals(ver, VERSION_1)) {
throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
}
}
private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
byte[] raw = reader.loadFileInfo().get(InternalMetadata.COMPARATORS.bytes());
assert raw != null;
DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
comparator.setComparators(readComparators(in));
}
private SerializedComparator[] readComparators(DataInput in) throws IOException {
try {
SerializedComparator[] comps = new SerializedComparator[in.readInt()];
assert comps.length > 0;
for (int i = 0; i < comps.length; i++) {
comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
if (comps[i] instanceof DelegatingSerializedComparator) {
((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
}
}
return comps;
} catch (Exception e) {
throw new IOException(e);
}
}
}
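/**
* Writes sorted key/value pairs to a new HFile, optionally accumulating a
* row bloom filter that is attached when the writer is closed.
*/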
private class HFileSortedOplogWriter implements SortedOplogWriter {
private final Writer writer;
private final BloomFilterWriter bfw;
public HFileSortedOplogWriter() throws IOException {
writer = HFile.getWriterFactory(hconf, hcache)
.withPath(fs, path)
.withBlockSize(sopConfig.getBlockSize())
.withBytesPerChecksum(sopConfig.getBytesPerChecksum())
.withChecksumType(HFileSortedOplogFactory.convertChecksum(sopConfig.getChecksum()))
// .withComparator(sopConfig.getComparator())
.withCompression(HFileSortedOplogFactory.convertCompression(sopConfig.getCompression()))
.withDataBlockEncoder(HFileSortedOplogFactory.convertEncoding(sopConfig.getKeyEncoding()))
.create();
bfw = sopConfig.isBloomFilterEnabled() ?
// BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
// 0, writer, sopConfig.getComparator())
BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
0, writer)
: null;
}
@Override
public void append(byte[] key, byte[] value) throws IOException {
assert key != null;
assert value != null;
if (logger.isTraceEnabled()) {
logger.trace(String.format("{}Appending key %s to %s", Hex.toHex(key), path), logPrefix);
}
try {
writer.append(key, value);
if (bfw != null) {
bfw.add(key, 0, key.length);
}
} catch (IOException e) {
throw (IOException) e.fillInStackTrace();
}
}
@Override
public void append(ByteBuffer key, ByteBuffer value) throws IOException {
assert key != null;
assert value != null;
if (logger.isTraceEnabled()) {
logger.trace(String.format("{}Appending key %s to %s",
Hex.toHex(key.array(), key.arrayOffset(), key.remaining()), path), logPrefix);
}
try {
byte[] keyBytes = new byte[key.remaining()];
key.duplicate().get(keyBytes);
byte[] valueBytes = new byte[value.remaining()];
value.duplicate().get(valueBytes);
writer.append(keyBytes, valueBytes);
if (bfw != null) {
// use the copied key bytes; arrayOffset() alone ignores a nonzero position
bfw.add(keyBytes, 0, keyBytes.length);
}
} catch (IOException e) {
throw (IOException) e.fillInStackTrace();
}
}
@Override
public void close(EnumMap<Metadata, byte[]> metadata) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Finalizing and closing writer on {}", logPrefix, path);
}
if (bfw != null) {
bfw.compactBloom();
writer.addGeneralBloomFilter(bfw);
}
// append system metadata
writer.appendFileInfo(InternalMetadata.GEMFIRE_MAGIC.bytes(), MAGIC);
writer.appendFileInfo(InternalMetadata.VERSION.bytes(), VERSION_1);
// append comparator info
// if (writer.getComparator() instanceof DelegatingSerializedComparator) {
// ByteArrayOutputStream bos = new ByteArrayOutputStream();
// DataOutput out = new DataOutputStream(bos);
//
// writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
// writer.appendFileInfo(InternalMetadata.COMPARATORS.bytes(), bos.toByteArray());
// }
// TODO write statistics data to soplog
// writer.appendFileInfo(Meta.STATISTICS.toBytes(), null);
// append user metadata
if (metadata != null) {
for (Entry<Metadata, byte[]> entry : metadata.entrySet()) {
writer.appendFileInfo(entry.getKey().name().getBytes(), entry.getValue());
}
}
writer.close();
}
@Override
public void closeAndDelete() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{}Closing writer and deleting {}", logPrefix, path);
}
writer.close();
// delete through the FileSystem API; the path may not be on the local filesystem
fs.delete(writer.getPath(), false);
}
// private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
// out.writeInt(comparators.length);
// for (SerializedComparator sc : comparators) {
// out.writeUTF(sc.getClass().getName());
// if (sc instanceof DelegatingSerializedComparator) {
// writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
// }
// }
// }
}
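/**
* Iterates over HFile entries within optional inclusive/exclusive bounds,
* recording scan statistics until {@link #close()} is invoked.
*/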
private class HFileSortedIterator implements SortedIterator<ByteBuffer> {
private final HFileScanner scan;
private final SerializedComparator comparator;
private final byte[] from;
private final boolean fromInclusive;
private final byte[] to;
private final boolean toInclusive;
private final long start;
private long bytes;
private boolean foundNext;
private ByteBuffer key;
private ByteBuffer value;
public HFileSortedIterator(HFileScanner scan, SerializedComparator comparator,
byte[] from, boolean fromInclusive,
byte[] to, boolean toInclusive) throws IOException {
this.scan = scan;
this.comparator = comparator;
this.from = from;
this.fromInclusive = fromInclusive;
this.to = to;
this.toInclusive = toInclusive;
assert from == null
|| to == null
|| comparator.compare(from, 0, from.length, to, 0, to.length) <= 0;
start = sopConfig.getStatistics().getScan().begin();
foundNext = evalFrom();
}
@Override
public ByteBuffer key() {
return key;
}
@Override
public ByteBuffer value() {
return value;
}
@Override
public boolean hasNext() {
if (!foundNext) {
foundNext = step();
}
return foundNext;
}
@Override
public ByteBuffer next() {
long startNext = sopConfig.getStatistics().getScan().beginIteration();
if (!hasNext()) {
throw new NoSuchElementException();
}
foundNext = false;
key = scan.getKey();
value = scan.getValue();
int len = key.remaining() + value.remaining();
bytes += len;
sopConfig.getStatistics().getScan().endIteration(len, startNext);
return key;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
sopConfig.getStatistics().getScan().end(bytes, start);
}
private boolean step() {
try {
if (!scan.isSeeked()) {
return false;
} else if (scan.next() && evalTo()) {
return true;
}
} catch (IOException e) {
throw new HDFSIOException("Error from HDFS during iteration", e);
}
return false;
}
private boolean evalFrom() throws IOException {
if (from == null) {
// no lower bound, so start at the first entry
return scan.seekTo() && evalTo();
} else {
// seekTo returns -1 if the key precedes the first entry (leaving the
// scanner unpositioned), 0 on an exact match, and 1 when positioned at
// the last entry less than the key
int compare = scan.seekTo(from);
if (compare < 0) {
return scan.seekTo() && evalTo();
} else if (compare == 0 && fromInclusive) {
return true;
} else {
// positioned at an exclusive bound or just before the bound, so
// advance to the next entry
return step();
}
}
}
private boolean evalTo() throws IOException {
// true while the current key remains within the optional upper bound
int compare = -1;
if (to != null) {
ByteBuffer key = scan.getKey();
compare = comparator.compare(
key.array(), key.arrayOffset(), key.remaining(),
to, 0, to.length);
}
return compare < 0 || (compare == 0 && toInclusive);
}
}
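/**
* Surfaces entry count, first/last keys, and average key/value sizes
* gathered from the HFile's trailer and file info.
*/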
private static class HFileSortedStatistics implements SortedStatistics {
private final Reader reader;
private final int keySize;
private final int valueSize;
public HFileSortedStatistics(Reader reader) throws IOException {
this.reader = reader;
byte[] sz = reader.loadFileInfo().get(AVG_KEY_LEN);
keySize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
sz = reader.loadFileInfo().get(AVG_VALUE_LEN);
valueSize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
}
@Override
public long keyCount() {
return reader.getEntries();
}
@Override
public byte[] firstKey() {
return reader.getFirstKey();
}
@Override
public byte[] lastKey() {
return reader.getLastKey();
}
@Override
public double avgKeySize() {
return keySize;
}
@Override
public double avgValueSize() {
return valueSize;
}
@Override
public void close() {
}
}
}