blob: da96dc813a44f30ec1b41267bbd57c8782e3c13f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.TransactionException;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
import com.sun.tools.javac.code.Attribute.Array;
/**
 * Creates a daemon thread that periodically sends heartbeats to HMS for the
 * registered transactions and locks in order to keep them alive.
 * TODO(IMPALA-8788) once we start opening a transaction for every query we should
 * re-think our wait policy to spread out RPCs in time.
 *
 * The registries of transactions and locks are guarded by the monitor of this object:
 * the public add/delete methods are synchronized, and the background thread only
 * reads them inside a synchronized block.
 */
public class TransactionKeepalive {
  public static final Logger LOG = Logger.getLogger(TransactionKeepalive.class);

  // TODO: should be calculated from hive.txn.timeout or metastore.txn.timeout
  private static final long SLEEP_INTERVAL_SECONDS = 60;

  // Nanoseconds per millisecond.
  private static final long MILLION = 1000000L;

  // Nanoseconds per second.
  private static final long BILLION = 1000000000L;

  private final Thread daemonThread_;

  private final MetaStoreClientPool metaStoreClientPool_;

  /**
   * Stores information for logging purposes together with the creation time of the
   * heartbeated entry. Holds either a TQueryCtx or a cause string. toString() returns
   * the query id of the stored TQueryCtx if it is set, or the quoted cause otherwise.
   */
  public static class HeartbeatContext {
    private final TQueryCtx queryCtx;
    private final String cause;
    // Compared against System.nanoTime() in oldEnough(), so callers are expected to
    // pass a System.nanoTime() timestamp here.
    private final long creationTime;

    public HeartbeatContext(TQueryCtx queryCtx, long creationTime) {
      this.queryCtx = queryCtx;
      this.cause = null;
      this.creationTime = creationTime;
    }

    public HeartbeatContext(String cause, long creationTime) {
      this.queryCtx = null;
      this.cause = "'" + cause + "'";
      this.creationTime = creationTime;
    }

    @Override
    public String toString() {
      if (queryCtx != null) return queryCtx.query_id.toString();
      return cause;
    }
  }

  // Transactions to heartbeat, keyed by transaction id. Guarded by 'this'.
  private final Map<Long, HeartbeatContext> transactions_ = new HashMap<>();

  // Independent locks (not part of a registered transaction) to heartbeat, keyed by
  // lock id. Guarded by 'this'.
  private final Map<Long, HeartbeatContext> locks_ = new HashMap<>();

  private class DaemonThread implements Runnable {
    /**
     * Background thread does the periodic heartbeating. It never returns; every
     * exception of a heartbeating round is logged and the next round is started.
     */
    @Override
    public void run() {
      Random rand = new Random();
      try {
        // Sleep for a random interval first so that different coordinators get out
        // of sync with each other. This way we probably lower the workload on HMS.
        Thread.sleep(rand.nextInt((int)(SLEEP_INTERVAL_SECONDS * 1000)));
      } catch (Throwable e) {
        LOG.error("Unexpected exception thrown", e);
      }
      while (true) {
        try {
          // Take shallow copies of the registries so that the lock is held only
          // briefly and the HMS RPCs below run without holding it. The
          // HeartbeatContext objects themselves are shared.
          Map<Long, HeartbeatContext> copyOfTransactions;
          Map<Long, HeartbeatContext> copyOfLocks;
          synchronized (TransactionKeepalive.this) {
            copyOfTransactions = new HashMap<>(transactions_);
            copyOfLocks = new HashMap<>(locks_);
          }
          long durationOfHeartbeatingMillis = 0;
          if (!copyOfTransactions.isEmpty() || !copyOfLocks.isEmpty()) {
            LOG.info("There are " + copyOfTransactions.size() +
                " open transactions and " + copyOfLocks.size() +
                " independent locks in TransactionKeepalive. Start heartbeating them.");
            long startHeartbeating = System.nanoTime();
            sendHeartbeatsFor(copyOfTransactions, copyOfLocks);
            durationOfHeartbeatingMillis =
                (System.nanoTime() - startHeartbeating) / MILLION;
            LOG.info("Heartbeating the transactions and locks took " +
                durationOfHeartbeatingMillis + " milliseconds.");
          }
          // Subtract the time spent heartbeating so rounds start roughly every
          // SLEEP_INTERVAL_SECONDS.
          long sleepMillis = SLEEP_INTERVAL_SECONDS * 1000 - durationOfHeartbeatingMillis;
          if (sleepMillis > 0) {
            // Add up to 10% random jitter. Random.nextInt() rejects a non-positive
            // bound, which sleepMillis / 10 is whenever sleepMillis < 10.
            int jitterBound = (int)(sleepMillis / 10);
            long randomness = jitterBound > 0 ? rand.nextInt(jitterBound) : 0;
            Thread.sleep(sleepMillis + randomness);
          }
        } catch (Throwable e) {
          // NOTE(review): this also swallows InterruptedException, so the thread
          // cannot be stopped by interruption; presumably intended for a daemon.
          LOG.error("Unexpected exception thrown", e);
        }
      }
    }

    /**
     * Sends heartbeats for transactions and locks that are old enough, i.e. older than
     * the sleep interval. A single HMS client from the pool is used for the whole
     * round and is returned to the pool afterwards.
     * TODO: we can be more clever than that and should also take into consideration
     * metastore.txn.timeout as well.
     * @param transactions snapshot of the registered transactions.
     * @param locks snapshot of the registered independent locks.
     */
    private void sendHeartbeatsFor(Map<Long, HeartbeatContext> transactions,
        Map<Long, HeartbeatContext> locks) {
      try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
        IMetaStoreClient hmsClient = client.getHiveClient();
        for (Map.Entry<Long, HeartbeatContext> entry : transactions.entrySet()) {
          HeartbeatContext ctx = entry.getValue();
          // Only heartbeat old transactions
          if (oldEnough(ctx)) {
            sendHeartbeat(hmsClient, entry.getKey(), 0L, ctx);
          }
        }
        for (Map.Entry<Long, HeartbeatContext> entry : locks.entrySet()) {
          HeartbeatContext ctx = entry.getValue();
          // Only heartbeat old locks
          if (oldEnough(ctx)) {
            sendHeartbeat(hmsClient, 0L, entry.getKey(), ctx);
          }
        }
      }
    }

    /**
     * Determines whether a transaction or lock is old enough for heartbeating.
     * @param heartbeatContext context information about creation time.
     * @return True if the entry was created more than SLEEP_INTERVAL_SECONDS ago.
     */
    private boolean oldEnough(HeartbeatContext heartbeatContext) {
      long ageInSeconds = (System.nanoTime() - heartbeatContext.creationTime) / BILLION;
      return ageInSeconds > SLEEP_INTERVAL_SECONDS;
    }

    /**
     * Sends a single heartbeat for 'transactionId' or 'lockId'. Exactly one of them
     * must be non-zero. If HMS reports that the transaction or lock no longer
     * exists, it is removed from this keepalive. A TransactionException is logged
     * and otherwise ignored so that the remaining entries still get heartbeated.
     */
    private void sendHeartbeat(IMetaStoreClient hmsClient, long transactionId,
        long lockId, HeartbeatContext context) {
      // One of the values must be zero, but only one.
      Preconditions.checkState(transactionId == 0 || lockId == 0);
      Preconditions.checkState(transactionId != 0 || lockId != 0);
      try {
        if (!MetastoreShim.heartbeat(hmsClient, transactionId, lockId)) {
          // Transaction or lock doesn't exist anymore, let's remove them.
          if (transactionId != 0) {
            LOG.warn("Transaction " + transactionId + " of query " +
                context.toString() + " doesn't exist anymore. Stop heartbeating it.");
            TransactionKeepalive.this.deleteTransaction(transactionId);
          }
          if (lockId != 0) {
            LOG.warn("Lock " + lockId + " of query " +
                context.toString() + " doesn't exist anymore. Stop heartbeating it.");
            TransactionKeepalive.this.deleteLock(lockId);
          }
        }
      } catch (TransactionException e) {
        LOG.warn("Caught exception during heartbeating transaction " +
            transactionId + " lock " + lockId +
            " for query " + context.toString(), e);
      }
    }
  }

  /**
   * Creates TransactionKeepalive object and starts the background thread.
   * @param metaStoreClientPool pool used to obtain HMS clients; must not be null.
   */
  public TransactionKeepalive(MetaStoreClientPool metaStoreClientPool) {
    Preconditions.checkNotNull(metaStoreClientPool);
    metaStoreClientPool_ = metaStoreClientPool;
    daemonThread_ = new Thread(new DaemonThread());
    // Daemon so that it does not keep the JVM alive.
    daemonThread_.setDaemon(true);
    daemonThread_.setName("Transaction keepalive thread");
    daemonThread_.start();
  }

  /**
   * Add transaction to heartbeat. Associated locks shouldn't be added.
   * @throws IllegalStateException if the transaction is already registered.
   */
  synchronized public void addTransaction(Long transactionId, HeartbeatContext ctx) {
    Preconditions.checkNotNull(transactionId);
    Preconditions.checkNotNull(ctx);
    Preconditions.checkState(!transactions_.containsKey(transactionId));
    transactions_.put(transactionId, ctx);
  }

  /**
   * Add lock to heartbeat. This should be a lock without a transaction context.
   * @throws IllegalStateException if the lock is already registered.
   */
  synchronized public void addLock(Long lockId, HeartbeatContext ctx) {
    Preconditions.checkNotNull(lockId);
    Preconditions.checkNotNull(ctx);
    Preconditions.checkState(!locks_.containsKey(lockId));
    locks_.put(lockId, ctx);
  }

  /**
   * Stop heartbeating transaction. Removing an unknown transaction is only logged.
   */
  synchronized public void deleteTransaction(Long transactionId) {
    Preconditions.checkNotNull(transactionId);
    if (transactions_.remove(transactionId) == null) {
      LOG.info("Transaction id " + transactionId + " was already removed from " +
          "TransactionKeepalive object or never existed.");
    }
  }

  /**
   * Stop heartbeating lock. Removing an unknown lock is only logged.
   */
  synchronized public void deleteLock(Long lockId) {
    Preconditions.checkNotNull(lockId);
    if (locks_.remove(lockId) == null) {
      LOG.info("Lock id " + lockId + " was already removed from " +
          "TransactionKeepalive object or never existed.");
    }
  }
}