| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.ozone.lease; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| |
| import static org.apache.hadoop.ozone.lease.Lease.messageForResource; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * LeaseManager is someone who can provide you leases based on your |
| * requirement. If you want to return the lease back before it expires, |
| * you can give it back to Lease Manager. He is the one responsible for |
| * the lifecycle of leases. The resource for which lease is created |
| * should have proper {@code equals} method implementation, resource |
| * equality is checked while the lease is created. |
| * |
| * @param <T> Type of leases that this lease manager can create |
| */ |
| public class LeaseManager<T> { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(LeaseManager.class); |
| |
| private final String name; |
| private final long defaultTimeout; |
| private final Object monitor = new Object(); |
| private Map<T, Lease<T>> activeLeases; |
| private LeaseMonitor leaseMonitor; |
| private Thread leaseMonitorThread; |
| private boolean isRunning; |
| |
| /** |
| * Creates an instance of lease manager. |
| * |
| * @param name |
| * Name for the LeaseManager instance. |
| * @param defaultTimeout |
| * Default timeout in milliseconds to be used for lease creation. |
| */ |
| public LeaseManager(String name, long defaultTimeout) { |
| this.name = name; |
| this.defaultTimeout = defaultTimeout; |
| } |
| |
| /** |
| * Starts the lease manager service. |
| */ |
| public void start() { |
| LOG.debug("Starting {} LeaseManager service", name); |
| activeLeases = new ConcurrentHashMap<>(); |
| leaseMonitor = new LeaseMonitor(); |
| leaseMonitorThread = new Thread(leaseMonitor); |
| leaseMonitorThread.setName(name + "-LeaseManager#LeaseMonitor"); |
| leaseMonitorThread.setDaemon(true); |
| leaseMonitorThread.setUncaughtExceptionHandler((thread, throwable) -> { |
| // Let us just restart this thread after logging an error. |
| // if this thread is not running we cannot handle Lease expiry. |
| LOG.error("LeaseMonitor thread encountered an error. Thread: {}", |
| thread.toString(), throwable); |
| leaseMonitorThread.start(); |
| }); |
| LOG.debug("Starting {}-LeaseManager#LeaseMonitor Thread", name); |
| leaseMonitorThread.start(); |
| isRunning = true; |
| } |
| |
| /** |
| * Returns a lease for the specified resource with default timeout. |
| * |
| * @param resource |
| * Resource for which lease has to be created |
| * @throws LeaseAlreadyExistException |
| * If there is already a lease on the resource |
| */ |
| public synchronized Lease<T> acquire(T resource) |
| throws LeaseAlreadyExistException { |
| return acquire(resource, defaultTimeout); |
| } |
| |
| /** |
| * Returns a lease for the specified resource with the timeout provided. |
| * |
| * @param resource |
| * Resource for which lease has to be created |
| * @param timeout |
| * The timeout in milliseconds which has to be set on the lease |
| * @throws LeaseAlreadyExistException |
| * If there is already a lease on the resource |
| */ |
| public synchronized Lease<T> acquire(T resource, long timeout) |
| throws LeaseAlreadyExistException { |
| checkStatus(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout); |
| } |
| if (activeLeases.containsKey(resource)) { |
| throw new LeaseAlreadyExistException(messageForResource(resource)); |
| } |
| Lease<T> lease = new Lease<>(resource, timeout); |
| activeLeases.put(resource, lease); |
| synchronized (monitor) { |
| monitor.notifyAll(); |
| } |
| return lease; |
| } |
| |
| /** |
| * Returns a lease associated with the specified resource. |
| * |
| * @param resource |
| * Resource for which the lease has to be returned |
| * @throws LeaseNotFoundException |
| * If there is no active lease on the resource |
| */ |
| public Lease<T> get(T resource) throws LeaseNotFoundException { |
| checkStatus(); |
| Lease<T> lease = activeLeases.get(resource); |
| if (lease != null) { |
| return lease; |
| } |
| throw new LeaseNotFoundException(messageForResource(resource)); |
| } |
| |
| /** |
| * Releases the lease associated with the specified resource. |
| * |
| * @param resource |
| * The for which the lease has to be released |
| * @throws LeaseNotFoundException |
| * If there is no active lease on the resource |
| */ |
| public synchronized void release(T resource) |
| throws LeaseNotFoundException { |
| checkStatus(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Releasing lease on {}", resource); |
| } |
| Lease<T> lease = activeLeases.remove(resource); |
| if (lease == null) { |
| throw new LeaseNotFoundException(messageForResource(resource)); |
| } |
| lease.invalidate(); |
| } |
| |
| /** |
| * Shuts down the LeaseManager and releases the resources. All the active |
| * {@link Lease} will be released (callbacks on leases will not be |
| * executed). |
| */ |
| public void shutdown() { |
| checkStatus(); |
| LOG.debug("Shutting down LeaseManager service"); |
| leaseMonitor.disable(); |
| synchronized (monitor) { |
| monitor.notifyAll(); |
| } |
| for (T resource : activeLeases.keySet()) { |
| try { |
| release(resource); |
| } catch (LeaseNotFoundException ex) { |
| //Ignore the exception, someone might have released the lease |
| } |
| } |
| isRunning = false; |
| } |
| |
| /** |
| * Throws {@link LeaseManagerNotRunningException} if the service is not |
| * running. |
| */ |
| private void checkStatus() { |
| if (!isRunning) { |
| throw new LeaseManagerNotRunningException("LeaseManager not running."); |
| } |
| } |
| |
| /** |
| * Monitors the leases and expires them based on the timeout, also |
| * responsible for executing the callbacks of expired leases. |
| */ |
| private final class LeaseMonitor implements Runnable { |
| |
| private final ExecutorService executorService; |
| private volatile boolean running = true; |
| |
| private LeaseMonitor() { |
| this.executorService = Executors.newCachedThreadPool(); |
| } |
| |
| @Override |
| public void run() { |
| while (running) { |
| LOG.debug("{}-LeaseMonitor: checking for lease expiry", name); |
| long sleepTime = Long.MAX_VALUE; |
| |
| for (T resource : activeLeases.keySet()) { |
| try { |
| Lease<T> lease = get(resource); |
| long remainingTime = lease.getRemainingTime(); |
| if (remainingTime <= 0) { |
| //Lease has timed out |
| List<Callable<Void>> leaseCallbacks = lease.getCallbacks(); |
| release(resource); |
| executorService.execute( |
| new LeaseCallbackExecutor<>(resource, leaseCallbacks)); |
| } else { |
| sleepTime = Math.min(remainingTime, sleepTime); |
| } |
| } catch (LeaseNotFoundException | LeaseExpiredException ex) { |
| //Ignore the exception, someone might have released the lease |
| } |
| } |
| |
| try { |
| synchronized (monitor) { |
| monitor.wait(sleepTime); |
| } |
| } catch (InterruptedException e) { |
| // This means a new lease is added to activeLeases. |
| LOG.warn("Lease manager is interrupted. Shutting down...", e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Disables lease monitor, next interrupt call on the thread |
| * will stop lease monitor. |
| */ |
| public void disable() { |
| running = false; |
| } |
| } |
| |
| } |