blob: da7478f13fef4ec3a7024d5de7d33b6cad2e1fec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* This is a lock handler implementation for the materializations rebuild.
* It is lightweight: it does not persist any information to metastore db.
* Its states are as follows:
* 1) request lock -> 2) ACQUIRED -> 4) COMMIT_READY -> 6) release lock
* -> 5) EXPIRED ->
* -> 3) NOT_ACQUIRED
* First, the rebuild operation will ACQUIRE the lock. If other rebuild
* operation for the same operation is already running, we lock status
* will be NOT_ACQUIRED.
* Before committing the rebuild, the txn handler will signal the handler
* that it is ready to commit the resource (move state to COMMIT_READY).
* We make sure the lock is still available before moving to the new state.
* A lock will not be able to expire when it is in COMMIT_READY state.
* The unlock method is always call by the txn handler, no matter whether
* the transaction succeeds or not, e.g., due to an Exception.
* From ACQUIRED, locks can be also moved to EXPIRED state when they
* expire. From EXPIRED, they can only be released.
*/
public class MaterializationsRebuildLockHandler {
/* Singleton */
private static final MaterializationsRebuildLockHandler SINGLETON = new MaterializationsRebuildLockHandler();
private final ConcurrentMap<String, ResourceLock> locks = new ConcurrentHashMap<>();
private MaterializationsRebuildLockHandler() {
}
/**
* Get instance of MaterializationsRebuildLockHandler.
*
* @return the singleton
*/
public static MaterializationsRebuildLockHandler get() {
return SINGLETON;
}
/**
* Lock materialized view (first step for rebuild). Response contains a lock id
* that corresponds to the input transaction id, and whether the lock was
* ACQUIRED or NOT_ACQUIRED.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return the response to the lock request
*/
public LockResponse lockResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.putIfAbsent(
Warehouse.getQualifiedName(dbName, tableName),
new ResourceLock(txnId, System.nanoTime(), State.ACQUIRED));
if (prevResourceLock != null) {
return new LockResponse(txnId, LockState.NOT_ACQUIRED);
}
return new LockResponse(txnId, LockState.ACQUIRED);
}
/**
* Moves from ACQUIRED state to COMMIT_READY.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return true if the lock was still active and we could move the materialization
* to COMMIT_READY state, false otherwise
*/
public boolean readyToCommitResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
// Lock was outdated and it was removed (then maybe another transaction picked it up)
return false;
}
return prevResourceLock.state.compareAndSet(State.ACQUIRED, State.COMMIT_READY);
}
/**
* Heartbeats a certain lock and refreshes its timer.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
*/
public boolean refreshLockResource(String dbName, String tableName, long txnId) {
final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
if (prevResourceLock == null || prevResourceLock.txnId != txnId ||
prevResourceLock.state.get() != State.ACQUIRED) {
// Lock was outdated and it was removed (then maybe another transaction picked it up)
// or changed its state
return false;
}
prevResourceLock.lastHeartBeatTime.set(System.currentTimeMillis());
return true;
}
/**
* Releases a certain lock.
* @param dbName the db name of the materialization
* @param tableName the table name of the materialization
* @param txnId the transaction id for the rebuild
* @return true if the lock could be released properly, false otherwise
*/
public boolean unlockResource(String dbName, String tableName, long txnId) {
final String fullyQualifiedName = Warehouse.getQualifiedName(dbName, tableName);
final ResourceLock prevResourceLock = locks.get(fullyQualifiedName);
if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
return false;
}
return locks.remove(fullyQualifiedName, prevResourceLock);
}
/**
* Method that removes from the handler those locks that have expired.
* @param timeout time after which we consider the locks to have expired
*/
public long cleanupResourceLocks(long timeout) {
long removed = 0L;
final long currentTime = System.currentTimeMillis();
for (Iterator<Map.Entry<String, ResourceLock>> it = locks.entrySet().iterator(); it.hasNext();) {
final ResourceLock resourceLock = it.next().getValue();
if (currentTime - resourceLock.lastHeartBeatTime.get() > timeout) {
if (resourceLock.state.compareAndSet(State.ACQUIRED, State.EXPIRED)) {
it.remove();
removed++;
}
}
}
return removed;
}
/**
* This class represents a lock that consists of transaction id,
* last refresh time, and state.
*/
private class ResourceLock {
final long txnId;
final AtomicLong lastHeartBeatTime;
final AtomicStateEnum state;
ResourceLock(long txnId, long lastHeartBeatTime, State state) {
this.txnId = txnId;
this.lastHeartBeatTime = new AtomicLong(lastHeartBeatTime);
this.state = new AtomicStateEnum(state);
}
}
private enum State {
// This is the initial state for a lock
ACQUIRED,
// This means that the lock is being committed at this instant, hence
// the cleaner should not remove it even if it times out. If transaction
// fails, the finally clause will remove the lock
COMMIT_READY,
// This means that the lock is ready to be cleaned, hence it cannot
// be committed anymore
EXPIRED;
}
/**
* Wrapper class around State enum to make its operations atomic.
*/
private class AtomicStateEnum {
private final AtomicReference<State> ref;
public AtomicStateEnum(final State initialValue) {
this.ref = new AtomicReference<State>(initialValue);
}
public void set(final State newValue) {
this.ref.set(newValue);
}
public State get() {
return this.ref.get();
}
public State getAndSet(final State newValue) {
return this.ref.getAndSet(newValue);
}
public boolean compareAndSet(final State expect, final State update) {
return this.ref.compareAndSet(expect, update);
}
}
}