blob: 900d7301bf9cbd5f9b0ad00f778054cf24e644aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An Azure blob lease that automatically renews itself indefinitely
* using a background thread. Use it to synchronize distributed processes,
* or to prevent writes to the blob by other processes that don't
* have the lease.
*
* Creating a new Lease object blocks the caller until the Azure blob lease is
* acquired.
*
* Attempting to get a lease on a non-existent blob throws StorageException.
*
* Call free() to release the Lease.
*
* You can use this Lease like a distributed lock. If the holder process
* dies, the lease will time out since it won't be renewed.
*/
public class SelfRenewingLease {
private CloudBlobWrapper blobWrapper;
private Thread renewer;
private volatile boolean leaseFreed;
private String leaseID = null;
private static final int LEASE_TIMEOUT = 60; // Lease timeout in seconds
// Time to wait to renew lease in milliseconds
public static final int LEASE_RENEWAL_PERIOD = 40000;
private static final Log LOG = LogFactory.getLog(SelfRenewingLease.class);
// Used to allocate thread serial numbers in thread name
private static AtomicInteger threadNumber = new AtomicInteger(0);
// Time to wait to retry getting the lease in milliseconds
@VisibleForTesting
static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
public SelfRenewingLease(CloudBlobWrapper blobWrapper)
throws StorageException {
this.leaseFreed = false;
this.blobWrapper = blobWrapper;
// Keep trying to get the lease until you get it.
CloudBlob blob = blobWrapper.getBlob();
while(leaseID == null) {
try {
leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
} catch (StorageException e) {
// Throw again if we don't want to keep waiting.
// We expect it to be that the lease is already present,
// or in some cases that the blob does not exist.
if (!e.getErrorCode().equals("LeaseAlreadyPresent")) {
LOG.info(
"Caught exception when trying to get lease on blob "
+ blobWrapper.getUri().toString() + ". " + e.getMessage());
throw e;
}
}
if (leaseID == null) {
try {
Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
renewer = new Thread(new Renewer());
// A Renewer running should not keep JVM from exiting, so make it a daemon.
renewer.setDaemon(true);
renewer.setName("AzureLeaseRenewer-" + threadNumber.getAndIncrement());
renewer.start();
LOG.debug("Acquired lease " + leaseID + " on " + blob.getUri()
+ " managed by thread " + renewer.getName());
}
/**
* Free the lease and stop the keep-alive thread.
* @throws StorageException
*/
public void free() throws StorageException {
AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
accessCondition.setLeaseID(leaseID);
try {
blobWrapper.getBlob().releaseLease(accessCondition);
} catch (StorageException e) {
if (e.getErrorCode().equals("BlobNotFound")) {
// Don't do anything -- it's okay to free a lease
// on a deleted file. The delete freed the lease
// implicitly.
} else {
// This error is not anticipated, so re-throw it.
LOG.warn("Unanticipated exception when trying to free lease " + leaseID
+ " on " + blobWrapper.getStorageUri());
throw(e);
}
} finally {
// Even if releasing the lease fails (e.g. because the file was deleted),
// make sure to record that we freed the lease, to terminate the
// keep-alive thread.
leaseFreed = true;
LOG.debug("Freed lease " + leaseID + " on " + blobWrapper.getUri()
+ " managed by thread " + renewer.getName());
}
}
public boolean isFreed() {
return leaseFreed;
}
public String getLeaseID() {
return leaseID;
}
public CloudBlob getCloudBlob() {
return blobWrapper.getBlob();
}
private class Renewer implements Runnable {
/**
* Start a keep-alive thread that will continue to renew
* the lease until it is freed or the process dies.
*/
@Override
public void run() {
LOG.debug("Starting lease keep-alive thread.");
AccessCondition accessCondition =
AccessCondition.generateEmptyCondition();
accessCondition.setLeaseID(leaseID);
while(!leaseFreed) {
try {
Thread.sleep(LEASE_RENEWAL_PERIOD);
} catch (InterruptedException e) {
LOG.debug("Keep-alive thread for lease " + leaseID +
" interrupted.");
// Restore the interrupted status
Thread.currentThread().interrupt();
}
try {
if (!leaseFreed) {
blobWrapper.getBlob().renewLease(accessCondition);
// It'll be very rare to renew the lease (most will be short)
// so log that we did it, to help with system debugging.
LOG.info("Renewed lease " + leaseID + " on "
+ getCloudBlob().getUri());
}
} catch (StorageException e) {
if (!leaseFreed) {
// Free the lease so we don't leave this thread running forever.
leaseFreed = true;
// Normally leases should be freed and there should be no
// exceptions, so log a warning.
LOG.warn("Attempt to renew lease " + leaseID + " on "
+ getCloudBlob().getUri()
+ " failed, but lease not yet freed. Reason: " +
e.getMessage());
}
}
}
}
}
}