blob: da7478f13fef4ec3a7024d5de7d33b6cad2e1fec [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
* This is a lock handler implementation for the materializations rebuild.
* It is lightweight: it does not persist any information to metastore db.
* Its states are as follows:
* 1) request lock -> 2) ACQUIRED -> 4) COMMIT_READY -> 6) release lock
* -> 5) EXPIRED ->
* First, the rebuild operation will ACQUIRE the lock. If other rebuild
* operation for the same operation is already running, we lock status
* will be NOT_ACQUIRED.
* Before committing the rebuild, the txn handler will signal the handler
* that it is ready to commit the resource (move state to COMMIT_READY).
* We make sure the lock is still available before moving to the new state.
* A lock will not be able to expire when it is in COMMIT_READY state.
* The unlock method is always call by the txn handler, no matter whether
* the transaction succeeds or not, e.g., due to an Exception.
* From ACQUIRED, locks can be also moved to EXPIRED state when they
* expire. From EXPIRED, they can only be released.
public class MaterializationsRebuildLockHandler {
/* Singleton */
private static final MaterializationsRebuildLockHandler SINGLETON = new MaterializationsRebuildLockHandler();
private final ConcurrentMap<String, ResourceLock> locks = new ConcurrentHashMap<>();
private MaterializationsRebuildLockHandler() {
* Get instance of MaterializationsRebuildLockHandler.
* @return the singleton
public static MaterializationsRebuildLockHandler get() {
* Lock materialized view (first step for rebuild). Response contains a lock id
* that corresponds to the input transaction id, and whether the lock was
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return the response to the lock request
public LockResponse lockResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.putIfAbsent(
Warehouse.getQualifiedName(dbName, tableName),
new ResourceLock(txnId, System.nanoTime(), State.ACQUIRED));
if (prevResourceLock != null) {
return new LockResponse(txnId, LockState.NOT_ACQUIRED);
return new LockResponse(txnId, LockState.ACQUIRED);
* Moves from ACQUIRED state to COMMIT_READY.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return true if the lock was still active and we could move the materialization
* to COMMIT_READY state, false otherwise
public boolean readyToCommitResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
// Lock was outdated and it was removed (then maybe another transaction picked it up)
return false;
return prevResourceLock.state.compareAndSet(State.ACQUIRED, State.COMMIT_READY);
* Heartbeats a certain lock and refreshes its timer.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
public boolean refreshLockResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
if (prevResourceLock == null || prevResourceLock.txnId != txnId ||
prevResourceLock.state.get() != State.ACQUIRED) {
// Lock was outdated and it was removed (then maybe another transaction picked it up)
// or changed its state
return false;
return true;
* Releases a certain lock.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return true if the lock could be released properly, false otherwise
public boolean unlockResource(String dbName, String tableName, long txnId) {
final String fullyQualifiedName = Warehouse.getQualifiedName(dbName, tableName);
final ResourceLock prevResourceLock = locks.get(fullyQualifiedName);
if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
return false;
return locks.remove(fullyQualifiedName, prevResourceLock);
* Method that removes from the handler those locks that have expired.
* @param timeout time after which we consider the locks to have expired
public long cleanupResourceLocks(long timeout) {
long removed = 0L;
final long currentTime = System.currentTimeMillis();
for (Iterator<Map.Entry<String, ResourceLock>> it = locks.entrySet().iterator(); it.hasNext();) {
final ResourceLock resourceLock =;
if (currentTime - resourceLock.lastHeartBeatTime.get() > timeout) {
if (resourceLock.state.compareAndSet(State.ACQUIRED, State.EXPIRED)) {
return removed;
* This class represents a lock that consists of transaction id,
* last refresh time, and state.
private class ResourceLock {
final long txnId;
final AtomicLong lastHeartBeatTime;
final AtomicStateEnum state;
ResourceLock(long txnId, long lastHeartBeatTime, State state) {
this.txnId = txnId;
this.lastHeartBeatTime = new AtomicLong(lastHeartBeatTime);
this.state = new AtomicStateEnum(state);
private enum State {
// This is the initial state for a lock
// This means that the lock is being committed at this instant, hence
// the cleaner should not remove it even if it times out. If transaction
// fails, the finally clause will remove the lock
// This means that the lock is ready to be cleaned, hence it cannot
// be committed anymore
* Wrapper class around State enum to make its operations atomic.
private class AtomicStateEnum {
private final AtomicReference<State> ref;
public AtomicStateEnum(final State initialValue) {
this.ref = new AtomicReference<State>(initialValue);
public void set(final State newValue) {
public State get() {
return this.ref.get();
public State getAndSet(final State newValue) {
return this.ref.getAndSet(newValue);
public boolean compareAndSet(final State expect, final State update) {
return this.ref.compareAndSet(expect, update);