| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.client.locking; |
| |
| import java.io.IOException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Abortable; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; |
| import org.apache.hadoop.hbase.util.Threads; |
| |
| /** |
| * Lock for HBase Entity either a Table, a Namespace, or Regions. |
| * |
| * These are remote locks which live on master, and need periodic heartbeats to keep them alive. |
| * (Once we request the lock, internally an heartbeat thread will be started on the client). |
| * If master does not receive the heartbeat in time, it'll release the lock and make it available |
| * to other users. |
| * |
| * <p>Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}. |
| * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread |
| * which will check lock's status periodically and once the lock is acquired, it will send the |
| * heartbeats to the master. |
| * |
| * <p>Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired. |
| * Always call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock |
| * was acquired, it'll be released. If it was not acquired, it is possible that master grants the |
| * lock in future and the heartbeat thread keeps it alive forever by sending heartbeats. |
| * Calling {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master. |
| * |
| * <p>There are 4 ways in which these remote locks may be released/can be lost: |
| * <ul><li>Call {@link #unlock}.</li> |
| * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc. |
| * Worker thread will call the given abortable as soon as it detects such a situation.</li> |
| * <li>Fail to contact master: If worker thread can not contact mater and thus fails to send |
| * heartbeat before the timeout expires, it assumes that lock is lost and calls the |
| * abortable.</li> |
| * <li>Worker thread is interrupted.</li> |
| * </ul> |
| * |
| * Use example: |
| * <code> |
| * EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable); |
| * lock.requestLock(); |
| * .... |
| * ....can do other initializations here since lock is 'asynchronous'... |
| * .... |
| * if (lock.await(timeout)) { |
| * ....logic requiring mutual exclusion |
| * } |
| * lock.unlock(); |
| * </code> |
| */ |
| @InterfaceAudience.Public |
| public class EntityLock { |
| private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class); |
| |
| public static final String HEARTBEAT_TIME_BUFFER = |
| "hbase.client.locks.heartbeat.time.buffer.ms"; |
| |
| private final AtomicBoolean locked = new AtomicBoolean(false); |
| private final CountDownLatch latch = new CountDownLatch(1); |
| |
| private final LockService.BlockingInterface stub; |
| private final LockHeartbeatWorker worker; |
| private final LockRequest lockRequest; |
| private final Abortable abort; |
| |
| // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc. |
| private final int heartbeatTimeBuffer; |
| |
| // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait |
| // for long time periods between heartbeats. |
| private long testingSleepTime = 0; |
| |
| private Long procId = null; |
| |
| /** |
| * Abortable.abort() is called when the lease of the lock will expire. |
| * It's up to the user decide if simply abort the process or handle the loss of the lock |
| * by aborting the operation that was supposed to be under lock. |
| */ |
| EntityLock(Configuration conf, LockService.BlockingInterface stub, |
| LockRequest request, Abortable abort) { |
| this.stub = stub; |
| this.lockRequest = request; |
| this.abort = abort; |
| |
| this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000); |
| this.worker = new LockHeartbeatWorker(lockRequest.getDescription()); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("EntityLock locked="); |
| sb.append(locked.get()); |
| sb.append(", procId="); |
| sb.append(procId); |
| sb.append(", type="); |
| sb.append(lockRequest.getLockType()); |
| if (lockRequest.getRegionInfoCount() > 0) { |
| sb.append(", regions="); |
| for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) { |
| if (i > 0) sb.append(", "); |
| sb.append(lockRequest.getRegionInfo(i)); |
| } |
| } else if (lockRequest.hasTableName()) { |
| sb.append(", table="); |
| sb.append(lockRequest.getTableName()); |
| } else if (lockRequest.hasNamespace()) { |
| sb.append(", namespace="); |
| sb.append(lockRequest.getNamespace()); |
| } |
| sb.append(", description="); |
| sb.append(lockRequest.getDescription()); |
| return sb.toString(); |
| } |
| |
| @InterfaceAudience.Private |
| void setTestingSleepTime(long timeInMillis) { |
| testingSleepTime = timeInMillis; |
| } |
| |
| @InterfaceAudience.Private |
| LockHeartbeatWorker getWorker() { |
| return worker; |
| } |
| |
| public boolean isLocked() { |
| return locked.get(); |
| } |
| |
| /** |
| * Sends rpc to the master to request lock. |
| * The lock request is queued with other lock requests. |
| * Call {@link #await()} to wait on lock. |
| * Always call {@link #unlock()} after calling the below, even after error. |
| */ |
| public void requestLock() throws IOException { |
| if (procId == null) { |
| try { |
| procId = stub.requestLock(null, lockRequest).getProcId(); |
| } catch (Exception e) { |
| throw ProtobufUtil.handleRemoteException(e); |
| } |
| worker.start(); |
| } else { |
| LOG.info("Lock already queued : " + toString()); |
| } |
| } |
| |
| /** |
| * @param timeout in milliseconds. If set to 0, waits indefinitely. |
| * @return true if lock was acquired; and false if waiting time elapsed before lock could be |
| * acquired. |
| */ |
| public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException { |
| final boolean result = latch.await(timeout, timeUnit); |
| String lockRequestStr = lockRequest.toString().replace("\n", ", "); |
| if (result) { |
| LOG.info("Acquired " + lockRequestStr); |
| } else { |
| LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(), |
| lockRequestStr)); |
| } |
| return result; |
| } |
| |
| public void await() throws InterruptedException { |
| latch.await(); |
| } |
| |
| public void unlock() throws IOException { |
| Threads.shutdown(worker.shutdown()); |
| try { |
| stub.lockHeartbeat(null, |
| LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build()); |
| } catch (Exception e) { |
| throw ProtobufUtil.handleRemoteException(e); |
| } |
| } |
| |
| protected class LockHeartbeatWorker extends Thread { |
| private volatile boolean shutdown = false; |
| |
| public LockHeartbeatWorker(final String desc) { |
| super("LockHeartbeatWorker(" + desc + ")"); |
| setDaemon(true); |
| } |
| |
| /** |
| * @return Shuts down the thread clean and quietly. |
| */ |
| Thread shutdown() { |
| shutdown = true; |
| interrupt(); |
| return this; |
| } |
| |
| @Override |
| public void run() { |
| final LockHeartbeatRequest lockHeartbeatRequest = |
| LockHeartbeatRequest.newBuilder().setProcId(procId).build(); |
| |
| LockHeartbeatResponse response; |
| while (true) { |
| try { |
| response = stub.lockHeartbeat(null, lockHeartbeatRequest); |
| } catch (Exception e) { |
| e = ProtobufUtil.handleRemoteException(e); |
| locked.set(false); |
| LOG.error("Heartbeat failed, releasing " + EntityLock.this, e); |
| abort.abort("Heartbeat failed", e); |
| return; |
| } |
| if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) { |
| locked.set(true); |
| latch.countDown(); |
| } else if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) { |
| // Lock timed out. |
| locked.set(false); |
| abort.abort("Lock timed out.", null); |
| return; |
| } |
| |
| try { |
| // If lock not acquired yet, poll faster so we can notify faster. |
| long sleepTime = 1000; |
| if (isLocked()) { |
| // If lock acquired, then use lock timeout to determine heartbeat rate. |
| // If timeout is <heartbeatTimeBuffer, send back to back heartbeats. |
| sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1); |
| } |
| if (testingSleepTime != 0) { |
| sleepTime = testingSleepTime; |
| } |
| Thread.sleep(sleepTime); |
| } catch (InterruptedException e) { |
| // Since there won't be any more heartbeats, assume lock will be lost. |
| locked.set(false); |
| if (!this.shutdown) { |
| LOG.error("Interrupted, releasing " + this, e); |
| abort.abort("Worker thread interrupted", e); |
| } |
| return; |
| } |
| } |
| } |
| } |
| } |