/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.util.AutoCloseableLock;
/**
* An in-memory cache of edits in their serialized form. This is used to serve
* the {@link Journal#getJournaledEdits(long, int)} call, used by the
* QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
* enabled.
*
* <p>When a batch of edits is received by the JournalNode, it is put into this
* cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
* stored contiguously; if a batch of edits is stored that does not align with
* the previously stored edits, the cache will be cleared before storing new
 * edits to avoid gaps. This is because gaps are only handled in recovery
 * mode, and the cache is not intended for use during recovery.
*
* <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
* transaction ID of the batch to the data buffer. Upon retrieval, the
* relevant data buffers are concatenated together and a header is added
* to construct a fully-formed edit data stream.
*
 * <p>The cache has a limited capacity, in bytes, determined by
* {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity
* is exceeded after adding a new batch of edits, batches of edits are removed
* until the total size is less than the capacity, starting from the ones
* containing the oldest transactions. Transactions range in size, but a
* decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
* the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
* to determine if the cache is too small; it will indicate both how many
* cache misses occurred, and how many more transactions would have been
* needed in the cache to serve the request.
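 *
 * <p>A minimal usage sketch (illustrative only; the transaction IDs, the cache
 * size, and the {@code serializedBatch}/{@code layoutVersion} variables are
 * assumed example values, not part of this class's contract):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1024 * 1024);
 * JournaledEditsCache cache = new JournaledEditsCache(conf);
 *
 * // store a contiguous batch of already-serialized edits covering txns [101, 150]
 * cache.storeEdits(serializedBatch, 101L, 150L, layoutVersion);
 *
 * // fetch up to 5000 txns starting at txn 120; the returned buffers, when
 * // concatenated, form a fully-formed edit stream (header followed by data)
 * List<ByteBuffer> buffers = new ArrayList<>();
 * int numTxns = cache.retrieveEdits(120L, 5000, buffers);
 * }</pre>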
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class JournaledEditsCache {
private static final int INVALID_LAYOUT_VERSION = 0;
private static final long INVALID_TXN_ID = -1;
/** The capacity, in bytes, of this cache. */
private final int capacity;
/**
* Read/write lock pair wrapped in AutoCloseable; these refer to the same
* underlying lock.
*/
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
// ** Start lock-protected fields **
/**
* Stores the actual data as a mapping of the StartTxnId of a batch of edits
* to the serialized batch of edits. Stores only contiguous ranges; that is,
* the last transaction ID in one batch is always one less than the first
* transaction ID in the next batch. Though the map is protected by the lock,
* individual data buffers are immutable and can be accessed without locking.
*/
private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
/** Stores the layout version currently present in the cache. */
private int layoutVersion = INVALID_LAYOUT_VERSION;
/** Stores the serialized version of the header for the current version. */
private ByteBuffer layoutHeader;
/**
* The lowest/highest transaction IDs present in the cache.
   * {@value #INVALID_TXN_ID} if there are no transactions in the cache.
*/
private long lowestTxnId;
private long highestTxnId;
/**
* The lowest transaction ID that was ever present in the cache since last
* being reset (i.e. since initialization or since reset due to being out of
* sync with the Journal). Until the cache size goes above capacity, this is
* equal to lowestTxnId.
*/
private long initialTxnId;
/** The current total size of all buffers in this cache. */
private int totalSize;
// ** End lock-protected fields **
JournaledEditsCache(Configuration conf) {
capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
"maximum JVM memory is only %d bytes. It is recommended that you " +
"decrease the cache size or increase the heap size.",
capacity, Runtime.getRuntime().maxMemory()));
}
Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
"of bytes: " + capacity);
ReadWriteLock lock = new ReentrantReadWriteLock(true);
readLock = new AutoCloseableLock(lock.readLock());
writeLock = new AutoCloseableLock(lock.writeLock());
initialize(INVALID_TXN_ID);
}
/**
   * Fetch the data for edits starting at the specified transaction ID, fetching
* up to {@code maxTxns} transactions. Populates a list of output buffers
* which contains a serialized version of the edits, and returns the count of
* edits contained within the serialized buffers. The serialized edits are
* prefixed with a standard edit log header containing information about the
* layout version. The transactions returned are guaranteed to have contiguous
* transaction IDs.
*
* If {@code requestedStartTxn} is higher than the highest transaction which
* has been added to this cache, a response with an empty buffer and a
* transaction count of 0 will be returned. If {@code requestedStartTxn} is
* lower than the lowest transaction currently contained in this cache, or no
* transactions have yet been added to the cache, an exception will be thrown.
*
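   * <p>A hedged sketch of how a caller might assemble the returned buffers
   * into a single response (the actual response construction is done by the
   * caller, e.g. {@link Journal#getJournaledEdits(long, int)}; {@code cache},
   * {@code startTxn} and {@code maxTxns} are assumed example values):
   * <pre>{@code
   * List<ByteBuffer> buffers = new ArrayList<>();
   * int numTxns = cache.retrieveEdits(startTxn, maxTxns, buffers);
   * int totalLength = 0;
   * for (ByteBuffer buf : buffers) {
   *   totalLength += buf.remaining();
   * }
   * ByteBuffer response = ByteBuffer.allocate(totalLength);
   * for (ByteBuffer buf : buffers) {
   *   // duplicate() so the shared header buffer's position is not disturbed
   *   response.put(buf.duplicate());
   * }
   * }</pre>
   *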
* @param requestedStartTxn The ID of the first transaction to return. If any
* transactions are returned, it is guaranteed that
* the first one will have this ID.
* @param maxTxns The maximum number of transactions to return.
* @param outputBuffers A list to populate with output buffers. When
* concatenated, these form a full response.
* @return The number of transactions contained within the set of output
* buffers.
* @throws IOException If transactions are requested which cannot be served
* by this cache.
*/
int retrieveEdits(long requestedStartTxn, int maxTxns,
List<ByteBuffer> outputBuffers) throws IOException {
int txnCount = 0;
try (AutoCloseableLock l = readLock.acquire()) {
if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
throw getCacheMissException(requestedStartTxn);
} else if (requestedStartTxn > highestTxnId) {
return 0;
}
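      // The serialized header for the current layout version always forms the
      // first buffer of the response.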
outputBuffers.add(layoutHeader);
Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
.entrySet().iterator();
long prevTxn = requestedStartTxn;
byte[] prevBuf = null;
// Stop when maximum transactions reached...
while ((txnCount < maxTxns) &&
// ... or there are no more entries ...
(incrBuffIter.hasNext() || prevBuf != null)) {
long currTxn;
byte[] currBuf;
if (incrBuffIter.hasNext()) {
Map.Entry<Long, byte[]> ent = incrBuffIter.next();
currTxn = ent.getKey();
currBuf = ent.getValue();
} else {
// This accounts for the trailing entry
currTxn = highestTxnId + 1;
currBuf = null;
}
if (prevBuf != null) { // True except for the first loop iteration
outputBuffers.add(ByteBuffer.wrap(prevBuf));
// if prevTxn < requestedStartTxn, the extra transactions will get
// removed after the loop, so don't include them in the txn count
txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
}
prevTxn = currTxn;
prevBuf = currBuf;
}
// Release the lock before doing operations on the buffers (deserializing
// to find transaction boundaries, and copying into an output buffer)
}
// Remove extra leading transactions in the first buffer
ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
firstBuf.position(
findTransactionPosition(firstBuf.array(), requestedStartTxn));
// Remove trailing transactions in the last buffer if necessary
if (txnCount > maxTxns) {
ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
int limit =
findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
lastBuf.limit(limit);
txnCount = maxTxns;
}
return txnCount;
}
/**
* Store a batch of serialized edits into this cache. Removes old batches
* as necessary to keep the total size of the cache below the capacity.
* See the class Javadoc for more info.
*
* This attempts to always handle malformed inputs gracefully rather than
* throwing an exception, to allow the rest of the Journal's operations
* to proceed normally.
*
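   * <p>Illustrative sketch of the contiguity requirement (the batch variables,
   * txn IDs, and {@code layoutVersion} are assumed example values):
   * <pre>{@code
   * cache.storeEdits(batch1, 1L, 20L, layoutVersion);  // cache covers [1, 20]
   * cache.storeEdits(batch2, 21L, 35L, layoutVersion); // contiguous; covers [1, 35]
   * cache.storeEdits(batch3, 50L, 60L, layoutVersion); // gap detected: cache is
   *                                                    // cleared, then covers [50, 60]
   * }</pre>
   *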
* @param inputData A buffer containing edits in serialized form
* @param newStartTxn The txn ID of the first edit in {@code inputData}
* @param newEndTxn The txn ID of the last edit in {@code inputData}
* @param newLayoutVersion The version of the layout used to serialize
* the edits
*/
void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
int newLayoutVersion) {
if (newStartTxn < 0 || newEndTxn < newStartTxn) {
Journal.LOG.error(String.format("Attempted to cache data of length %d " +
"with newStartTxn %d and newEndTxn %d",
inputData.length, newStartTxn, newEndTxn));
return;
}
try (AutoCloseableLock l = writeLock.acquire()) {
if (newLayoutVersion != layoutVersion) {
try {
updateLayoutVersion(newLayoutVersion, newStartTxn);
} catch (IOException ioe) {
Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
"due to exception when updating to new layout version %d",
newStartTxn, newEndTxn, newLayoutVersion), ioe);
return;
}
} else if (lowestTxnId == INVALID_TXN_ID) {
Journal.LOG.info("Initializing edits cache starting from txn ID " +
newStartTxn);
initialize(newStartTxn);
} else if (highestTxnId + 1 != newStartTxn) {
// Cache is out of sync; clear to avoid storing noncontiguous regions
Journal.LOG.error(String.format("Edits cache is out of sync; " +
"looked for next txn id at %d but got start txn id for " +
"cache put request at %d. Reinitializing at new request.",
highestTxnId + 1, newStartTxn));
initialize(newStartTxn);
}
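      // Evict the oldest batches until the incoming batch fits within the
      // configured capacity.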
while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) {
Map.Entry<Long, byte[]> lowest = dataMap.firstEntry();
dataMap.remove(lowest.getKey());
totalSize -= lowest.getValue().length;
}
if (inputData.length > capacity) {
initialize(INVALID_TXN_ID);
Journal.LOG.warn(String.format("A single batch of edits was too " +
"large to fit into the cache: startTxn = %d, endTxn = %d, " +
"input length = %d. The capacity of the cache (%s) must be " +
"increased for it to work properly (current capacity %d)." +
"Cache is now empty.",
newStartTxn, newEndTxn, inputData.length,
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
return;
}
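      // If eviction (or initialization) left the map empty, the incoming batch
      // defines the lowest txn ID; otherwise the oldest remaining batch does.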
if (dataMap.isEmpty()) {
lowestTxnId = newStartTxn;
} else {
lowestTxnId = dataMap.firstKey();
}
dataMap.put(newStartTxn, inputData);
highestTxnId = newEndTxn;
totalSize += inputData.length;
}
}
/**
* Skip through a given stream of edits until the given transaction ID is
* found. Return the number of bytes that appear prior to the given
* transaction.
*
* @param buf A buffer containing a stream of serialized edits
* @param txnId The transaction ID to search for
* @return The number of bytes appearing in {@code buf} <i>before</i>
* the start of the transaction with ID {@code txnId}.
*/
private int findTransactionPosition(byte[] buf, long txnId)
throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(buf);
FSEditLogLoader.PositionTrackingInputStream tracker =
new FSEditLogLoader.PositionTrackingInputStream(bais);
FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(
new DataInputStream(tracker), tracker, layoutVersion);
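    // Each scanOp() call advances past one serialized op and returns its txn
    // ID; keep scanning until we reach txnId, remembering the byte position
    // just before it.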
long previousPos = 0;
while (reader.scanOp() < txnId) {
previousPos = tracker.getPos();
}
// tracker is backed by a byte[]; position cannot go above an integer
return (int) previousPos;
}
/**
* Update the layout version of the cache. This clears out all existing
* entries, and populates the new layout version and header for that version.
*
* @param newLayoutVersion The new layout version to be stored in the cache
* @param newStartTxn The new lowest transaction in the cache
*/
private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
throws IOException {
StringBuilder logMsg = new StringBuilder()
.append("Updating edits cache to use layout version ")
.append(newLayoutVersion)
.append(" starting from txn ID ")
.append(newStartTxn);
if (layoutVersion != INVALID_LAYOUT_VERSION) {
logMsg.append("; previous version was ").append(layoutVersion)
.append("; old entries will be cleared.");
}
Journal.LOG.info(logMsg.toString());
initialize(newStartTxn);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
EditLogFileOutputStream.writeHeader(newLayoutVersion,
new DataOutputStream(baos));
layoutVersion = newLayoutVersion;
layoutHeader = ByteBuffer.wrap(baos.toByteArray());
}
/**
* Initialize the cache back to a clear state.
*
* @param newInitialTxnId The new lowest transaction ID stored in the cache.
   *                        This should be {@value #INVALID_TXN_ID} if the cache
* is to remain empty at this time.
*/
private void initialize(long newInitialTxnId) {
dataMap.clear();
totalSize = 0;
initialTxnId = newInitialTxnId;
lowestTxnId = initialTxnId;
highestTxnId = INVALID_TXN_ID; // this will be set later
}
/**
* Return the underlying data buffer used to store information about the
* given transaction ID.
*
* @param txnId Transaction ID whose containing buffer should be fetched.
* @return The data buffer for the transaction
*/
@VisibleForTesting
byte[] getRawDataForTests(long txnId) {
try (AutoCloseableLock l = readLock.acquire()) {
return dataMap.floorEntry(txnId).getValue();
}
}
private CacheMissException getCacheMissException(long requestedTxnId) {
if (lowestTxnId == INVALID_TXN_ID) {
return new CacheMissException(0, "Cache is empty; either it was never " +
"written to or the last write overflowed the cache capacity.");
} else if (requestedTxnId < initialTxnId) {
return new CacheMissException(initialTxnId - requestedTxnId,
"Cache started at txn ID %d but requested txns starting at %d.",
initialTxnId, requestedTxnId);
} else {
return new CacheMissException(lowestTxnId - requestedTxnId,
"Oldest txn ID available in the cache is %d, but requested txns " +
"starting at %d. The cache size (%s) may need to be increased " +
"to hold more transactions (currently %d bytes containing %d " +
"transactions)", lowestTxnId, requestedTxnId,
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
highestTxnId - lowestTxnId + 1);
}
}
static class CacheMissException extends IOException {
private static final long serialVersionUID = 0L;
private final long cacheMissAmount;
CacheMissException(long cacheMissAmount, String msgFormat,
Object... msgArgs) {
super(String.format(msgFormat, msgArgs));
this.cacheMissAmount = cacheMissAmount;
}
long getCacheMissAmount() {
return cacheMissAmount;
}
}
}