blob: 68ae49b26a27ac712ae7cce751c46a5cadcead72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.hadoop.ozone.lease.Lease.messageForResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * LeaseManager creates leases on resources and is responsible for their
 * lifecycle. A lease can be handed back before it expires by calling
 * {@link #release(Object)}; leases that are not released are expired by a
 * background monitor thread, which then runs the callbacks registered on
 * the lease. The resource for which a lease is created should have a proper
 * {@code equals} (and {@code hashCode}) implementation, since resources are
 * used as map keys and resource equality is checked when a lease is created.
 *
 * @param <T> Type of the resource that this lease manager creates leases for
 */
public class LeaseManager<T> {

  private static final Logger LOG =
      LoggerFactory.getLogger(LeaseManager.class);

  private final String name;
  private final long defaultTimeout;

  /**
   * Lock on which the lease monitor thread parks between scans. It also
   * guards {@link #rescanRequested}.
   */
  private final Object monitor = new Object();

  /**
   * Set (under {@link #monitor}) whenever the monitor thread must not go to
   * sleep without looking at the leases again. Recording the request in a
   * flag, instead of relying on {@code notifyAll()} alone, prevents a
   * notification that arrives while the monitor thread is still scanning
   * from being lost.
   */
  private boolean rescanRequested;

  private Map<T, Lease<T>> activeLeases;
  private LeaseMonitor leaseMonitor;
  // Written by start() and by the uncaught exception handler (which runs on
  // the dying monitor thread), hence volatile.
  private volatile Thread leaseMonitorThread;
  // Read by client threads in checkStatus() without holding a lock; the
  // volatile write in start() also publishes activeLeases and leaseMonitor.
  private volatile boolean isRunning;

  /**
   * Creates an instance of lease manager.
   *
   * @param name
   *        Name for the LeaseManager instance.
   * @param defaultTimeout
   *        Default timeout in milliseconds to be used for lease creation.
   */
  public LeaseManager(String name, long defaultTimeout) {
    this.name = name;
    this.defaultTimeout = defaultTimeout;
  }

  /**
   * Starts the lease manager service.
   */
  public void start() {
    LOG.debug("Starting {} LeaseManager service", name);
    activeLeases = new ConcurrentHashMap<>();
    leaseMonitor = new LeaseMonitor();
    LOG.debug("Starting {}-LeaseManager#LeaseMonitor Thread", name);
    startMonitorThread(leaseMonitor);
    isRunning = true;
  }

  /**
   * Creates and starts a new daemon thread that runs the given monitor.
   * If that thread dies because of an uncaught exception, a replacement
   * thread is created, since without a monitor thread lease expiry cannot
   * be handled.
   *
   * @param monitorTask
   *        The lease monitor to run
   */
  private void startMonitorThread(LeaseMonitor monitorTask) {
    Thread thread = new Thread(monitorTask);
    thread.setName(name + "-LeaseManager#LeaseMonitor");
    thread.setDaemon(true);
    thread.setUncaughtExceptionHandler((failedThread, throwable) -> {
      LOG.error("LeaseMonitor thread encountered an error. Thread: {}",
          failedThread.toString(), throwable);
      // A Thread can be started only once (a second start() throws
      // IllegalThreadStateException), so a brand new thread is needed.
      // Do not resurrect the monitor once it has been disabled.
      if (monitorTask.isEnabled()) {
        startMonitorThread(monitorTask);
      }
    });
    leaseMonitorThread = thread;
    thread.start();
  }

  /**
   * Returns a lease for the specified resource with default timeout.
   *
   * @param resource
   *        Resource for which lease has to be created
   * @return the newly created lease
   * @throws LeaseAlreadyExistException
   *         If there is already a lease on the resource
   */
  public synchronized Lease<T> acquire(T resource)
      throws LeaseAlreadyExistException {
    return acquire(resource, defaultTimeout);
  }

  /**
   * Returns a lease for the specified resource with the timeout provided.
   *
   * @param resource
   *        Resource for which lease has to be created
   * @param timeout
   *        The timeout in milliseconds which has to be set on the lease
   * @return the newly created lease
   * @throws LeaseAlreadyExistException
   *         If there is already a lease on the resource
   */
  public synchronized Lease<T> acquire(T resource, long timeout)
      throws LeaseAlreadyExistException {
    checkStatus();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout);
    }
    if (activeLeases.containsKey(resource)) {
      throw new LeaseAlreadyExistException(messageForResource(resource));
    }
    Lease<T> lease = new Lease<>(resource, timeout);
    activeLeases.put(resource, lease);
    // The new lease may expire earlier than whatever the monitor thread is
    // currently sleeping for, so make it recompute its sleep time.
    wakeUpMonitor();
    return lease;
  }

  /**
   * Returns a lease associated with the specified resource.
   *
   * @param resource
   *        Resource for which the lease has to be returned
   * @return the active lease on the resource
   * @throws LeaseNotFoundException
   *         If there is no active lease on the resource
   */
  public Lease<T> get(T resource) throws LeaseNotFoundException {
    checkStatus();
    Lease<T> lease = activeLeases.get(resource);
    if (lease != null) {
      return lease;
    }
    throw new LeaseNotFoundException(messageForResource(resource));
  }

  /**
   * Releases the lease associated with the specified resource.
   *
   * @param resource
   *        The resource for which the lease has to be released
   * @throws LeaseNotFoundException
   *         If there is no active lease on the resource
   */
  public synchronized void release(T resource)
      throws LeaseNotFoundException {
    checkStatus();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Releasing lease on {}", resource);
    }
    Lease<T> lease = activeLeases.remove(resource);
    if (lease == null) {
      throw new LeaseNotFoundException(messageForResource(resource));
    }
    lease.invalidate();
  }

  /**
   * Shuts down the LeaseManager and releases the resources. All the active
   * {@link Lease} will be released (callbacks on leases will not be
   * executed).
   */
  public void shutdown() {
    checkStatus();
    LOG.debug("Shutting down LeaseManager service");
    leaseMonitor.disable();
    wakeUpMonitor();
    for (T resource : activeLeases.keySet()) {
      try {
        release(resource);
      } catch (LeaseNotFoundException ignored) {
        //Ignore the exception, someone might have released the lease
      }
    }
    isRunning = false;
  }

  /**
   * Throws {@link LeaseManagerNotRunningException} if the service is not
   * running.
   */
  private void checkStatus() {
    if (!isRunning) {
      throw new LeaseManagerNotRunningException("LeaseManager not running.");
    }
  }

  /**
   * Records a rescan request and wakes up the monitor thread if it is
   * waiting.
   */
  private void wakeUpMonitor() {
    synchronized (monitor) {
      rescanRequested = true;
      monitor.notifyAll();
    }
  }

  /**
   * Removes and invalidates the given lease, but only if it is still the
   * lease registered for the resource. Used by the monitor thread instead
   * of {@link #release(Object)} so that expiry neither depends on the
   * running status of the manager nor removes a newer lease that was
   * acquired on the same resource in the meantime.
   *
   * @param resource
   *        Resource whose lease has timed out
   * @param lease
   *        The lease instance that was found to be timed out
   * @return {@code true} if the lease was removed by this call
   */
  private synchronized boolean expire(T resource, Lease<T> lease) {
    // Identity comparison on purpose: only this exact lease may be expired.
    if (activeLeases.get(resource) != lease) {
      return false;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Releasing lease on {}", resource);
    }
    activeLeases.remove(resource);
    lease.invalidate();
    return true;
  }

  /**
   * Monitors the leases and expires them based on the timeout, also
   * responsible for executing the callbacks of expired leases.
   */
  private final class LeaseMonitor implements Runnable {

    private final ExecutorService executorService;
    private volatile boolean running = true;

    private LeaseMonitor() {
      this.executorService = Executors.newCachedThreadPool();
    }

    @Override
    public void run() {
      while (running) {
        LOG.debug("{}-LeaseMonitor: checking for lease expiry", name);
        // Clear the request before scanning: anything requested from now
        // on is remembered and prevents the wait below.
        synchronized (monitor) {
          rescanRequested = false;
        }
        long sleepTime = expireTimedOutLeases();
        try {
          synchronized (monitor) {
            // Re-check under the lock so that a lease added, or a shutdown
            // requested, while scanning is not slept through.
            if (running && !rescanRequested) {
              monitor.wait(sleepTime);
            }
          }
        } catch (InterruptedException e) {
          LOG.warn("Lease manager is interrupted. Shutting down...", e);
          // Stop the loop; looping on with the interrupt flag set would
          // make every subsequent wait() fail immediately.
          running = false;
          Thread.currentThread().interrupt();
        }
      }
      // Regular exit only; if run() terminates with an exception the same
      // monitor (and executor) is reused by the replacement thread.
      executorService.shutdown();
    }

    /**
     * Expires every lease whose remaining time is used up and schedules
     * the callbacks of each expired lease on the executor service.
     *
     * @return the smallest remaining time, in the unit reported by the
     *         leases, among the leases that have not timed out yet, or
     *         {@code Long.MAX_VALUE} if there is none
     */
    private long expireTimedOutLeases() {
      long sleepTime = Long.MAX_VALUE;
      for (Map.Entry<T, Lease<T>> entry : activeLeases.entrySet()) {
        T resource = entry.getKey();
        Lease<T> lease = entry.getValue();
        try {
          long remainingTime = lease.getRemainingTime();
          if (remainingTime > 0) {
            sleepTime = Math.min(remainingTime, sleepTime);
            continue;
          }
          //Lease has timed out
          List<Callable<Void>> leaseCallbacks = lease.getCallbacks();
          if (expire(resource, lease)) {
            executorService.execute(
                new LeaseCallbackExecutor<>(resource, leaseCallbacks));
          }
        } catch (LeaseExpiredException ignored) {
          //Ignore the exception, someone might have released the lease
        }
      }
      return sleepTime;
    }

    /**
     * Disables the lease monitor. The monitor thread stops once it
     * observes the flag, i.e. after its current scan or as soon as it is
     * woken up from its wait.
     */
    public void disable() {
      running = false;
    }

    /**
     * Returns whether this monitor is still supposed to run.
     */
    private boolean isEnabled() {
      return running;
    }
  }
}