blob: e0ea974b194523bdade77a73a545600e33d336e3 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after
 * some time; if nonce group/client ID is supplied, nonces are stored by client ID.
 * <p>
 * Each (group, nonce) pair maps to an {@link OperationContext} recording whether the operation
 * is still running ({@code WAIT}), has succeeded ({@code DONT_PROCEED}) or has failed
 * ({@code PROCEED}). Callers that collide on the same nonce synchronize on that context object.
 */
@InterfaceAudience.Private
public class ServerNonceManager {
  public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";

  private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class);

  /**
   * The time to wait in an extremely unlikely case of a conflict with a running op.
   * Only here so that tests could override it and not wait.
   */
  private int conflictWaitIterationMs = 30000;

  /**
   * Formatter for activity timestamps in {@link OperationContext#toString()}. Contexts are
   * formatted concurrently by different threads (each holding a different context's lock), so
   * this must be thread-safe; {@link DateTimeFormatter} is immutable, unlike SimpleDateFormat.
   */
  private static final DateTimeFormatter TS_FORMAT =
      DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

  /**
   * Per-nonce state. This object is used to synchronize on in case of collisions, and for
   * cleanup. State, the "someone is waiting" flag and the last-activity timestamp are packed
   * into a single long; all mutations of {@code data} happen while holding this object's lock.
   */
  private static class OperationContext {
    /** The operation succeeded; retries with the same nonce must not run. */
    static final int DONT_PROCEED = 0;
    /** The operation failed; a retry may run. */
    static final int PROCEED = 1;
    /** The operation is still in progress. */
    static final int WAIT = 2;

    // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity
    private long data = 0;
    private static final long STATE_BITS = 3;
    private static final long WAITING_BIT = 4;
    private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;

    /** Write point of the successful operation; volatile because it is read without the lock. */
    private volatile long mvcc;

    public OperationContext() {
      setState(WAIT);
      reportActivity();
    }

    @Override
    public String toString() {
      return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
          + TS_FORMAT.format(Instant.ofEpochMilli(getActivityTime())) + "]";
    }

    public void setState(int state) {
      this.data = (this.data & ~STATE_BITS) | state;
    }

    public int getState() {
      return (int) (this.data & STATE_BITS);
    }

    /** Marks that at least one caller is blocked in {@code wait()} on this context. */
    public void setHasWait() {
      this.data = this.data | WAITING_BIT;
    }

    public boolean hasWait() {
      return (this.data & WAITING_BIT) == WAITING_BIT;
    }

    /** Stores the current time as the last-activity timestamp, keeping the flag bits. */
    public void reportActivity() {
      long now = EnvironmentEdgeManager.currentTime();
      this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
    }

    /**
     * @param minRelevantTime oldest activity time that is still considered live
     * @return true if the last activity is older than {@code minRelevantTime}
     */
    public boolean isExpired(long minRelevantTime) {
      // The stored timestamp only has 61 bits (it is shifted left by 3), so truncate the
      // cutoff to the same width before comparing.
      return getActivityTime() < (minRelevantTime & (~0L >>> 3));
    }

    public void setMvcc(long mvcc) {
      this.mvcc = mvcc;
    }

    public long getMvcc() {
      return this.mvcc;
    }

    private long getActivityTime() {
      return this.data >>> 3;
    }
  }

  /**
   * Nonces.
   * Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v),
   * NK: 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes.
   * With 30min expiration time, 5k increments/appends per sec., we'd use approximately 1Gb,
   * which is a realistic worst case. If it's much worse, we could use some sort of memory
   * limit and cleanup.
   */
  private final ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>();

  /** How long (ms) a completed nonce is remembered; read from HASH_NONCE_GRACE_PERIOD_KEY. */
  private final int deleteNonceGracePeriod;

  /**
   * @param conf configuration; {@link #HASH_NONCE_GRACE_PERIOD_KEY} sets the grace period in
   *          milliseconds (default 30 minutes)
   */
  public ServerNonceManager(Configuration conf) {
    // Default - 30 minutes.
    deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
    if (deleteNonceGracePeriod < 60 * 1000) {
      LOG.warn("Nonce grace period {} is less than a minute; might be too small to be useful",
          deleteNonceGracePeriod);
    }
  }

  /** Overrides how long a conflicting caller waits per iteration; intended for tests. */
  public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
    this.conflictWaitIterationMs = conflictWaitIterationMs;
  }

  /**
   * Starts the operation if operation with such nonce has not already succeeded. If the
   * operation is in progress, waits for it to end and checks whether it has succeeded.
   * When this returns true, this caller's context is registered for the nonce, so the caller
   * must later call {@link #endOperation(long, long, boolean)}.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
   * @return true if the operation has not already succeeded and can proceed; false otherwise.
   * @throws InterruptedException if interrupted while waiting, or if the server was stopped.
   */
  public boolean startOperation(long group, long nonce, Stoppable stoppable)
      throws InterruptedException {
    if (nonce == HConstants.NO_NONCE) {
      return true;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext ctx = new OperationContext();
    while (true) {
      OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
      if (oldResult == null) {
        return true; // Our context is registered; we own this nonce until endOperation.
      }
      // Collision with some operation - should be extremely rare.
      synchronized (oldResult) {
        int oldState = oldResult.getState();
        LOG.debug("Conflict detected by nonce: {}, {}", nk, oldResult);
        if (oldState == OperationContext.DONT_PROCEED) {
          return false; // The operation already succeeded.
        }
        if (oldState == OperationContext.PROCEED) {
          // The other operation failed and endOperation removed its context from the map
          // (under this same lock). Loop to register our own context rather than proceeding
          // unregistered, which would let another retry run concurrently and leave our own
          // endOperation with no context to update.
          continue;
        }
        oldResult.setHasWait();
        oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
        if (stoppable.isStopped()) {
          throw new InterruptedException("Server stopped");
        }
      }
    }
  }

  /**
   * Ends the operation started by startOperation.
   * On success the context is kept (state DONT_PROCEED) until cleanup; on failure it is removed
   * so a retry may run. Any callers waiting on the context are woken up.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param success Whether the operation has succeeded.
   */
  public void endOperation(long group, long nonce, boolean success) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext newResult = nonces.get(nk);
    assert newResult != null;
    synchronized (newResult) {
      assert newResult.getState() == OperationContext.WAIT;
      // If we failed, other retries can proceed.
      newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
      if (success) {
        newResult.reportActivity(); // Set time to use for cleanup.
      } else {
        OperationContext val = nonces.remove(nk);
        assert val == newResult;
      }
      if (newResult.hasWait()) {
        LOG.debug("Conflict with running op ended: {}, {}", nk, newResult);
        newResult.notifyAll();
      }
    }
  }

  /**
   * Store the write point in OperationContext when the operation succeeded.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param mvcc Write point of the successful operation.
   */
  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext result = nonces.get(nk);
    assert result != null;
    synchronized (result) {
      result.setMvcc(mvcc);
    }
  }

  /**
   * Return the write point of the previous successful operation.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @return write point of the previous successful operation, or {@code Long.MAX_VALUE} if the
   *         nonce is {@code NO_NONCE} or no context is stored for it.
   */
  public long getMvccFromOperationContext(long group, long nonce) {
    if (nonce == HConstants.NO_NONCE) {
      return Long.MAX_VALUE;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext result = nonces.get(nk);
    return result == null ? Long.MAX_VALUE : result.getMvcc();
  }

  /**
   * Reports the operation from WAL during replay.
   * Entries older than 1.5x the grace period are ignored; otherwise the nonce is recorded as
   * already succeeded (DONT_PROCEED).
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param writeTime Entry write time, used to ignore entries that are too old.
   */
  public void reportOperationFromWal(long group, long nonce, long writeTime) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    // Give the write time some slack in case the clocks are not synchronized.
    long now = EnvironmentEdgeManager.currentTime();
    if (now > writeTime + (deleteNonceGracePeriod * 1.5)) {
      return;
    }
    OperationContext newResult = new OperationContext();
    newResult.setState(OperationContext.DONT_PROCEED);
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
    if (oldResult != null) {
      // Some schemes can have collisions (for example, expiring hashes), so just log it.
      // We have no idea about the semantics here, so this is the least of many evils.
      LOG.warn("Nonce collision during WAL recovery: {}, {} with {}", nk, oldResult, newResult);
    }
  }

  /**
   * Creates a scheduled chore that is used to clean up old nonces.
   * @param stoppable Stoppable for the chore.
   * @return ScheduledChore; the scheduled chore is not started.
   */
  public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
    // By default, it will run every 6 minutes (30 / 5).
    return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
      @Override
      protected void chore() {
        cleanUpOldNonces();
      }
    };
  }

  /** Removes completed (non-WAIT) contexts whose last activity is older than the grace period. */
  private void cleanUpOldNonces() {
    long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
    for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
      OperationContext oc = entry.getValue();
      if (!oc.isExpired(cutoff)) {
        continue; // Cheap unlocked pre-check; re-checked under the lock below.
      }
      synchronized (oc) {
        if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) {
          continue;
        }
        // Conditional removal: if oc was already removed (failed op) and a new context was
        // registered under the same key, do not evict that newer context.
        nonces.remove(entry.getKey(), oc);
      }
    }
  }
}