| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master.region; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Abortable; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.log.HBaseMarkers; |
| import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HStore; |
| import org.apache.hadoop.hbase.regionserver.Store; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.HFileArchiveUtil; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * As long as there is no RegionServerServices for a master local region, we need implement the |
| * flush and compaction logic by our own. |
| * <p/> |
| * The flush logic is very simple, every time after calling a modification method in |
| * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this |
| * method, we will check the memstore size and if it is above the flush size, we will call |
| * {@link HRegion#flush(boolean)} to force flush all stores. |
| * <p/> |
| * And for compaction, the logic is also very simple. After flush, we will check the store file |
| * count, if it is above the compactMin, we will do a major compaction. |
| */ |
| @InterfaceAudience.Private |
| class MasterRegionFlusherAndCompactor implements Closeable { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class); |
| |
| private final Configuration conf; |
| |
| private final Abortable abortable; |
| |
| private final HRegion region; |
| |
| // as we can only count this outside the region's write/flush process so it is not accurate, but |
| // it is enough. |
| private final AtomicLong changesAfterLastFlush = new AtomicLong(0); |
| |
| private final long flushSize; |
| |
| private final long flushPerChanges; |
| |
| private final long flushIntervalMs; |
| |
| private final int compactMin; |
| |
| private final Path globalArchivePath; |
| |
| private final String archivedHFileSuffix; |
| |
| private final Thread flushThread; |
| |
| private final Lock flushLock = new ReentrantLock(); |
| |
| private final Condition flushCond = flushLock.newCondition(); |
| |
| private boolean flushRequest = false; |
| |
| private long lastFlushTime; |
| |
| private final ExecutorService compactExecutor; |
| |
| private final Lock compactLock = new ReentrantLock(); |
| |
| private boolean compactRequest = false; |
| |
| private volatile boolean closed = false; |
| |
| MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region, |
| long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin, |
| Path globalArchivePath, String archivedHFileSuffix) { |
| this.conf = conf; |
| this.abortable = abortable; |
| this.region = region; |
| this.flushSize = flushSize; |
| this.flushPerChanges = flushPerChanges; |
| this.flushIntervalMs = flushIntervalMs; |
| this.compactMin = compactMin; |
| this.globalArchivePath = globalArchivePath; |
| this.archivedHFileSuffix = archivedHFileSuffix; |
| flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher"); |
| flushThread.setDaemon(true); |
| flushThread.start(); |
| compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() |
| .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true) |
| .build()); |
| LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}", |
| flushSize, flushPerChanges, flushIntervalMs, compactMin); |
| } |
| |
| // inject our flush related configurations |
| static void setupConf(Configuration conf, long flushSize, long flushPerChanges, |
| long flushIntervalMs) { |
| conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize); |
| conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges); |
| conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs); |
| LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize, |
| flushPerChanges, flushIntervalMs); |
| } |
| |
| private void moveHFileToGlobalArchiveDir() throws IOException { |
| FileSystem fs = region.getRegionFileSystem().getFileSystem(); |
| for (HStore store : region.getStores()) { |
| store.closeAndArchiveCompactedFiles(); |
| Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(), |
| store.getColumnFamilyDescriptor().getName()); |
| Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath( |
| globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName()); |
| try { |
| if (fs.exists(storeArchiveDir)) { |
| MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir, |
| archivedHFileSuffix); |
| } else { |
| LOG.warn( |
| "Archived dir {} does not exist, there is no need to move archived hfiles from {} " |
| + "to global dir {} .", |
| storeArchiveDir, storeArchiveDir, globalStoreArchiveDir); |
| } |
| } catch (IOException e) { |
| LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir, |
| globalStoreArchiveDir, e); |
| } |
| } |
| } |
| |
| private void compact() { |
| try { |
| region.compact(true); |
| moveHFileToGlobalArchiveDir(); |
| } catch (IOException e) { |
| LOG.error("Failed to compact master local region", e); |
| } |
| compactLock.lock(); |
| try { |
| if (needCompaction()) { |
| compactExecutor.execute(this::compact); |
| } else { |
| compactRequest = false; |
| } |
| } finally { |
| compactLock.unlock(); |
| } |
| } |
| |
| private boolean needCompaction() { |
| for (Store store : region.getStores()) { |
| if (store.getStorefilesCount() >= compactMin) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private void flushLoop() { |
| recordLastFlushTime(); |
| while (!closed) { |
| flushLock.lock(); |
| try { |
| while (!flushRequest) { |
| long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime(); |
| if (waitTimeMs <= 0) { |
| flushRequest = true; |
| break; |
| } |
| flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS); |
| if (closed) { |
| return; |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| continue; |
| } finally { |
| flushLock.unlock(); |
| } |
| assert flushRequest; |
| resetChangesAfterLastFlush(); |
| try { |
| region.flush(true); |
| recordLastFlushTime(); |
| } catch (IOException e) { |
| LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e); |
| abortable.abort("Failed to flush master local region", e); |
| return; |
| } |
| compactLock.lock(); |
| try { |
| if (!compactRequest && needCompaction()) { |
| compactRequest = true; |
| compactExecutor.execute(this::compact); |
| } |
| } finally { |
| compactLock.unlock(); |
| } |
| flushLock.lock(); |
| try { |
| // reset the flushRequest flag |
| if (!shouldFlush(changesAfterLastFlush.get())) { |
| flushRequest = false; |
| } |
| } finally { |
| flushLock.unlock(); |
| } |
| } |
| } |
| |
| private boolean shouldFlush(long changes) { |
| long heapSize = region.getMemStoreHeapSize(); |
| long offHeapSize = region.getMemStoreOffHeapSize(); |
| boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges; |
| if (flush && LOG.isTraceEnabled()) { |
| LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}", |
| heapSize + offHeapSize, flushSize, changes, flushPerChanges); |
| } |
| return flush; |
| } |
| |
| void onUpdate() { |
| long changes = changesAfterLastFlush.incrementAndGet(); |
| if (shouldFlush(changes)) { |
| requestFlush(); |
| } |
| } |
| |
| void requestFlush() { |
| flushLock.lock(); |
| try { |
| if (flushRequest) { |
| return; |
| } |
| flushRequest = true; |
| flushCond.signalAll(); |
| } finally { |
| flushLock.unlock(); |
| } |
| } |
| |
| void resetChangesAfterLastFlush() { |
| changesAfterLastFlush.set(0); |
| } |
| |
| void recordLastFlushTime() { |
| lastFlushTime = EnvironmentEdgeManager.currentTime(); |
| } |
| |
| @Override |
| public void close() { |
| closed = true; |
| flushThread.interrupt(); |
| compactExecutor.shutdown(); |
| } |
| } |