| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.procedure2.store.wal; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Set; |
| import java.util.concurrent.LinkedTransferQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FSError; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.log.HBaseMarkers; |
| import org.apache.hadoop.hbase.procedure2.Procedure; |
| import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; |
| import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; |
| import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; |
| import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; |
| import org.apache.hadoop.hbase.procedure2.util.ByteSlot; |
| import org.apache.hadoop.hbase.procedure2.util.StringUtils; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; |
| |
| /** |
| * WAL implementation of the ProcedureStore. |
| * <p/> |
| * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, |
| * then {@link #load(ProcedureLoader)}. |
| * <p/> |
| * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by |
| * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all |
| * the old wal files. |
| * <p/> |
| * FIXME: notice that the current recover lease implementation is problematic, it can not deal with |
| * the races if there are two master both wants to acquire the lease... |
| * <p/> |
| * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the |
| * comments of this method for more details. |
| * <p/> |
| * The actual logging way is a bit like our FileSystem based WAL implementation as RS side. There is |
| * a {@link #slots}, which is more like the ring buffer, and in the insert, update and delete |
| * methods we will put thing into the {@link #slots} and wait. And there is a background sync |
| * thread(see the {@link #syncLoop()} method) which get data from the {@link #slots} and write them |
| * to the FileSystem, and notify the caller that we have finished. |
| * <p/> |
| * TODO: try using disruptor to increase performance and simplify the logic? |
| * <p/> |
| * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is |
| * also the one being written currently. And the deleted bits in it are for all the procedures, not |
| * only the ones in the newest wal file. And when rolling a log, we will first store it in the |
| * trailer of the current wal file, and then reset its modified bits, so that it can start to track |
| * the modified procedures for the new wal file. |
| * <p/> |
| * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal |
| * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It |
| * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the |
| * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it |
| * with the tracker of every newer wal files, using the |
| * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. |
| * If we find out |
| * that all the modified procedures for the oldest wal file are modified or deleted in newer wal |
| * files, then we can delete it. This is because that, every time we call |
| * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will |
| * persist the full state of a Procedure, so the earlier wal records for this procedure can all be |
| * deleted. |
| * @see ProcedureWALPrettyPrinter for printing content of a single WAL. |
| * @see #main(String[]) to parse a directory of MasterWALProcs. |
| * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we |
| * use the new region based procedure store. |
| */ |
| @Deprecated |
| @InterfaceAudience.Private |
| public class WALProcedureStore extends ProcedureStoreBase { |
  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
  /** Prefix used for every procedure WAL file name. */
  public static final String LOG_PREFIX = "pv2-";
  /** Used to construct the name of the log directory for master procedures */
  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";


  /** Warn when the number of active WAL files reaches this threshold. */
  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
    "hbase.procedure.store.wal.warn.threshold";
  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;

  /** Whether to try cleaning up old WAL files right after load() (see tryCleanupLogsOnLoad). */
  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
    "hbase.procedure.store.wal.exec.cleanup.on.load";
  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;

  /** Number of failed sync attempts before a log roll is forced (see syncSlots). */
  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  /** Milliseconds to sleep between roll/lease-recovery attempts. */
  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  /** Number of times a writer roll is retried before giving up (see rollWriterWithRetries). */
  public static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  /** Max number of sync-failure-triggered rolls before the store aborts (see syncSlots). */
  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  /** Period, in milliseconds, of the time-based periodic log roll. */
  public static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  /** Milliseconds the sync thread waits for more slots before flushing a partial batch. */
  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  /** Whether to use hsync (durable to disk) rather than hflush when syncing (see syncStream). */
  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  /** WAL size, in bytes, beyond which the current file is rolled. */
  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  /** Number of sync metric samples retained for the web UI (see getSyncMetrics). */
  public static final String STORE_WAL_SYNC_STATS_COUNT =
    "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;

  // Active WAL files; the last element is the file currently being written (see syncLoop).
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
  // Used to decide whether the oldest WAL file can be deleted (see class comment).
  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
  // Tracks procedures modified/deleted in the WAL file currently being written.
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  // Single lock guarding slots/logs/trackers; the three conditions below belong to it.
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition waitCond = lock.newCondition(); // sync thread waits here for data
  private final Condition slotCond = lock.newCondition(); // sync thread waits here for full slots
  private final Condition syncCond = lock.newCondition(); // producers wait here for sync to finish

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;
  private final Path walArchiveDir;
  private final boolean enforceStreamCapability;

  // First exception raised by the sync thread; non-null means sync is aborted (isSyncAborted).
  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
  // True while load() has not completed; suppresses periodic rolls in syncLoop().
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // True while the sync thread is flushing the slots (producers must wait, see pushData).
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Total bytes synced; reset semantics are tied to log rolls (see syncLoop trace output).
  private final AtomicLong totalSynced = new AtomicLong(0);
  // Timestamp of the last log roll; 0 until the first roll happens.
  private final AtomicLong lastRollTs = new AtomicLong(0);
  // Incremented after every completed sync; producers use it to detect completion (pushData).
  private final AtomicLong syncId = new AtomicLong(0);

  // Reusable serialization buffers handed out by acquireSlot()/releaseSlot().
  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // WAL files that failed to load; lazily created in load()'s markCorruptedWAL callback.
  private Set<ProcedureWALFile> corruptedLogs = null;
  // Output stream of the WAL file currently being written.
  private FSDataOutputStream stream = null;
  // Number of running procedure executor threads; bounds syncMaxSlot (setRunningProcedureCount).
  private int runningProcCount = 1;
  // Id of the WAL file currently being written.
  private long flushLogId = 0;
  // Max number of slots to batch before the sync thread flushes.
  private int syncMaxSlot = 1;
  // Next free index in slots[]; also the number of entries waiting to be synced.
  private int slotIndex = 0;
  // Background thread running syncLoop(); created in start().
  private Thread syncThread;
  // Pending serialized entries waiting to be synced; sized by start(numSlots).
  private ByteSlot[] slots;

  // Cached tuning values, read from conf in start().
  private int walCountWarnThreshold;
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
| |
  /**
   * Snapshot of a single sync batch, kept in {@link #syncMetricsQueue} for web UI display.
   * Fields are assigned directly by the sync loop before the instance is published.
   */
  public static class SyncMetrics {
    private long timestamp;
    private long syncWaitMs;
    private long totalSyncedBytes;
    private int syncedEntries;
    private float syncedPerSec;

    /** @return time (epoch millis) at which this sample was taken */
    public long getTimestamp() {
      return timestamp;
    }

    /** @return milliseconds the sync thread waited for slots before this flush */
    public long getSyncWaitMs() {
      return syncWaitMs;
    }

    /** @return cumulative bytes synced to the store at sample time */
    public long getTotalSyncedBytes() {
      return totalSyncedBytes;
    }

    /** @return number of slots flushed in this sync batch */
    public long getSyncedEntries() {
      return syncedEntries;
    }

    /** @return bytes-per-second rate at sample time */
    public float getSyncedPerSec() {
      return syncedPerSec;
    }
  }
| |
  /**
   * Creates the store rooted at the default locations: WALs under
   * {@value #MASTER_PROCEDURE_LOGDIR} and archives under the cluster old-logs dir, both relative
   * to the WAL root directory.
   * @param conf configuration used to resolve the WAL root dir and tune the store
   * @param leaseRecovery callback used to recover the lease on existing WAL files
   * @throws IOException if the WAL root dir or file system cannot be resolved
   */
  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
      leaseRecovery);
  }
| |
  /**
   * Creates the store with explicit WAL and archive directories, creating both directories on the
   * file system if missing and applying the configured WAL storage policy.
   * @param conf configuration used to resolve the WAL file system and tune the store
   * @param walDir directory where procedure WAL files are written; created if absent
   * @param walArchiveDir directory WAL files are moved to on removal; may be null to disable
   *          archiving (see removeFile usage in recoverLease)
   * @param leaseRecovery callback used to recover the lease on existing WAL files
   * @throws IOException if the WAL dir cannot be created or the file system cannot be resolved
   */
  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
      final LeaseRecovery leaseRecovery) throws IOException {
    this.conf = conf;
    this.leaseRecovery = leaseRecovery;
    this.walDir = walDir;
    this.walArchiveDir = walArchiveDir;
    this.fs = CommonFSUtils.getWALFileSystem(conf);
    this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,
      true);

    // Create the log directory for the procedure store
    if (!fs.exists(walDir)) {
      if (!fs.mkdirs(walDir)) {
        throw new IOException("Unable to mkdir " + walDir);
      }
    }
    // Now that it exists, set the log policy
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);

    // Create archive dir up front. Rename won't work w/o it up on HDFS.
    // A mkdirs failure here is only logged, not fatal: archiving is best-effort.
    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
      if (this.fs.mkdirs(this.walArchiveDir)) {
        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
      } else {
        LOG.warn("Failed create of {}", this.walArchiveDir);
      }
    }
  }
| |
  /**
   * Starts the store: allocates the slot buffers, reads the tuning configuration and launches the
   * background sync thread. Must be called before {@link #recoverLease()} and
   * {@link #load(ProcedureLoader)}. No-op if the store is already running.
   * @param numSlots number of concurrent writers; sizes the slot array and the slot cache
   * @throws IOException declared for the ProcedureStore contract; not thrown by this body
   */
  @Override
  public void start(int numSlots) throws IOException {
    // setRunning returns false if we were already running; start must be idempotent.
    if (!setRunning(true)) {
      return;
    }

    // Init buffer slots
    loading.set(true);
    runningProcCount = numSlots;
    syncMaxSlot = numSlots;
    slots = new ByteSlot[numSlots];
    slotsCache = new LinkedTransferQueue<>();
    while (slotsCache.size() < numSlots) {
      slotsCache.offer(new ByteSlot());
    }

    // Tunings
    walCountWarnThreshold =
      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
    maxRetriesBeforeRoll =
      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);

    // WebUI
    syncMetricsQueue = new CircularFifoQueue<>(
      conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));

    // Init sync thread. Any escape from syncLoop() is fatal for the process unless the loop
    // already recorded the failure itself (isSyncAborted).
    syncThread = new Thread("WALProcedureStoreSyncThread") {
      @Override
      public void run() {
        try {
          syncLoop();
        } catch (Throwable e) {
          LOG.error("Got an exception from the sync-loop", e);
          if (!isSyncAborted()) {
            sendAbortProcessSignal();
          }
        }
      }
    };
    syncThread.start();
  }
| |
  /**
   * Stops the store: wakes and joins the sync thread, closes the current writer and all tracked
   * WAL files. No-op if the store is already stopped.
   * @param abort true when stopping because of a failure; forwarded to the stream close logic
   */
  @Override
  public void stop(final boolean abort) {
    // setRunning returns false if we were already stopped; stop must be idempotent.
    if (!setRunning(false)) {
      return;
    }

    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
      (isSyncAborted() ? " (self aborting)" : ""));
    sendStopSignal();
    // If the sync thread aborted itself it is already exiting; don't join it in that case.
    if (!isSyncAborted()) {
      try {
        while (syncThread.isAlive()) {
          // keep signalling: the sync thread may re-enter a wait between our signals
          sendStopSignal();
          syncThread.join(250);
        }
      } catch (InterruptedException e) {
        LOG.warn("join interrupted", e);
        Thread.currentThread().interrupt();
      }
    }

    // Close the writer
    closeCurrentLogStream(abort);

    // Close the old logs
    // they should be already closed, this is just in case the load fails
    // and we call start() and then stop()
    for (ProcedureWALFile log: logs) {
      log.close();
    }
    logs.clear();
    loading.set(true);
  }
| |
| private void sendStopSignal() { |
| if (lock.tryLock()) { |
| try { |
| waitCond.signalAll(); |
| syncCond.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| @Override |
| public int getNumThreads() { |
| return slots == null ? 0 : slots.length; |
| } |
| |
| @Override |
| public int setRunningProcedureCount(final int count) { |
| this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length; |
| return this.runningProcCount; |
| } |
| |
  /** @return the tracker of procedures modified in the WAL file currently being written */
  public ProcedureStoreTracker getStoreTracker() {
    return storeTracker;
  }
| |
| public ArrayList<ProcedureWALFile> getActiveLogs() { |
| lock.lock(); |
| try { |
| return new ArrayList<>(logs); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
  /** @return the WAL files that failed to load, or null if no corruption was encountered */
  public Set<ProcedureWALFile> getCorruptedLogs() {
    return corruptedLogs;
  }
| |
| @Override |
| public void recoverLease() throws IOException { |
| lock.lock(); |
| try { |
| LOG.debug("Starting WAL Procedure Store lease recovery"); |
| boolean afterFirstAttempt = false; |
| while (isRunning()) { |
| // Don't sleep before first attempt |
| if (afterFirstAttempt) { |
| LOG.trace("Sleep {} ms after first lease recovery attempt.", |
| waitBeforeRoll); |
| Threads.sleepWithoutInterrupt(waitBeforeRoll); |
| } else { |
| afterFirstAttempt = true; |
| } |
| FileStatus[] oldLogs = getLogFiles(); |
| // Get Log-MaxID and recover lease on old logs |
| try { |
| flushLogId = initOldLogs(oldLogs); |
| } catch (FileNotFoundException e) { |
| LOG.warn("Someone else is active and deleted logs. retrying.", e); |
| continue; |
| } |
| |
| // Create new state-log |
| if (!rollWriter(flushLogId + 1)) { |
| // someone else has already created this log |
| LOG.debug("Someone else has already created log {}. Retrying.", flushLogId); |
| continue; |
| } |
| |
| // We have the lease on the log |
| oldLogs = getLogFiles(); |
| if (getMaxLogId(oldLogs) > flushLogId) { |
| LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId); |
| logs.getLast().removeFile(this.walArchiveDir); |
| continue; |
| } |
| |
| LOG.debug("Lease acquired for flushLogId={}", flushLogId); |
| break; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
  /**
   * Replays all WAL files older than the current one, feeding active and corrupted procedures to
   * the given loader, then attempts to clean up WAL files that are no longer needed.
   * {@link #recoverLease()} must have been called first (so {@link #logs} is non-empty).
   * @param loader receives the max proc id, the active procedures and the corrupted ones
   * @throws IOException if replaying the old WAL files fails
   */
  @Override
  public void load(ProcedureLoader loader) throws IOException {
    lock.lock();
    try {
      if (logs.isEmpty()) {
        throw new IllegalStateException("recoverLease() must be called before loading data");
      }

      // Nothing to do, If we have only the current log.
      if (logs.size() == 1) {
        LOG.debug("No state logs to replay.");
        loader.setMaxProcId(0);
        loading.set(false);
        return;
      }

      // Load the old logs, newest first (descending), skipping the current one.
      Iterator<ProcedureWALFile> it = logs.descendingIterator();
      it.next(); // Skip the current log

      // Adapter forwarding the WAL-format callbacks to the caller's loader.
      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {

        @Override
        public void setMaxProcId(long maxProcId) {
          loader.setMaxProcId(maxProcId);
        }

        @Override
        public void load(ProcedureIterator procIter) throws IOException {
          loader.load(procIter);
        }

        @Override
        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
          loader.handleCorrupted(procIter);
        }

        @Override
        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
          // lazily created: most loads see no corruption
          if (corruptedLogs == null) {
            corruptedLogs = new HashSet<>();
          }
          corruptedLogs.add(log);
          // TODO: sideline corrupted log
        }
      });
      // if we fail when loading, we should prevent persisting the storeTracker later in the stop
      // method. As it may happen that, we have finished constructing the modified and deleted bits,
      // but before we call resetModified, we fail, then if we persist the storeTracker then when
      // restarting, we will consider that all procedures have been included in this file and delete
      // all the previous files. Obviously this not correct. So here we will only set loading to
      // false when we successfully loaded all the procedures, and when closing we will skip
      // persisting the store tracker. And also, this will prevent the sync thread to do
      // periodicRoll, where we may also clean old logs.
      loading.set(false);
      // try to cleanup inactive wals and complete the operation
      buildHoldingCleanupTracker();
      tryCleanupLogsOnLoad();
    } finally {
      lock.unlock();
    }
  }
| |
| private void tryCleanupLogsOnLoad() { |
| // nothing to cleanup. |
| if (logs.size() <= 1) { |
| return; |
| } |
| |
| // the config says to not cleanup wals on load. |
| if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, |
| DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) { |
| LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs()); |
| return; |
| } |
| |
| try { |
| periodicRoll(); |
| } catch (IOException e) { |
| LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); |
| } |
| |
| ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the insert |
| long[] subProcIds = null; |
| if (subprocs != null) { |
| ProcedureWALFormat.writeInsert(slot, proc, subprocs); |
| subProcIds = new long[subprocs.length]; |
| for (int i = 0; i < subprocs.length; ++i) { |
| subProcIds[i] = subprocs[i].getProcId(); |
| } |
| } else { |
| assert !proc.hasParent(); |
| ProcedureWALFormat.writeInsert(slot, proc); |
| } |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + |
| proc + ", subprocs=" + Arrays.toString(subprocs), e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| @Override |
| public void insert(Procedure<?>[] procs) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Insert " + Arrays.toString(procs)); |
| } |
| |
| ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the insert |
| long[] procIds = new long[procs.length]; |
| for (int i = 0; i < procs.length; ++i) { |
| assert !procs[i].hasParent(); |
| procIds[i] = procs[i].getProcId(); |
| ProcedureWALFormat.writeInsert(slot, procs[i]); |
| } |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: " + |
| Arrays.toString(procs), e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| @Override |
| public void update(Procedure<?> proc) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Update " + proc); |
| } |
| |
| ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the update |
| ProcedureWALFormat.writeUpdate(slot, proc); |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.UPDATE, slot, proc.getProcId(), null); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| @Override |
| public void delete(long procId) { |
| LOG.trace("Delete {}", procId); |
| ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the delete |
| ProcedureWALFormat.writeDelete(slot, procId); |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.DELETE, slot, procId, null); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| @Override |
| public void delete(Procedure<?> proc, long[] subProcIds) { |
| assert proc != null : "expected a non-null procedure"; |
| assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); |
| } |
| |
| ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the delete |
| ProcedureWALFormat.writeDelete(slot, proc, subProcIds); |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| @Override |
| public void delete(final long[] procIds, final int offset, final int count) { |
| if (count == 0) { |
| return; |
| } |
| |
| if (offset == 0 && count == procIds.length) { |
| delete(procIds); |
| } else if (count == 1) { |
| delete(procIds[offset]); |
| } else { |
| delete(Arrays.copyOfRange(procIds, offset, offset + count)); |
| } |
| } |
| |
| private void delete(long[] procIds) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Delete " + Arrays.toString(procIds)); |
| } |
| |
| final ByteSlot slot = acquireSlot(); |
| try { |
| // Serialize the delete |
| for (int i = 0; i < procIds.length; ++i) { |
| ProcedureWALFormat.writeDelete(slot, procIds[i]); |
| } |
| |
| // Push the transaction data and wait until it is persisted |
| pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); |
| } catch (IOException e) { |
| // We are not able to serialize the procedure. |
| // this is a code error, and we are not able to go on. |
| LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e); |
| throw new RuntimeException(e); |
| } finally { |
| releaseSlot(slot); |
| } |
| } |
| |
| private ByteSlot acquireSlot() { |
| ByteSlot slot = slotsCache.poll(); |
| return slot != null ? slot : new ByteSlot(); |
| } |
| |
  /** Resets the buffer and returns it to the cache for reuse. */
  private void releaseSlot(final ByteSlot slot) {
    slot.reset();
    slotsCache.offer(slot);
  }
| |
| private enum PushType { INSERT, UPDATE, DELETE } |
| |
  /**
   * Core producer path: claims a slot, updates the trackers and blocks until the sync thread has
   * persisted the data (detected via a change of {@link #syncId}).
   * @param type kind of record (insert/update/delete); selects the tracker update
   * @param slot buffer already containing the serialized record(s)
   * @param procId id of the main procedure, or Procedure.NO_PROC_ID for pure batch operations
   * @param subProcIds ids of the related procedures (children or batch members), or null
   * @return the id of the WAL file the record was assigned to
   * @throws RuntimeException if the store is not running, recoverLease() was not called, the
   *           sync thread aborted, or this thread is interrupted while waiting
   */
  private long pushData(final PushType type, final ByteSlot slot,
      final long procId, final long[] subProcIds) {
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          // a sync is in flight; wait for it to finish before touching the slots
          syncCond.await();
        } else if (slotIndex >= syncMaxSlot) {
          // slots are full; kick the sync thread and wait for it to drain them
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      // remember the current sync generation so we can tell when OUR data has been synced
      final long pushSyncId = syncId.get();
      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == syncMaxSlot) {
        waitCond.signal();
        slotCond.signal();
      }

      // block until the sync thread bumps syncId past the generation we joined
      while (pushSyncId == syncId.get() && isRunning()) {
        syncCond.await();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      // re-check after unlock: the sync we waited on may have failed
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }
| |
| private void updateStoreTracker(final PushType type, |
| final long procId, final long[] subProcIds) { |
| switch (type) { |
| case INSERT: |
| if (subProcIds == null) { |
| storeTracker.insert(procId); |
| } else if (procId == Procedure.NO_PROC_ID) { |
| storeTracker.insert(subProcIds); |
| } else { |
| storeTracker.insert(procId, subProcIds); |
| holdingCleanupTracker.setDeletedIfModified(procId); |
| } |
| break; |
| case UPDATE: |
| storeTracker.update(procId); |
| holdingCleanupTracker.setDeletedIfModified(procId); |
| break; |
| case DELETE: |
| if (subProcIds != null && subProcIds.length > 0) { |
| storeTracker.delete(subProcIds); |
| holdingCleanupTracker.setDeletedIfModified(subProcIds); |
| } else { |
| storeTracker.delete(procId); |
| holdingCleanupTracker.setDeletedIfModified(procId); |
| } |
| break; |
| default: |
| throw new RuntimeException("invalid push type " + type); |
| } |
| } |
| |
| private boolean isSyncAborted() { |
| return syncException.get() != null; |
| } |
| |
  /**
   * Body of the background sync thread: repeatedly waits for slots to be filled by producers
   * (pushData), batches them, flushes them to the stream, and wakes the waiting producers by
   * bumping {@link #syncId}. Also drives the periodic log roll while idle. Any failure is
   * recorded in {@link #syncException} and rethrown, aborting the loop.
   * @throws Throwable the first sync or roll failure encountered
   */
  private void syncLoop() throws Throwable {
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            // while idle, handle the time-based roll (suppressed during load())
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                StringUtils.humanSize(totalSynced.get()),
                StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // bounded wait so we wake up in time for the next periodic roll
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = System.currentTimeMillis();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = System.currentTimeMillis();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
              StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
              StringUtils.humanSize(totalSyncedToStore),
              StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync. inSync tells producers to wait rather than append to slots mid-flush.
          inSync.set(true);
          long slotSize = syncSlots();
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
          // bumping syncId is what releases the producers waiting in pushData()
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          // always wake producers, even on failure, so they can observe the abort
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
| |
| public ArrayList<SyncMetrics> getSyncMetrics() { |
| lock.lock(); |
| try { |
| return new ArrayList<>(syncMetricsQueue); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private long syncSlots() throws Throwable { |
| int retry = 0; |
| int logRolled = 0; |
| long totalSynced = 0; |
| do { |
| try { |
| totalSynced = syncSlots(stream, slots, 0, slotIndex); |
| break; |
| } catch (Throwable e) { |
| LOG.warn("unable to sync slots, retry=" + retry); |
| if (++retry >= maxRetriesBeforeRoll) { |
| if (logRolled >= maxSyncFailureRoll && isRunning()) { |
| LOG.error("Sync slots after log roll failed, abort.", e); |
| throw e; |
| } |
| |
| if (!rollWriterWithRetries()) { |
| throw e; |
| } |
| |
| logRolled++; |
| retry = 0; |
| } |
| } |
| } while (isRunning()); |
| return totalSynced; |
| } |
| |
| protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, |
| final int offset, final int count) throws IOException { |
| long totalSynced = 0; |
| for (int i = 0; i < count; ++i) { |
| final ByteSlot data = slots[offset + i]; |
| data.writeTo(stream); |
| totalSynced += data.size(); |
| } |
| |
| syncStream(stream); |
| sendPostSyncSignal(); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + |
| ", flushed=" + StringUtils.humanSize(totalSynced)); |
| } |
| return totalSynced; |
| } |
| |
  /**
   * Flushes the stream with the configured durability level: {@code hsync} when
   * {@code useHsync} is set, otherwise {@code hflush}.
   */
  protected void syncStream(final FSDataOutputStream stream) throws IOException {
    if (useHsync) {
      stream.hsync();
    } else {
      stream.hflush();
    }
  }
| |
| private boolean rollWriterWithRetries() { |
| for (int i = 0; i < rollRetries && isRunning(); ++i) { |
| if (i > 0) { |
| Threads.sleepWithoutInterrupt(waitBeforeRoll * i); |
| } |
| |
| try { |
| if (rollWriter()) { |
| return true; |
| } |
| } catch (IOException e) { |
| LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); |
| } |
| } |
| LOG.error(HBaseMarkers.FATAL, "Unable to roll the log"); |
| return false; |
| } |
| |
| private boolean tryRollWriter() { |
| try { |
| return rollWriter(); |
| } catch (IOException e) { |
| LOG.warn("Unable to roll the log", e); |
| return false; |
| } |
| } |
| |
| public long getMillisToNextPeriodicRoll() { |
| if (lastRollTs.get() > 0 && periodicRollMsec > 0) { |
| return periodicRollMsec - getMillisFromLastRoll(); |
| } |
| return Long.MAX_VALUE; |
| } |
| |
| public long getMillisFromLastRoll() { |
| return (System.currentTimeMillis() - lastRollTs.get()); |
| } |
| |
  /** Test hook: runs {@link #periodicRoll()} while holding the store lock. */
  void periodicRollForTesting() throws IOException {
    lock.lock();
    try {
      periodicRoll();
    } finally {
      lock.unlock();
    }
  }
| |
  /**
   * Test hook: rolls the WAL while holding the store lock.
   * @return whether the roll succeeded
   */
  public boolean rollWriterForTesting() throws IOException {
    lock.lock();
    try {
      return rollWriter();
    } finally {
      lock.unlock();
    }
  }
| |
  /** Test hook: runs {@link #removeInactiveLogs()} while holding the store lock. */
  void removeInactiveLogsForTesting() throws Exception {
    lock.lock();
    try {
      removeInactiveLogs();
    } finally {
      lock.unlock();
    }
  }
| |
| private void periodicRoll() throws IOException { |
| if (storeTracker.isEmpty()) { |
| LOG.trace("no active procedures"); |
| tryRollWriter(); |
| removeAllLogs(flushLogId - 1, "no active procedures"); |
| } else { |
| if (storeTracker.isAllModified()) { |
| LOG.trace("all the active procedures are in the latest log"); |
| removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); |
| } |
| |
| // if the log size has exceeded the roll threshold |
| // or the periodic roll timeout is expired, try to roll the wal. |
| if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { |
| tryRollWriter(); |
| } |
| |
| removeInactiveLogs(); |
| } |
| } |
| |
| private boolean rollWriter() throws IOException { |
| if (!isRunning()) { |
| return false; |
| } |
| |
| // Create new state-log |
| if (!rollWriter(flushLogId + 1)) { |
| LOG.warn("someone else has already created log {}", flushLogId); |
| return false; |
| } |
| |
| // We have the lease on the log, |
| // but we should check if someone else has created new files |
| if (getMaxLogId(getLogFiles()) > flushLogId) { |
| LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); |
| logs.getLast().removeFile(this.walArchiveDir); |
| return false; |
| } |
| |
| // We have the lease on the log |
| return true; |
| } |
| |
| boolean rollWriter(long logId) throws IOException { |
| assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; |
| assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); |
| |
| ProcedureWALHeader header = ProcedureWALHeader.newBuilder() |
| .setVersion(ProcedureWALFormat.HEADER_VERSION) |
| .setType(ProcedureWALFormat.LOG_TYPE_STREAM) |
| .setMinProcId(storeTracker.getActiveMinProcId()) |
| .setLogId(logId) |
| .build(); |
| |
| FSDataOutputStream newStream = null; |
| Path newLogFile = null; |
| long startPos = -1; |
| newLogFile = getLogFilePath(logId); |
| try { |
| newStream = CommonFSUtils.createForWal(fs, newLogFile, false); |
| } catch (FileAlreadyExistsException e) { |
| LOG.error("Log file with id={} already exists", logId, e); |
| return false; |
| } catch (RemoteException re) { |
| LOG.warn("failed to create log file with id={}", logId, re); |
| return false; |
| } |
| // After we create the stream but before we attempt to use it at all |
| // ensure that we can provide the level of data safety we're configured |
| // to provide. |
| final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH; |
| if (enforceStreamCapability && !newStream.hasCapability(durability)) { |
| throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + |
| " for proper operation during component failures, but the underlying filesystem does " + |
| "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + |
| "' to set the desired level of robustness and ensure the config value of '" + |
| CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); |
| } |
| try { |
| ProcedureWALFormat.writeHeader(newStream, header); |
| startPos = newStream.getPos(); |
| } catch (IOException ioe) { |
| LOG.warn("Encountered exception writing header", ioe); |
| newStream.close(); |
| return false; |
| } |
| |
| closeCurrentLogStream(false); |
| |
| storeTracker.resetModified(); |
| stream = newStream; |
| flushLogId = logId; |
| totalSynced.set(0); |
| long rollTs = System.currentTimeMillis(); |
| lastRollTs.set(rollTs); |
| logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); |
| |
| // if it's the first next WAL being added, build the holding cleanup tracker |
| if (logs.size() == 2) { |
| buildHoldingCleanupTracker(); |
| } else if (logs.size() > walCountWarnThreshold) { |
| LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" + |
| " to see if something is stuck.", logs.size(), walCountWarnThreshold); |
| // This is just like what we have done at RS side when there are too many wal files. For RS, |
| // if there are too many wal files, we will find out the wal entries in the oldest file, and |
| // tell the upper layer to flush these regions so the wal entries will be useless and then we |
| // can delete the wal file. For WALProcedureStore, the assumption is that, if all the |
| // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then |
| // we are safe to delete it. So here if there are too many proc wal files, we will find out |
| // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc |
| // wal files, and tell upper layer to update the state of these procedures to the newest proc |
| // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal |
| // file. |
| sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); |
| } |
| |
| LOG.info("Rolled new Procedure Store WAL, id={}", logId); |
| return true; |
| } |
| |
  /**
   * Closes the current WAL stream, writing the trailer first when possible.
   * @param abort when true, skip writing the trailer (the stream may be unusable);
   *        the stream is still closed and nulled either way
   */
  private void closeCurrentLogStream(boolean abort) {
    if (stream == null || logs.isEmpty()) {
      return;
    }

    try {
      ProcedureWALFile log = logs.getLast();
      // If the loading flag is true, it usually means that we fail when loading procedures, so we
      // should not persist the store tracker, as its state may not be correct.
      if (!loading.get()) {
        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
        log.updateLocalTracker(storeTracker);
        if (!abort) {
          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
          log.addToSize(trailerSize);
        }
      }
    } catch (IOException | FSError e) {
      // best effort: writing the trailer is an optimization, not a correctness requirement
      LOG.warn("Unable to write the trailer", e);
    }
    try {
      stream.close();
    } catch (IOException | FSError e) {
      LOG.error("Unable to close the stream", e);
    }
    stream = null;
  }
| |
| // ========================================================================== |
| // Log Files cleaner helpers |
| // ========================================================================== |
| private void removeInactiveLogs() throws IOException { |
| // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. |
| // once there is nothing olding the oldest WAL we can remove it. |
| while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { |
| LOG.info("Remove the oldest log {}", logs.getFirst()); |
| removeLogFile(logs.getFirst(), walArchiveDir); |
| buildHoldingCleanupTracker(); |
| } |
| |
| // TODO: In case we are holding up a lot of logs for long time we should |
| // rewrite old procedures (in theory parent procs) to the new WAL. |
| } |
| |
  /**
   * Rebuilds {@code holdingCleanupTracker} from the oldest WAL's tracker. It starts
   * as a copy of the first log's tracker, then procedures are marked deleted when
   * the global store tracker shows them deleted, or when a newer log also modified
   * them. When the result is empty, nothing holds the oldest WAL and it can be
   * removed by {@link #removeInactiveLogs()}.
   */
  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated, as a deleted
    //   procedure can always be detected by checking the global tracker, we can save the deleted
    //   checks when applying other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // the logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    // logs.size() > 1 here, so this second next() is always safe
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      if (holdingCleanupTracker.isEmpty()) {
        // early exit: the oldest WAL is already free to be removed
        break;
      }
      tracker = iter.next().getTracker();
    }
  }
| |
| /** |
| * Remove all logs with logId <= {@code lastLogId}. |
| */ |
| private void removeAllLogs(long lastLogId, String why) { |
| if (logs.size() <= 1) { |
| return; |
| } |
| |
| LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why); |
| |
| boolean removed = false; |
| while (logs.size() > 1) { |
| ProcedureWALFile log = logs.getFirst(); |
| if (lastLogId < log.getLogId()) { |
| break; |
| } |
| removeLogFile(log, walArchiveDir); |
| removed = true; |
| } |
| |
| if (removed) { |
| buildHoldingCleanupTracker(); |
| } |
| } |
| |
| private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { |
| try { |
| LOG.trace("Removing log={}", log); |
| log.removeFile(walArchiveDir); |
| logs.remove(log); |
| LOG.debug("Removed log={}, activeLogs={}", log, logs); |
| assert logs.size() > 0 : "expected at least one log"; |
| } catch (IOException e) { |
| LOG.error("Unable to remove log: " + log, e); |
| return false; |
| } |
| return true; |
| } |
| |
| // ========================================================================== |
| // FileSystem Log Files helpers |
| // ========================================================================== |
  /** Returns the directory holding the active procedure WAL files. */
  public Path getWALDir() {
    return this.walDir;
  }
| |
  /** Returns the archive directory removed WAL files are moved to (may be null). */
  Path getWalArchiveDir() {
    return this.walArchiveDir;
  }
| |
  /** Returns the filesystem the WAL files are written to. */
  public FileSystem getFileSystem() {
    return this.fs;
  }
| |
| protected Path getLogFilePath(final long logId) throws IOException { |
| return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); |
| } |
| |
| private static long getLogIdFromName(final String name) { |
| int end = name.lastIndexOf(".log"); |
| int start = name.lastIndexOf('-') + 1; |
| return Long.parseLong(name.substring(start, end)); |
| } |
| |
| private static final PathFilter WALS_PATH_FILTER = new PathFilter() { |
| @Override |
| public boolean accept(Path path) { |
| String name = path.getName(); |
| return name.startsWith(LOG_PREFIX) && name.endsWith(".log"); |
| } |
| }; |
| |
| private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR = |
| new Comparator<FileStatus>() { |
| @Override |
| public int compare(FileStatus a, FileStatus b) { |
| final long aId = getLogIdFromName(a.getPath().getName()); |
| final long bId = getLogIdFromName(b.getPath().getName()); |
| return Long.compare(aId, bId); |
| } |
| }; |
| |
  /**
   * Lists the procedure WAL files in the WAL directory, sorted by log id.
   * @return the sorted file statuses, or {@code null} when the WAL directory does
   *         not exist yet (callers must handle null)
   */
  private FileStatus[] getLogFiles() throws IOException {
    try {
      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
      return files;
    } catch (FileNotFoundException e) {
      LOG.warn("Log directory not found: " + e.getMessage());
      return null;
    }
  }
| |
| /** |
| * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort |
| * the file set by log id. |
| * @return Max-LogID of the specified log file set |
| */ |
| private static long getMaxLogId(FileStatus[] logFiles) { |
| if (logFiles == null || logFiles.length == 0) { |
| return 0L; |
| } |
| return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); |
| } |
| |
| /** |
| * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort |
| * the file set by log id. |
| * @return Max-LogID of the specified log file set |
| */ |
| private long initOldLogs(FileStatus[] logFiles) throws IOException { |
| if (logFiles == null || logFiles.length == 0) { |
| return 0L; |
| } |
| long maxLogId = 0; |
| for (int i = 0; i < logFiles.length; ++i) { |
| final Path logPath = logFiles[i].getPath(); |
| leaseRecovery.recoverFileLease(fs, logPath); |
| if (!isRunning()) { |
| throw new IOException("wal aborting"); |
| } |
| |
| maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); |
| ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); |
| if (log != null) { |
| this.logs.add(log); |
| } |
| } |
| initTrackerFromOldLogs(); |
| return maxLogId; |
| } |
| |
| /** |
| * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker |
| * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. |
| */ |
| private void initTrackerFromOldLogs() { |
| if (logs.isEmpty() || !isRunning()) { |
| return; |
| } |
| ProcedureWALFile log = logs.getLast(); |
| if (!log.getTracker().isPartial()) { |
| storeTracker.resetTo(log.getTracker()); |
| } else { |
| storeTracker.reset(); |
| storeTracker.setPartialFlag(true); |
| } |
| } |
| |
| /** |
| * Loads given log file and it's tracker. |
| */ |
| private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) |
| throws IOException { |
| final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); |
| if (logFile.getLen() == 0) { |
| LOG.warn("Remove uninitialized log: {}", logFile); |
| log.removeFile(walArchiveDir); |
| return null; |
| } |
| LOG.debug("Opening Pv2 {}", logFile); |
| try { |
| log.open(); |
| } catch (ProcedureWALFormat.InvalidWALDataException e) { |
| LOG.warn("Remove uninitialized log: {}", logFile, e); |
| log.removeFile(walArchiveDir); |
| return null; |
| } catch (IOException e) { |
| String msg = "Unable to read state log: " + logFile; |
| LOG.error(msg, e); |
| throw new IOException(msg, e); |
| } |
| |
| try { |
| log.readTracker(); |
| } catch (IOException e) { |
| log.getTracker().reset(); |
| log.getTracker().setPartialFlag(true); |
| LOG.warn("Unable to read tracker for {}", log, e); |
| } |
| |
| log.close(); |
| return log; |
| } |
| |
| /** |
| * Parses a directory of WALs building up ProcedureState. |
| * For testing parse and profiling. |
| * @param args Include pointer to directory of WAL files for a store instance to parse & load. |
| */ |
| public static void main(String [] args) throws IOException { |
| Configuration conf = HBaseConfiguration.create(); |
| if (args == null || args.length != 1) { |
| System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR."); |
| System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR"); |
| System.exit(-1); |
| } |
| WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null, |
| new LeaseRecovery() { |
| @Override |
| public void recoverFileLease(FileSystem fs, Path path) throws IOException { |
| // no-op |
| } |
| }); |
| try { |
| store.start(16); |
| ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); |
| pe.init(1, true); |
| } finally { |
| store.stop(true); |
| } |
| } |
| } |