blob: 711f31d8ee03aaac0aea0ebf7aa1a650005de61d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Compact passed set of files in the mob-enabled column family.
*/
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {
private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
private long mobSizeThreshold;
private HMobStore mobStore;
private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
: ScanType.COMPACT_DROP_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
if (scanType == ScanType.COMPACT_DROP_DELETES) {
// In major compaction, we need to write the delete markers to del files, so we have to
// retain the them in scanning.
scanType = ScanType.COMPACT_RETAIN_DELETES;
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, true);
} else {
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, false);
}
}
};
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
shouldDropBehind);
}
};
public DefaultMobStoreCompactor(Configuration conf, Store store) {
super(conf, store);
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
// During the compaction, the compactor reads the cells from the mob files and
// probably creates new mob files. All of these operations are included in HMobStore,
// so we need to cast the Store to HMobStore.
if (!(store instanceof HMobStore)) {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
mobStore = (HMobStore) store;
mobSizeThreshold = store.getFamily().getMobThreshold();
}
@Override
public List<Path> compact(CompactionRequest request, ThroughputController throughputController,
User user) throws IOException {
return compact(request, scannerFactory, writerFactory, throughputController, user);
}
// TODO refactor to take advantage of the throughput controller.
/**
* Performs compaction on a column family with the mob flag enabled.
* This is for when the mob threshold size has changed or if the mob
* column family mode has been toggled via an alter table statement.
* Compacts the files by the following rules.
* 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* directly copy the (with mob tag) cell into the new store file.
* </li>
* <li>
* Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
* the new store file.
* </li>
* </ol>
* 2. If the cell doesn't have a reference tag.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* write this cell to a mob file, and write the path of this mob file to the store file.
* </li>
* <li>
* Otherwise, directly write this cell into the store file.
* </li>
* </ol>
* In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
* which could output the normal cells and delete markers together when required.
* After the major compaction on the normal hfiles, we have a guarantee that we have purged all
* deleted or old version mob refs, and the delete markers are written to a del file with the
* suffix _del. Because of this, it is safe to use the del file in the mob compaction.
* The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
* mob files. When the small mob files are merged into bigger ones, the del file is added into
* the scanner to filter the deleted cells.
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId,
ThroughputController throughputController, boolean major) throws IOException {
if (!(scanner instanceof MobCompactionStoreScanner)) {
throw new IllegalArgumentException(
"The scanner should be an instance of MobCompactionStoreScanner");
}
MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<Cell>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
StoreFileWriter mobFileWriter = null, delFileWriter = null;
long mobCells = 0, deleteMarkersCount = 0;
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
store.getTableName().getName());
long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
try {
try {
// If the mob file writer could not be created, directly write the cell to the store file.
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
} catch (IOException e) {
LOG.error("Failed to create mob writer, "
+ "we will continue the compaction by writing MOB cells directly in store files", e);
}
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = compactionScanner.next(cells, scannerContext);
for (Cell c : cells) {
if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
delFileWriter.append(c);
deleteMarkersCount++;
} else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
// If the mob file writer is null or the kv type is not put, directly write the cell
// to the store file.
writer.append(c);
} else if (MobUtils.isMobReferenceCell(c)) {
if (MobUtils.hasValidMobRefCellValue(c)) {
int size = MobUtils.getMobValueLength(c);
if (size > mobSizeThreshold) {
// If the value size is larger than the threshold, it's regarded as a mob. Since
// its value is already in the mob file, directly write this cell to the store file
writer.append(c);
} else {
// If the value is not larger than the threshold, it's not regarded a mob. Retrieve
// the mob cell from the mob file, and write it back to the store file.
Cell mobCell = mobStore.resolve(c, false);
if (mobCell.getValueLength() != 0) {
// put the mob data back to the store file
CellUtil.setSequenceId(mobCell, c.getSequenceId());
writer.append(mobCell);
cellsCountCompactedFromMob++;
cellsSizeCompactedFromMob += mobCell.getValueLength();
} else {
// If the value of a file is empty, there might be issues when retrieving,
// directly write the cell to the store file, and leave it to be handled by the
// next compaction.
writer.append(c);
}
}
} else {
LOG.warn("The value format of the KeyValue " + c
+ " is wrong, its length is less than " + Bytes.SIZEOF_INT);
writer.append(c);
}
} else if (c.getValueLength() <= mobSizeThreshold) {
//If value size of a cell is not larger than the threshold, directly write to store file
writer.append(c);
} else {
// If the value size of a cell is larger than the threshold, it's regarded as a mob,
// write this cell to a mob file, and write the path to the store file.
mobCells++;
// append the original keyValue in the mob file.
mobFileWriter.append(c);
KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
// write the cell whose value is the path of a mob file to the store file.
writer.append(reference);
cellsCountCompactedToMob++;
cellsSizeCompactedToMob += c.getValueLength();
}
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += KeyValueUtil.length(c);
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
}
}
}
}
cells.clear();
} while (hasMore);
} finally {
if (mobFileWriter != null) {
mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
mobFileWriter.close();
}
if (delFileWriter != null) {
delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
delFileWriter.close();
}
}
if (mobFileWriter != null) {
if (mobCells > 0) {
// If the mob file is not empty, commit it.
mobStore.commitFile(mobFileWriter.getPath(), path);
} else {
try {
// If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Failed to delete the temp mob file", e);
}
}
}
if (delFileWriter != null) {
if (deleteMarkersCount > 0) {
// If the del file is not empty, commit it.
// If the commit fails, the compaction is re-performed again.
mobStore.commitFile(delFileWriter.getPath(), path);
} else {
try {
// If the del file is empty, delete it instead of committing.
store.getFileSystem().delete(delFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Failed to delete the temp del file", e);
}
}
}
mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
progress.complete();
return true;
}
}