blob: fcacdaa6a734d9a85a25e4858fe7f520dfd4656e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client.impl;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing
* file-being-written leases on the namenode.
* When a file is opened for write (create or append),
* namenode stores a file lease for recording the identity of the writer.
* The writer (i.e. the DFSClient) is required to renew the lease periodically.
* When the lease is not renewed before it expires,
* the namenode considers the writer as failed and then it may either let
* another writer to obtain the lease or close the file.
* </p>
* <p>
* This class also provides the following functionality:
* <ul>
* <li>
* It maintains a map from (namenode, user) pairs to lease renewers.
* The same {@link LeaseRenewer} instance is used for renewing lease
* for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and
* the same user.
* </li>
* <li>
* Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
* Periodically the leases for all the clients are renewed.
* A client is removed from the list when the client is closed.
* </li>
* <li>
* A thread per namenode per user is used by the {@link LeaseRenewer}
* to renew the leases.
* </li>
* </ul>
* <p>
*/
@InterfaceAudience.Private
public class LeaseRenewer {
public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
private static long leaseRenewerGraceDefault = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
private AtomicBoolean isLSRunning = new AtomicBoolean(false);
/** Get a {@link LeaseRenewer} instance */
public static LeaseRenewer getInstance(final String authority,
final UserGroupInformation ugi, final DFSClient dfsc) {
final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
r.addClient(dfsc);
return r;
}
/**
* Remove the given renewer from the Factory.
* Subsequent call will receive new {@link LeaseRenewer} instance.
* @param renewer Instance to be cleared from Factory
*/
public static void remove(LeaseRenewer renewer) {
synchronized (renewer) {
Factory.INSTANCE.remove(renewer);
}
}
/**
* A factory for sharing {@link LeaseRenewer} objects
* among {@link DFSClient} instances
* so that there is only one renewer per authority per user.
*/
private static class Factory {
private static final Factory INSTANCE = new Factory();
private static class Key {
/** Namenode info */
final String authority;
/** User info */
final UserGroupInformation ugi;
private Key(final String authority, final UserGroupInformation ugi) {
if (authority == null) {
throw new HadoopIllegalArgumentException("authority == null");
} else if (ugi == null) {
throw new HadoopIllegalArgumentException("ugi == null");
}
this.authority = authority;
this.ugi = ugi;
}
@Override
public int hashCode() {
return authority.hashCode() ^ ugi.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj != null && obj instanceof Key) {
final Key that = (Key)obj;
return this.authority.equals(that.authority)
&& this.ugi.equals(that.ugi);
}
return false;
}
@Override
public String toString() {
return ugi.getShortUserName() + "@" + authority;
}
}
/** A map for per user per namenode renewers. */
private final Map<Key, LeaseRenewer> renewers = new HashMap<>();
/** Get a renewer. */
private synchronized LeaseRenewer get(final String authority,
final UserGroupInformation ugi) {
final Key k = new Key(authority, ugi);
LeaseRenewer r = renewers.get(k);
if (r == null) {
r = new LeaseRenewer(k);
renewers.put(k, r);
}
return r;
}
/** Remove the given renewer. */
private synchronized void remove(final LeaseRenewer r) {
final LeaseRenewer stored = renewers.get(r.factorykey);
//Since a renewer may expire, the stored renewer can be different.
if (r == stored) {
// Expire LeaseRenewer daemon thread as soon as possible.
r.clearClients();
r.setEmptyTime(0);
renewers.remove(r.factorykey);
}
}
}
/** The time in milliseconds that the map became empty. */
private long emptyTime = Long.MAX_VALUE;
/** A fixed lease renewal time period in milliseconds */
private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2;
/** A daemon for renewing lease */
private Daemon daemon = null;
/** Only the daemon with currentId should run. */
private int currentId = 0;
/**
* A period in milliseconds that the lease renewer thread should run
* after the map became empty.
* In other words,
* if the map is empty for a time period longer than the grace period,
* the renewer should terminate.
*/
private long gracePeriod;
/**
* The time period in milliseconds
* that the renewer sleeps for each iteration.
*/
private long sleepPeriod;
private final Factory.Key factorykey;
/** A list of clients corresponding to this renewer. */
private final List<DFSClient> dfsclients = new ArrayList<>();
/**
* A stringified stack trace of the call stack when the Lease Renewer
* was instantiated. This is only generated if trace-level logging is
* enabled on this class.
*/
private final String instantiationTrace;
private LeaseRenewer(Factory.Key factorykey) {
this.factorykey = factorykey;
unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);
if (LOG.isTraceEnabled()) {
instantiationTrace = StringUtils.stringifyException(
new Throwable("TRACE"));
} else {
instantiationTrace = null;
}
}
/** @return the renewal time in milliseconds. */
private synchronized long getRenewalTime() {
return renewal;
}
/** Used for testing only. */
@VisibleForTesting
public synchronized void setRenewalTime(final long renewal) {
this.renewal = renewal;
}
/** Add a client. */
private synchronized void addClient(final DFSClient dfsc) {
for(DFSClient c : dfsclients) {
if (c == dfsc) {
//client already exists, nothing to do.
return;
}
}
//client not found, add it
dfsclients.add(dfsc);
//update renewal time
final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
if (hdfsTimeout > 0) {
final long half = hdfsTimeout/2;
if (half < renewal) {
this.renewal = half;
}
}
}
private synchronized void clearClients() {
dfsclients.clear();
}
private synchronized boolean clientsRunning() {
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
if (!i.next().isClientRunning()) {
i.remove();
}
}
return !dfsclients.isEmpty();
}
private synchronized long getSleepPeriod() {
return sleepPeriod;
}
/** Set the grace period and adjust the sleep period accordingly. */
synchronized void setGraceSleepPeriod(final long gracePeriod) {
unsyncSetGraceSleepPeriod(gracePeriod);
}
private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
if (gracePeriod < 100L) {
throw new HadoopIllegalArgumentException(gracePeriod
+ " = gracePeriod < 100ms is too small.");
}
this.gracePeriod = gracePeriod;
final long half = gracePeriod/2;
this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
half: LEASE_RENEWER_SLEEP_DEFAULT;
}
@VisibleForTesting
/** Is the daemon running? */
public synchronized boolean isRunning() {
return daemon != null && daemon.isAlive();
}
/** Does this renewer have nothing to renew? */
public boolean isEmpty() {
return dfsclients.isEmpty();
}
/** Used only by tests */
synchronized String getDaemonName() {
return daemon.getName();
}
/** Is the empty period longer than the grace period? */
private synchronized boolean isRenewerExpired() {
return emptyTime != Long.MAX_VALUE
&& Time.monotonicNow() - emptyTime > gracePeriod;
}
public synchronized boolean put(final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
// Start a new daemon with a new id.
final int id = ++currentId;
if (isLSRunning.get()) {
// Not allowed to add multiple daemons into LeaseRenewer, let client
// create new LR and continue to acquire lease.
return false;
}
isLSRunning.getAndSet(true);
daemon = new Daemon(new Runnable() {
@Override
public void run() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " started");
}
LeaseRenewer.this.run(id);
} catch(InterruptedException e) {
LOG.debug("LeaseRenewer is interrupted.", e);
} finally {
synchronized(LeaseRenewer.this) {
Factory.INSTANCE.remove(LeaseRenewer.this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " exited");
}
}
}
@Override
public String toString() {
return String.valueOf(LeaseRenewer.this);
}
});
daemon.start();
}
emptyTime = Long.MAX_VALUE;
}
return true;
}
@VisibleForTesting
synchronized void setEmptyTime(long time) {
emptyTime = time;
}
/** Close the given client. */
public synchronized void closeClient(final DFSClient dfsc) {
dfsclients.remove(dfsc);
if (dfsclients.isEmpty()) {
if (!isRunning() || isRenewerExpired()) {
Factory.INSTANCE.remove(LeaseRenewer.this);
return;
}
if (emptyTime == Long.MAX_VALUE) {
//discover the first time that the client list is empty.
emptyTime = Time.monotonicNow();
}
}
//update renewal time
if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
for(DFSClient c : dfsclients) {
final int timeout = c.getConf().getHdfsTimeout();
if (timeout > 0 && timeout < min) {
min = timeout;
}
}
renewal = min/2;
}
}
public void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
}
if (daemonCopy != null) {
LOG.debug("Wait for lease checker to terminate");
daemonCopy.join();
}
}
private void renew() throws IOException {
final List<DFSClient> copies;
synchronized(this) {
copies = new ArrayList<>(dfsclients);
}
//sort the client names for finding out repeated names.
Collections.sort(copies, new Comparator<DFSClient>() {
@Override
public int compare(final DFSClient left, final DFSClient right) {
return left.getClientName().compareTo(right.getClientName());
}
});
String previousName = "";
for (final DFSClient c : copies) {
//skip if current client name is the same as the previous name.
if (!c.getClientName().equals(previousName)) {
if (!c.renewLease()) {
LOG.debug("Did not renew lease for client {}", c);
continue;
}
previousName = c.getClientName();
LOG.debug("Lease renewed for client {}", previousName);
}
}
}
/**
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
private void run(final int id) throws InterruptedException {
for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
final long elapsed = Time.monotonicNow() - lastRenewed;
if (elapsed >= getRenewalTime()) {
try {
renew();
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " executed");
}
lastRenewed = Time.monotonicNow();
} catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ (elapsed/1000) + " seconds. Aborting ...", ie);
List<DFSClient> dfsclientsCopy;
synchronized (this) {
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
dfsclientsCopy = new ArrayList<>(dfsclients);
Factory.INSTANCE.remove(LeaseRenewer.this);
}
for (DFSClient dfsClient : dfsclientsCopy) {
dfsClient.closeAllFilesBeingWritten(true);
}
break;
} catch (IOException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ (elapsed/1000) + " seconds. Will retry shortly ...", ie);
}
}
synchronized(this) {
if (id != currentId || isRenewerExpired()) {
if (LOG.isDebugEnabled()) {
if (id != currentId) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " is not current");
} else {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " expired");
}
}
//no longer the current daemon or expired
return;
}
// if no clients are in running state or there is no more clients
// registered with this renewer, stop the daemon after the grace
// period.
if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
emptyTime = Time.monotonicNow();
}
}
}
}
@Override
public String toString() {
String s = getClass().getSimpleName() + ":" + factorykey;
if (LOG.isTraceEnabled()) {
return s + ", clients=" + clientsString()
+ ", created at " + instantiationTrace;
}
return s;
}
/** Get the names of all clients */
private synchronized String clientsString() {
if (dfsclients.isEmpty()) {
return "[]";
} else {
final StringBuilder b = new StringBuilder("[").append(
dfsclients.get(0).getClientName());
for(int i = 1; i < dfsclients.size(); i++) {
b.append(", ").append(dfsclients.get(i).getClientName());
}
return b.append("]").toString();
}
}
@VisibleForTesting
public static void setLeaseRenewerGraceDefault(
long leaseRenewerGraceDefault) {
LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
}
}