| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.mob; |
| |
| import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES; |
| import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Optional; |
| import java.util.function.Consumer; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.PrivateCellUtil; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.regionserver.CellSink; |
| import org.apache.hadoop.hbase.regionserver.HMobStore; |
| import org.apache.hadoop.hbase.regionserver.HStore; |
| import org.apache.hadoop.hbase.regionserver.HStoreFile; |
| import org.apache.hadoop.hbase.regionserver.InternalScanner; |
| import org.apache.hadoop.hbase.regionserver.KeyValueScanner; |
| import org.apache.hadoop.hbase.regionserver.ScanInfo; |
| import org.apache.hadoop.hbase.regionserver.ScanType; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext; |
| import org.apache.hadoop.hbase.regionserver.ShipperListener; |
| import org.apache.hadoop.hbase.regionserver.StoreFileScanner; |
| import org.apache.hadoop.hbase.regionserver.StoreFileWriter; |
| import org.apache.hadoop.hbase.regionserver.StoreScanner; |
| import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; |
| import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; |
| import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; |
| import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; |
| import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap; |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; |
| |
| /** |
| * Compact passed set of files in the mob-enabled column family. |
| */ |
| @InterfaceAudience.Private |
| public class DefaultMobStoreCompactor extends DefaultCompactor { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class); |
| protected long mobSizeThreshold; |
| protected HMobStore mobStore; |
| protected boolean ioOptimizedMode = false; |
| |
| /* |
| * MOB file reference set thread local variable. It contains set of a MOB file names, which newly |
| * compacted store file has references to. This variable is populated during compaction and the |
| * content of it is written into meta section of a newly created store file at the final step of |
| * compaction process. |
| */ |
| static ThreadLocal<SetMultimap<TableName, String>> mobRefSet = |
| ThreadLocal.withInitial(HashMultimap::create); |
| |
| /* |
| * Is it user or system-originated request. |
| */ |
| |
| static ThreadLocal<Boolean> userRequest = new ThreadLocal<Boolean>() { |
| @Override |
| protected Boolean initialValue() { |
| return Boolean.FALSE; |
| } |
| }; |
| |
| /* |
| * Disable IO mode. IO mode can be forcefully disabled if compactor finds old MOB file |
| * (pre-distributed compaction). This means that migration has not been completed yet. During data |
| * migration (upgrade) process only general compaction is allowed. |
| */ |
| |
| static ThreadLocal<Boolean> disableIO = new ThreadLocal<Boolean>() { |
| |
| @Override |
| protected Boolean initialValue() { |
| return Boolean.FALSE; |
| } |
| }; |
| |
| /* |
| * Map : MOB file name - file length Can be expensive for large amount of MOB files. |
| */ |
| static ThreadLocal<HashMap<String, Long>> mobLengthMap = |
| new ThreadLocal<HashMap<String, Long>>() { |
| @Override |
| protected HashMap<String, Long> initialValue() { |
| return new HashMap<String, Long>(); |
| } |
| }; |
| |
| private final InternalScannerFactory scannerFactory = new InternalScannerFactory() { |
| |
| @Override |
| public ScanType getScanType(CompactionRequestImpl request) { |
| return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES; |
| } |
| |
| @Override |
| public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, |
| ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { |
| return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, |
| fd.earliestPutTs); |
| } |
| }; |
| |
| private final CellSinkFactory<StoreFileWriter> writerFactory = |
| new CellSinkFactory<StoreFileWriter>() { |
| @Override |
| public StoreFileWriter createWriter(InternalScanner scanner, |
| org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, |
| boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker) |
| throws IOException { |
| // make this writer with tags always because of possible new cells with tags. |
| return store.getStoreEngine() |
| .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker) |
| .includeMVCCReadpoint(true).includesTag(true)); |
| } |
| }; |
| |
| public DefaultMobStoreCompactor(Configuration conf, HStore store) { |
| super(conf, store); |
| // The mob cells reside in the mob-enabled column family which is held by HMobStore. |
| // During the compaction, the compactor reads the cells from the mob files and |
| // probably creates new mob files. All of these operations are included in HMobStore, |
| // so we need to cast the Store to HMobStore. |
| if (!(store instanceof HMobStore)) { |
| throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); |
| } |
| this.mobStore = (HMobStore) store; |
| this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); |
| this.ioOptimizedMode = |
| conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE) |
| .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); |
| |
| } |
| |
| @Override |
| public List<Path> compact(CompactionRequestImpl request, |
| ThroughputController throughputController, User user) throws IOException { |
| String tableName = store.getTableName().toString(); |
| String regionName = store.getRegionInfo().getRegionNameAsString(); |
| String familyName = store.getColumnFamilyName(); |
| LOG.info( |
| "MOB compaction: major={} isAll={} priority={} throughput controller={}" |
| + " table={} cf={} region={}", |
| request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController, |
| tableName, familyName, regionName); |
| if (request.getPriority() == HStore.PRIORITY_USER) { |
| userRequest.set(Boolean.TRUE); |
| } else { |
| userRequest.set(Boolean.FALSE); |
| } |
| LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName, |
| regionName, request.getFiles()); |
| // Check if I/O optimized MOB compaction |
| if (ioOptimizedMode) { |
| if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) { |
| try { |
| final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> { |
| byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS); |
| ImmutableSetMultimap.Builder<TableName, String> builder; |
| if (value == null) { |
| builder = ImmutableSetMultimap.builder(); |
| } else { |
| try { |
| builder = MobUtils.deserializeMobFileRefs(value); |
| } catch (RuntimeException exception) { |
| throw new RuntimeException("failure getting mob references for hfile " + file, |
| exception); |
| } |
| } |
| return builder; |
| }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build(); |
| // reset disableIO |
| disableIO.set(Boolean.FALSE); |
| if (!mobRefs.isEmpty()) { |
| calculateMobLengthMap(mobRefs); |
| } |
| LOG.info( |
| "Table={} cf={} region={}. I/O optimized MOB compaction. " |
| + "Total referenced MOB files: {}", |
| tableName, familyName, regionName, mobRefs.size()); |
| } catch (RuntimeException exception) { |
| throw new IOException("Failed to get list of referenced hfiles for request " + request, |
| exception); |
| } |
| } |
| } |
| |
| return compact(request, scannerFactory, writerFactory, throughputController, user); |
| } |
| |
| /** |
| * @param mobRefs multimap of original table name -> mob hfile |
| */ |
| private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException { |
| FileSystem fs = store.getFileSystem(); |
| HashMap<String, Long> map = mobLengthMap.get(); |
| map.clear(); |
| for (Entry<TableName, String> reference : mobRefs.entries()) { |
| final TableName table = reference.getKey(); |
| final String mobfile = reference.getValue(); |
| if (MobFileName.isOldMobFileName(mobfile)) { |
| disableIO.set(Boolean.TRUE); |
| } |
| List<Path> locations = mobStore.getLocations(table); |
| for (Path p : locations) { |
| try { |
| FileStatus st = fs.getFileStatus(new Path(p, mobfile)); |
| long size = st.getLen(); |
| LOG.debug("Referenced MOB file={} size={}", mobfile, size); |
| map.put(mobfile, size); |
| break; |
| } catch (FileNotFoundException exception) { |
| LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile, |
| p); |
| } |
| } |
| if (!map.containsKey(mobfile)) { |
| throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of " |
| + "expected locations: " + locations); |
| } |
| } |
| } |
| |
| /** |
| * Performs compaction on a column family with the mob flag enabled. This works only when MOB |
| * compaction is explicitly requested (by User), or by Master There are two modes of a MOB |
| * compaction:<br> |
| * <p> |
| * <ul> |
| * <li>1. Full mode - when all MOB data for a region is compacted into a single MOB file. |
| * <li>2. I/O optimized mode - for use cases with no or infrequent updates/deletes of a <br> |
| * MOB data. The main idea behind i/o optimized compaction is to limit maximum size of a MOB file |
| * produced during compaction and to limit I/O write/read amplification. |
| * </ul> |
| * The basic algorithm of compaction is the following: <br> |
| * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. |
| * <ol> |
| * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob, |
| * directly copy the (with mob tag) cell into the new store file.</li> |
| * <li>Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into the |
| * new store file.</li> |
| * </ol> |
| * 2. If the Put cell doesn't have a reference tag. |
| * <ol> |
| * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob, |
| * write this cell to a mob file, and write the path of this mob file to the store file.</li> |
| * <li>Otherwise, directly write this cell into the store file.</li> |
| * </ol> |
| * @param fd File details |
| * @param scanner Where to read from. |
| * @param writer Where to write to. |
| * @param smallestReadPoint Smallest read point. |
| * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= |
| * smallestReadPoint |
| * @param throughputController The compaction throughput controller. |
| * @param request compaction request. |
| * @param progress Progress reporter. |
| * @return Whether compaction ended; false if it was interrupted for any reason. |
| */ |
| @Override |
| protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, |
| long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, |
| CompactionRequestImpl request, CompactionProgress progress) throws IOException { |
| long bytesWrittenProgressForLog = 0; |
| long bytesWrittenProgressForShippedCall = 0; |
| // Clear old mob references |
| mobRefSet.get().clear(); |
| boolean isUserRequest = userRequest.get(); |
| boolean major = request.isAllFiles(); |
| boolean compactMOBs = major && isUserRequest; |
| boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, |
| MobConstants.DEFAULT_MOB_DISCARD_MISS); |
| if (discardMobMiss) { |
| LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version" |
| + " with the distributed mob compaction feature on a cluster that has experienced MOB data " |
| + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY); |
| } |
| long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, |
| MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE); |
| boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get(); |
| LOG.info( |
| "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}" |
| + " major={} store={}", |
| compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo()); |
| // Since scanner.next() can return 'false' but still be delivering data, |
| // we have to use a do/while loop. |
| List<Cell> cells = new ArrayList<>(); |
| // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME |
| long currentTime = EnvironmentEdgeManager.currentTime(); |
| long lastMillis = 0; |
| if (LOG.isDebugEnabled()) { |
| lastMillis = currentTime; |
| } |
| CloseChecker closeChecker = new CloseChecker(conf, currentTime); |
| String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); |
| long now = 0; |
| boolean hasMore; |
| byte[] fileName = null; |
| StoreFileWriter mobFileWriter = null; |
| /* |
| * mobCells are used only to decide if we need to commit or abort current MOB output file. |
| */ |
| long mobCells = 0; |
| long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; |
| long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; |
| boolean finished = false; |
| |
| ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) |
| .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, |
| compactScannerSizeLimit) |
| .build(); |
| throughputController.start(compactionName); |
| KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; |
| long shippedCallSizeLimit = |
| (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); |
| |
| Cell mobCell = null; |
| List<String> committedMobWriterFileNames = new ArrayList<>(); |
| try { |
| |
| mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); |
| fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); |
| |
| do { |
| hasMore = scanner.next(cells, scannerContext); |
| currentTime = EnvironmentEdgeManager.currentTime(); |
| if (LOG.isDebugEnabled()) { |
| now = currentTime; |
| } |
| if (closeChecker.isTimeLimit(store, currentTime)) { |
| progress.cancel(); |
| return false; |
| } |
| for (Cell c : cells) { |
| if (compactMOBs) { |
| if (MobUtils.isMobReferenceCell(c)) { |
| String fName = MobUtils.getMobFileName(c); |
| // Added to support migration |
| try { |
| mobCell = mobStore.resolve(c, true, false).getCell(); |
| } catch (DoNotRetryIOException e) { |
| if ( |
| discardMobMiss && e.getCause() != null |
| && e.getCause() instanceof FileNotFoundException |
| ) { |
| LOG.error("Missing MOB cell: file={} not found cell={}", fName, c); |
| continue; |
| } else { |
| throw e; |
| } |
| } |
| |
| if (discardMobMiss && mobCell.getValueLength() == 0) { |
| LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c); |
| continue; |
| } else if (mobCell.getValueLength() == 0) { |
| String errMsg = |
| String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s", |
| fName, mobCell, c); |
| throw new IOException(errMsg); |
| } |
| |
| if (mobCell.getValueLength() > mobSizeThreshold) { |
| // put the mob data back to the MOB store file |
| PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); |
| if (!ioOptimizedMode) { |
| mobFileWriter.append(mobCell); |
| mobCells++; |
| writer.append( |
| MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); |
| } else { |
| // I/O optimized mode |
| // Check if MOB cell origin file size is |
| // greater than threshold |
| Long size = mobLengthMap.get().get(fName); |
| if (size == null) { |
| // FATAL error (we should never get here though), abort compaction |
| // This error means that meta section of store file does not contain |
| // MOB file, which has references in at least one cell from this store file |
| String msg = String.format( |
| "Found an unexpected MOB file during compaction %s, aborting compaction %s", |
| fName, getStoreInfo()); |
| throw new IOException(msg); |
| } |
| // Can not be null |
| if (size < maxMobFileSize) { |
| // If MOB cell origin file is below threshold |
| // it is get compacted |
| mobFileWriter.append(mobCell); |
| // Update number of mobCells in a current mob writer |
| mobCells++; |
| writer.append( |
| MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); |
| // Update total size of the output (we do not take into account |
| // file compression yet) |
| long len = mobFileWriter.getPos(); |
| if (len > maxMobFileSize) { |
| LOG.debug("Closing output MOB File, length={} file={}, store={}", len, |
| mobFileWriter.getPath().getName(), getStoreInfo()); |
| mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, |
| request, committedMobWriterFileNames); |
| fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); |
| mobCells = 0; |
| } |
| } else { |
| // We leave large MOB file as is (is not compacted), |
| // then we update set of MOB file references |
| // and append mob cell directly to the store's writer |
| Optional<TableName> refTable = MobUtils.getTableName(c); |
| if (refTable.isPresent()) { |
| mobRefSet.get().put(refTable.get(), fName); |
| writer.append(c); |
| } else { |
| throw new IOException(String.format("MOB cell did not contain a tablename " |
| + "tag. should not be possible. see ref guide on mob troubleshooting. " |
| + "store=%s cell=%s", getStoreInfo(), c)); |
| } |
| } |
| } |
| } else { |
| // If MOB value is less than threshold, append it directly to a store file |
| PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); |
| writer.append(mobCell); |
| cellsCountCompactedFromMob++; |
| cellsSizeCompactedFromMob += mobCell.getValueLength(); |
| } |
| } else { |
| // Not a MOB reference cell |
| int size = c.getValueLength(); |
| if (size > mobSizeThreshold) { |
| // This MOB cell comes from a regular store file |
| // therefore we store it into original mob output |
| mobFileWriter.append(c); |
| writer |
| .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags())); |
| mobCells++; |
| cellsCountCompactedToMob++; |
| cellsSizeCompactedToMob += c.getValueLength(); |
| if (ioOptimizedMode) { |
| // Update total size of the output (we do not take into account |
| // file compression yet) |
| long len = mobFileWriter.getPos(); |
| if (len > maxMobFileSize) { |
| mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, |
| request, committedMobWriterFileNames); |
| fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); |
| mobCells = 0; |
| } |
| } |
| } else { |
| // Not a MOB cell, write it directly to a store file |
| writer.append(c); |
| } |
| } |
| } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) { |
| // Not a major compaction or major with MOB disabled |
| // If the kv type is not put, directly write the cell |
| // to the store file. |
| writer.append(c); |
| } else if (MobUtils.isMobReferenceCell(c)) { |
| // Not a major MOB compaction, Put MOB reference |
| if (MobUtils.hasValidMobRefCellValue(c)) { |
| // We do not check mobSizeThreshold during normal compaction, |
| // leaving it to a MOB compaction run |
| Optional<TableName> refTable = MobUtils.getTableName(c); |
| if (refTable.isPresent()) { |
| mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); |
| writer.append(c); |
| } else { |
| throw new IOException(String.format("MOB cell did not contain a tablename " |
| + "tag. should not be possible. see ref guide on mob troubleshooting. " |
| + "store=%s cell=%s", getStoreInfo(), c)); |
| } |
| } else { |
| String errMsg = String.format("Corrupted MOB reference: %s", c.toString()); |
| throw new IOException(errMsg); |
| } |
| } else if (c.getValueLength() <= mobSizeThreshold) { |
| // If the value size of a cell is not larger than the threshold, directly write it to |
| // the store file. |
| writer.append(c); |
| } else { |
| // If the value size of a cell is larger than the threshold, it's regarded as a mob, |
| // write this cell to a mob file, and write the path to the store file. |
| mobCells++; |
| // append the original keyValue in the mob file. |
| mobFileWriter.append(c); |
| Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); |
| // write the cell whose value is the path of a mob file to the store file. |
| writer.append(reference); |
| cellsCountCompactedToMob++; |
| cellsSizeCompactedToMob += c.getValueLength(); |
| if (ioOptimizedMode) { |
| long len = mobFileWriter.getPos(); |
| if (len > maxMobFileSize) { |
| mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request, |
| committedMobWriterFileNames); |
| fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); |
| mobCells = 0; |
| } |
| } |
| } |
| |
| int len = c.getSerializedSize(); |
| ++progress.currentCompactedKVs; |
| progress.totalCompactedSize += len; |
| bytesWrittenProgressForShippedCall += len; |
| if (LOG.isDebugEnabled()) { |
| bytesWrittenProgressForLog += len; |
| } |
| throughputController.control(compactionName, len); |
| if (closeChecker.isSizeLimit(store, len)) { |
| progress.cancel(); |
| return false; |
| } |
| if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { |
| ((ShipperListener) writer).beforeShipped(); |
| kvs.shipped(); |
| bytesWrittenProgressForShippedCall = 0; |
| } |
| } |
| // Log the progress of long running compactions every minute if |
| // logging at DEBUG level |
| if (LOG.isDebugEnabled()) { |
| if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { |
| String rate = String.format("%.2f", |
| (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); |
| LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", |
| compactionName, progress, rate, throughputController); |
| lastMillis = now; |
| bytesWrittenProgressForLog = 0; |
| } |
| } |
| cells.clear(); |
| } while (hasMore); |
| // Commit last MOB writer |
| commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); |
| finished = true; |
| } catch (InterruptedException e) { |
| progress.cancel(); |
| throw new InterruptedIOException( |
| "Interrupted while control throughput of compacting " + compactionName); |
| } catch (IOException t) { |
| String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName(); |
| throw new IOException(msg, t); |
| } finally { |
| // Clone last cell in the final because writer will append last cell when committing. If |
| // don't clone here and once the scanner get closed, then the memory of last cell will be |
| // released. (HBASE-22582) |
| ((ShipperListener) writer).beforeShipped(); |
| throughputController.finish(compactionName); |
| if (!finished && mobFileWriter != null) { |
| // Remove all MOB references because compaction failed |
| clearThreadLocals(); |
| // Abort writer |
| LOG.debug("Aborting writer for {} because of a compaction failure, Store {}", |
| mobFileWriter.getPath(), getStoreInfo()); |
| abortWriter(mobFileWriter); |
| deleteCommittedMobFiles(committedMobWriterFileNames); |
| } |
| } |
| |
| mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); |
| mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); |
| mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); |
| mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); |
| progress.complete(); |
| return true; |
| } |
| |
| protected String getStoreInfo() { |
| return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(), |
| store.getColumnFamilyName(), store.getRegionInfo().getEncodedName()); |
| } |
| |
| private void clearThreadLocals() { |
| mobRefSet.get().clear(); |
| HashMap<String, Long> map = mobLengthMap.get(); |
| if (map != null) { |
| map.clear(); |
| } |
| } |
| |
| private StoreFileWriter newMobWriter(FileDetails fd, boolean major, |
| Consumer<Path> writerCreationTracker) throws IOException { |
| try { |
| StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() |
| ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, |
| major ? majorCompactionCompression : minorCompactionCompression, |
| store.getRegionInfo().getStartKey(), true) |
| : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount, |
| major ? majorCompactionCompression : minorCompactionCompression, |
| store.getRegionInfo().getStartKey(), true, writerCreationTracker); |
| LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(), |
| getStoreInfo()); |
| // Add reference we get for compact MOB |
| mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); |
| return mobFileWriter; |
| } catch (IOException e) { |
| // Bailing out |
| throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()), |
| e); |
| } |
| } |
| |
| private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells, |
| boolean major) throws IOException { |
| // Commit or abort major mob writer |
| // If IOException happens during below operation, some |
| // MOB files can be committed partially, but corresponding |
| // store file won't be committed, therefore these MOB files |
| // become orphans and will be deleted during next MOB cleaning chore cycle |
| |
| if (mobFileWriter != null) { |
| LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}", |
| mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo()); |
| Path path = |
| MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); |
| if (mobCells > 0) { |
| // If the mob file is not empty, commit it. |
| mobFileWriter.appendMetadata(maxSeqId, major, mobCells); |
| mobFileWriter.close(); |
| mobStore.commitFile(mobFileWriter.getPath(), path); |
| } else { |
| // If the mob file is empty, delete it instead of committing. |
| LOG.debug("Aborting writer for {} because there are no MOB cells, store={}", |
| mobFileWriter.getPath(), getStoreInfo()); |
| // Remove MOB file from reference set |
| mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName()); |
| abortWriter(mobFileWriter); |
| } |
| } else { |
| LOG.debug("Mob file writer is null, skipping commit/abort, store=", getStoreInfo()); |
| } |
| } |
| |
| @Override |
| protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd, |
| CompactionRequestImpl request) throws IOException { |
| List<Path> newFiles = Lists.newArrayList(writer.getPath()); |
| writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); |
| writer.appendMobMetadata(mobRefSet.get()); |
| writer.close(); |
| clearThreadLocals(); |
| return newFiles; |
| } |
| |
| private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd, |
| long mobCells, boolean major, CompactionRequestImpl request, |
| List<String> committedMobWriterFileNames) throws IOException { |
| commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); |
| committedMobWriterFileNames.add(mobFileWriter.getPath().getName()); |
| return newMobWriter(fd, major, request.getWriterCreationTracker()); |
| } |
| |
| private void deleteCommittedMobFiles(List<String> fileNames) { |
| if (fileNames.isEmpty()) { |
| return; |
| } |
| Path mobColumnFamilyPath = |
| MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); |
| for (String fileName : fileNames) { |
| if (fileName == null) { |
| continue; |
| } |
| Path path = new Path(mobColumnFamilyPath, fileName); |
| try { |
| if (store.getFileSystem().exists(path)) { |
| store.getFileSystem().delete(path, false); |
| } |
| } catch (IOException e) { |
| LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e); |
| } |
| } |
| |
| } |
| |
| } |