| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.metastore.txn; |
| |
| import org.apache.hadoop.hive.common.classification.RetrySemantics; |
| import org.apache.hadoop.hive.metastore.api.CompactionType; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; |
| import org.apache.hadoop.util.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| /** |
| * Extends the transaction handler with methods needed only by the compactor threads. These |
| * methods are not available through the thrift interface. |
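| * |
| * <p>A rough sketch of how the compactor threads drive these methods (illustrative only; |
| * the real Initiator/Worker/Cleaner threads add configuration, security and error handling): |
| * <pre>{@code |
| * // Initiator: find candidates and enqueue compaction requests |
| * Set<CompactionInfo> candidates = txnHandler.findPotentialCompactions(threshold, timeThreshold); |
| * |
| * // Worker: claim a queued request, compact, then mark it ready for cleaning |
| * CompactionInfo ci = txnHandler.findNextToCompact(workerId); |
| * if (ci != null) { |
| *   // ... run the compaction job ... |
| *   txnHandler.markCompacted(ci); |
| * } |
| * |
| * // Cleaner: after removing obsolete files, clean up the metadata |
| * for (CompactionInfo toClean : txnHandler.findReadyToClean()) { |
| *   txnHandler.markCleaned(toClean); |
| * } |
| * }</pre> |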
| */ |
| class CompactionTxnHandler extends TxnHandler { |
| private static final String CLASS_NAME = CompactionTxnHandler.class.getName(); |
| private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); |
| |
| public CompactionTxnHandler() { |
| } |
| |
| /** |
| * This will look through the COMPLETED_TXN_COMPONENTS table for partitions or tables |
| * that may be ready for compaction. It also looks through the TXNS and TXN_COMPONENTS |
| * tables for aborted transactions that should be added to the list. |
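| * <p>Illustrative use (a sketch; the real Initiator also filters out entities that are |
| * already queued and applies per-table thresholds): |
| * <pre>{@code |
| * // thresholds here are illustrative: 10 aborted txns, or aborts older than one hour |
| * for (CompactionInfo candidate : findPotentialCompactions(10, 3600L * 1000)) { |
| *   // decide MAJOR vs MINOR and submit a compaction request for the candidate |
| * } |
| * }</pre> |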
| * @param abortedThreshold number of aborted transactions above which a table or partition |
| * becomes a potential compaction candidate. |
| * @param abortedTimeThreshold age, in milliseconds, past which an aborted transaction makes |
| * its table or partition a potential compaction candidate. |
| * @return set of CompactionInfo structs. These will not have id, type, |
| * or runAs set since these are only potential compactions, not actual ones. |
| */ |
| @Override |
| @RetrySemantics.ReadOnly |
| public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold) |
| throws MetaException { |
| return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, -1); |
| } |
| |
| @Override |
| @RetrySemantics.ReadOnly |
| public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, |
| long abortedTimeThreshold, long checkInterval) throws MetaException { |
| Connection dbConn = null; |
| Set<CompactionInfo> response = new HashSet<>(); |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| // Check for completed transactions |
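| // When a check interval is given, the LEFT JOIN below pairs each entity with its most |
| // recent entry in COMPLETED_COMPACTIONS: an entity is returned only if that last attempt |
| // failed or was merely attempted (so it should be retried), or if it has had writes |
| // within the check interval. |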
| final String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\"" + |
| ".\"CTC_PARTITION\" " + |
| "FROM \"COMPLETED_TXN_COMPONENTS\" \"TC\" " + (checkInterval > 0 ? |
| "LEFT JOIN ( " + |
| " SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " + |
| " INNER JOIN ( " + |
| " SELECT MAX(\"CC_ID\") \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\" " + |
| " GROUP BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"" + |
| " ) \"C2\" " + |
| " ON \"C1\".\"CC_ID\" = \"C2\".\"CC_ID\" " + |
| " WHERE \"C1\".\"CC_STATE\" IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + |
| ") \"C\" " + |
| "ON \"TC\".\"CTC_DATABASE\" = \"C\".\"CC_DATABASE\" AND \"TC\".\"CTC_TABLE\" = \"C\".\"CC_TABLE\" " + |
| " AND (\"TC\".\"CTC_PARTITION\" = \"C\".\"CC_PARTITION\" OR (\"TC\".\"CTC_PARTITION\" IS NULL AND \"C\".\"CC_PARTITION\" IS NULL)) " + |
| "WHERE \"C\".\"CC_ID\" IS NOT NULL OR " + isWithinCheckInterval("\"TC\".\"CTC_TIMESTAMP\"", checkInterval) : ""); |
| |
| LOG.debug("Going to execute query <" + s + ">"); |
| rs = stmt.executeQuery(s); |
| while (rs.next()) { |
| CompactionInfo info = new CompactionInfo(); |
| info.dbname = rs.getString(1); |
| info.tableName = rs.getString(2); |
| info.partName = rs.getString(3); |
| response.add(info); |
| } |
| rs.close(); |
| |
| // Check for aborted txns: number of aborted txns past threshold and age of aborted txns |
| // past time threshold |
| boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; |
| final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," |
| + "MIN(\"TXN_STARTED\"), COUNT(*)" |
| + "FROM \"TXNS\", \"TXN_COMPONENTS\" " |
| + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " |
| + "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"" |
| + (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold); |
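| // When the time threshold is active the HAVING filter is omitted: a group below the |
| // count threshold may still qualify by age, so both conditions are re-checked in the |
| // loop below. |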
| |
| LOG.debug("Going to execute query <" + sCheckAborted + ">"); |
| rs = stmt.executeQuery(sCheckAborted); |
| long systemTime = System.currentTimeMillis(); |
| while (rs.next()) { |
| boolean pastTimeThreshold = |
| checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime; |
| int numAbortedTxns = rs.getInt(5); |
| if (numAbortedTxns > abortedThreshold || pastTimeThreshold) { |
| CompactionInfo info = new CompactionInfo(); |
| info.dbname = rs.getString(1); |
| info.tableName = rs.getString(2); |
| info.partName = rs.getString(3); |
| info.tooManyAborts = numAbortedTxns > abortedThreshold; |
| info.hasOldAbort = pastTimeThreshold; |
| response.add(info); |
| } |
| } |
| |
| LOG.debug("Going to rollback"); |
| dbConn.rollback(); |
| } catch (SQLException e) { |
| LOG.error("Unable to connect to transaction database " + e.getMessage()); |
| checkRetryable(dbConn, e, |
| "findPotentialCompactions(maxAborted:" + abortedThreshold |
| + ", abortedTimeThreshold:" + abortedTimeThreshold + ")"); |
| } finally { |
| close(rs, stmt, dbConn); |
| } |
| return response; |
| } |
| catch (RetryException e) { |
| return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, checkInterval); |
| } |
| } |
| |
| /** |
| * This will grab the next compaction request off |
| * the queue, and assign it to the worker. |
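| * <p>Illustrative worker usage (a sketch; worker ids are assumed to start with the host |
| * name, see {@link #revokeFromLocalWorkers(String)}): |
| * <pre>{@code |
| * CompactionInfo ci = txnHandler.findNextToCompact(hostname + "-" + threadId); |
| * if (ci != null) { |
| *   // ... run the compaction job for ci ... |
| *   txnHandler.markCompacted(ci); |
| * } |
| * }</pre> |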
| * @param workerId id of the worker calling this, will be recorded in the db |
| * @return an info element for this compaction request, or null if there is no work to do now. |
| */ |
| @Override |
| @RetrySemantics.SafeToRetry |
| public CompactionInfo findNextToCompact(String workerId) throws MetaException { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| // need a separate stmt for executeUpdate(), otherwise it would close the ResultSet (HIVE-12725) |
| Statement updStmt = null; |
| ResultSet rs = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + |
| "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'"; |
| LOG.debug("Going to execute query <" + s + ">"); |
| rs = stmt.executeQuery(s); |
| if (!rs.next()) { |
| LOG.debug("No compactions found ready to compact"); |
| dbConn.rollback(); |
| return null; |
| } |
| updStmt = dbConn.createStatement(); |
| do { |
| CompactionInfo info = new CompactionInfo(); |
| info.id = rs.getLong(1); |
| info.dbname = rs.getString(2); |
| info.tableName = rs.getString(3); |
| info.partName = rs.getString(4); |
| info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); |
| info.properties = rs.getString(6); |
| // Now, update this record as being worked on by this worker. |
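| // The CQ_STATE = INITIATED_STATE guard below makes this claim an optimistic lock: under |
| // READ COMMITTED two workers may both read the same queue row, but only one UPDATE can |
| // match, so updCount tells us whether we won the race (1) or another worker did (0). |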
| long now = getDbTime(dbConn); |
| s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + workerId + "', " + |
| "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id + |
| " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; |
| LOG.debug("Going to execute update <" + s + ">"); |
| int updCount = updStmt.executeUpdate(s); |
| if(updCount == 1) { |
| dbConn.commit(); |
| return info; |
| } |
| if(updCount == 0) { |
| LOG.debug("Another Worker picked up " + info); |
| continue; |
| } |
| LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + |
| info + ". updCnt=" + updCount + "."); |
| dbConn.rollback(); |
| return null; |
| } while( rs.next()); |
| dbConn.rollback(); |
| return null; |
| } catch (SQLException e) { |
| LOG.error("Unable to select next element for compaction, " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| closeStmt(updStmt); |
| close(rs, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| return findNextToCompact(workerId); |
| } |
| } |
| |
| /** |
| * This will mark an entry in the queue as compacted |
| * and put it in the ready to clean state. |
| * @param info info on the compaction entry to mark as compacted. |
| */ |
| @Override |
| @RetrySemantics.SafeToRetry |
| public void markCompacted(CompactionInfo info) throws MetaException { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " |
| + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = " |
| + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")" |
| + " WHERE \"CQ_ID\" = " + info.id; |
| LOG.debug("Going to execute update <" + s + ">"); |
| int updCnt = stmt.executeUpdate(s); |
| if (updCnt != 1) { |
| LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt); |
| LOG.debug("Going to rollback"); |
| dbConn.rollback(); |
| return; |
| } |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to update compaction queue " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "markCompacted(" + info + ")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| closeStmt(stmt); |
| closeDbConn(dbConn); |
| } |
| } catch (RetryException e) { |
| markCompacted(info); |
| } |
| } |
| |
| /** |
| * Find entries in the queue that are ready to |
| * be cleaned. |
| * @return information on the entries that are ready to be cleaned. |
| */ |
| @Override |
| @RetrySemantics.ReadOnly |
| public List<CompactionInfo> findReadyToClean() throws MetaException { |
| Connection dbConn = null; |
| List<CompactionInfo> rc = new ArrayList<>(); |
| |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " |
| + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" |
| + READY_FOR_CLEANING + "'"; |
| LOG.debug("Going to execute query <" + s + ">"); |
| rs = stmt.executeQuery(s); |
| while (rs.next()) { |
| CompactionInfo info = new CompactionInfo(); |
| info.id = rs.getLong(1); |
| info.dbname = rs.getString(2); |
| info.tableName = rs.getString(3); |
| info.partName = rs.getString(4); |
| switch (rs.getString(5).charAt(0)) { |
| case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; |
| case MINOR_TYPE: info.type = CompactionType.MINOR; break; |
| default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); |
| } |
| info.runAs = rs.getString(6); |
| info.highestWriteId = rs.getLong(7); |
| rc.add(info); |
| } |
| LOG.debug("Going to rollback"); |
| dbConn.rollback(); |
| return rc; |
| } catch (SQLException e) { |
| LOG.error("Unable to select next element for cleaning, " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "findReadyToClean"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| return findReadyToClean(); |
| } |
| } |
| |
| /** |
| * This will remove an entry from the queue after |
| * it has been compacted and cleaned, recording it as succeeded in COMPLETED_COMPACTIONS. |
| * |
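| * <p>Illustrative Cleaner flow (a sketch; {@code removeObsoleteFiles} is a hypothetical |
| * stand-in for the file-system cleanup the real Cleaner thread performs first): |
| * <pre>{@code |
| * for (CompactionInfo ci : txnHandler.findReadyToClean()) { |
| *   removeObsoleteFiles(ci);  // delete the files made obsolete by this compaction |
| *   txnHandler.markCleaned(ci); |
| * } |
| * }</pre> |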
| * @param info info on the compaction entry to remove |
| */ |
| @Override |
| @RetrySemantics.CannotRetry |
| public void markCleaned(CompactionInfo info) throws MetaException { |
| try { |
| Connection dbConn = null; |
| PreparedStatement pStmt = null; |
| ResultSet rs = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " |
| + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " |
| + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " |
| + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); |
| pStmt.setLong(1, info.id); |
| rs = pStmt.executeQuery(); |
| if(rs.next()) { |
| info = CompactionInfo.loadFullFromCompactionQueue(rs); |
| } |
| else { |
| throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); |
| } |
| close(rs); |
| String s = "delete from \"COMPACTION_QUEUE\" where \"CQ_ID\" = ?"; |
| pStmt = dbConn.prepareStatement(s); |
| pStmt.setLong(1, info.id); |
| LOG.debug("Going to execute update <" + s + ">"); |
| int updCount = pStmt.executeUpdate(); |
| if (updCount != 1) { |
| LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); |
| LOG.debug("Going to rollback"); |
| dbConn.rollback(); |
| } |
| pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", " |
| + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", " |
| + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", " |
| + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\")" |
| + " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); |
| info.state = SUCCEEDED_STATE; |
| CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); |
| updCount = pStmt.executeUpdate(); |
| |
| // Remove entries from completed_txn_components as well, so we don't start looking there |
| // again, but only up to the highest write ID included in this compaction job. |
| // highestWriteId will be NULL in upgrade scenarios |
| s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND " + |
| "\"CTC_TABLE\" = ?"; |
| if (info.partName != null) { |
| s += " AND \"CTC_PARTITION\" = ?"; |
| } |
| if(info.highestWriteId != 0) { |
| s += " AND \"CTC_WRITEID\" <= ?"; |
| } |
| pStmt = dbConn.prepareStatement(s); |
| int paramCount = 1; |
| pStmt.setString(paramCount++, info.dbname); |
| pStmt.setString(paramCount++, info.tableName); |
| if (info.partName != null) { |
| pStmt.setString(paramCount++, info.partName); |
| } |
| if(info.highestWriteId != 0) { |
| pStmt.setLong(paramCount++, info.highestWriteId); |
| } |
| LOG.debug("Going to execute update <" + s + ">"); |
| if ((updCount = pStmt.executeUpdate()) < 1) { |
| LOG.error("Expected to remove at least one row from completed_txn_components when " + |
| "marking compaction entry as clean!"); |
| } |
| /** |
| * compaction may remove data from aborted txns above tc_writeid but it only guarantees to |
| * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about |
| * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). |
| * See {@link ql.txn.compactor.Cleaner.removeFiles()} |
| */ |
| s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " |
| + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; |
| if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; |
| if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; |
| |
| pStmt = dbConn.prepareStatement(s); |
| paramCount = 1; |
| pStmt.setString(paramCount++, info.dbname); |
| pStmt.setString(paramCount++, info.tableName); |
| if(info.highestWriteId != 0) { |
| pStmt.setLong(paramCount++, info.highestWriteId); |
| } |
| if (info.partName != null) { |
| pStmt.setString(paramCount++, info.partName); |
| } |
| |
| LOG.debug("Going to execute update <" + s + ">"); |
| rs = pStmt.executeQuery(); |
| List<Long> txnids = new ArrayList<>(); |
| List<String> questions = new ArrayList<>(); |
| while (rs.next()) { |
| long id = rs.getLong(1); |
| txnids.add(id); |
| questions.add("?"); |
| } |
| // Remove entries from txn_components, as there may be aborted txn components |
| if (txnids.size() > 0) { |
| List<String> queries = new ArrayList<>(); |
| |
| // Prepare prefix and suffix |
| StringBuilder prefix = new StringBuilder(); |
| StringBuilder suffix = new StringBuilder(); |
| |
| prefix.append("DELETE FROM \"TXN_COMPONENTS\" WHERE "); |
| |
| //because 1 txn may include different partitions/tables even in auto commit mode |
| suffix.append(" AND \"TC_DATABASE\" = ?"); |
| suffix.append(" AND \"TC_TABLE\" = ?"); |
| if (info.partName != null) { |
| suffix.append(" AND \"TC_PARTITION\" = ?"); |
| } |
| |
| // Populate the complete query with provided prefix and suffix |
| List<Integer> counts = TxnUtils |
| .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "\"TC_TXNID\"", |
| true, false); |
| int totalCount = 0; |
| for (int i = 0; i < queries.size(); i++) { |
| String query = queries.get(i); |
| int insertCount = counts.get(i); |
| |
| LOG.debug("Going to execute update <" + query + ">"); |
| pStmt = dbConn.prepareStatement(query); |
| for (int j = 0; j < insertCount; j++) { |
| pStmt.setLong(j + 1, txnids.get(totalCount + j)); |
| } |
| totalCount += insertCount; |
| paramCount = insertCount + 1; |
| pStmt.setString(paramCount++, info.dbname); |
| pStmt.setString(paramCount++, info.tableName); |
| if (info.partName != null) { |
| pStmt.setString(paramCount++, info.partName); |
| } |
| int rc = pStmt.executeUpdate(); |
| LOG.debug("Removed " + rc + " records from txn_components"); |
| |
| // Don't bother cleaning from the txns table. A separate call will do that. We don't |
| // know here which txns still have components from other tables or partitions in the |
| // table, so we don't know which ones we can and cannot clean. |
| } |
| } |
| |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to delete from compaction queue " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "markCleaned(" + info + ")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, pStmt, dbConn); |
| } |
| } catch (RetryException e) { |
| markCleaned(info); |
| } |
| } |
| /** |
| * Clean up entries from the TXN_TO_WRITE_ID table that are below min_uncommitted_txnid, found as |
| * min(max(TXNS.txn_id) + 1, min(WRITE_SET.ws_commit_id), min(txn_id of open or aborted txns)). |
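| * <p>Worked example (illustrative numbers): with max(TXNS.txn_id) = 100, |
| * min(WRITE_SET.ws_commit_id) = 90 and the oldest open/aborted txn id 85, the low-water mark |
| * is min(101, 90, 85) = 85 and all TXN_TO_WRITE_ID rows with T2W_TXNID < 85 are deleted. |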
| */ |
| @Override |
| @RetrySemantics.SafeToRetry |
| public void cleanTxnToWriteIdTable() throws MetaException { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| // We query for minimum values in all the queries and those values can only increase under |
| // concurrent operations. So, READ COMMITTED is sufficient. |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| |
| // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. |
| // If there are no txns which are currently open or aborted in the system, then current value of |
| // max(TXNS.txn_id) could be min_uncommitted_txnid. |
| String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + |
| "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " + |
| "UNION " + |
| "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + |
| "UNION " + |
| "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + |
| " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + |
| ") \"RES\""; |
| LOG.debug("Going to execute query <" + s + ">"); |
| rs = stmt.executeQuery(s); |
| if (!rs.next()) { |
| throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); |
| } |
| long minUncommittedTxnid = rs.getLong(1); |
| |
| // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed |
| // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. |
| s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommitedTxnid; |
| LOG.debug("Going to execute delete <" + s + ">"); |
| int rc = stmt.executeUpdate(s); |
| LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommitedTxnid); |
| |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to delete from txns table " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "cleanTxnToWriteIdTable"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| cleanTxnToWriteIdTable(); |
| } |
| } |
| |
| /** |
| * Clean up aborted / committed transactions from txns that have no components in txn_components. |
| * The committed txns are intentionally left there for the TXN_OPENTXN_TIMEOUT window. |
| * The reason such aborted txns exist can be that no work was done in this txn |
| * (e.g. Streaming opened a TransactionBatch and abandoned it w/o doing any work) |
| * or due to {@link #markCleaned(CompactionInfo)} being called. |
| */ |
| @Override |
| @RetrySemantics.SafeToRetry |
| public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { |
| LOG.info("Start to clean empty aborted or committed TXNS"); |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| //Aborted and committed are terminal states, so nothing about the txn can change |
| //after that, so READ COMMITTED is sufficient. |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| /** |
| * Only delete aborted / committed transaction in a way that guarantees two things: |
| * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window |
| * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window |
| */ |
| long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); |
| |
| String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + |
| "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + |
| " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " |
| + " \"TXN_ID\" < " + lowWaterMark; |
| LOG.debug("Going to execute query <" + s + ">"); |
| rs = stmt.executeQuery(s); |
| List<Long> txnids = new ArrayList<>(); |
| while (rs.next()) txnids.add(rs.getLong(1)); |
| close(rs); |
| if(txnids.size() <= 0) { |
| return; |
| } |
| Collections.sort(txnids); // easier to read logs |
| |
| List<String> queries = new ArrayList<>(); |
| StringBuilder prefix = new StringBuilder(); |
| StringBuilder suffix = new StringBuilder(); |
| |
| // Delete from TXNS. |
| prefix.append("DELETE FROM \"TXNS\" WHERE "); |
| |
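| // buildQueryWithINClause splits the id list across multiple DELETE statements where |
| // necessary, so a very long transaction list does not exceed database limits on |
| // IN-clause length / overall query size. |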
| TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); |
| |
| for (String query : queries) { |
| LOG.debug("Going to execute update <" + query + ">"); |
| int rc = stmt.executeUpdate(query); |
| LOG.debug("Removed " + rc + " empty Aborted and Committed transactions from TXNS"); |
| } |
| LOG.info("Aborted and committed transactions removed from TXNS: " + txnids); |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to delete from txns table " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| cleanEmptyAbortedAndCommittedTxns(); |
| } |
| } |
| |
| /** |
| * This will take all entries assigned to workers |
| * on a host and return them to INITIATED state. The initiator should use this at start up to |
| * clean entries from any workers that were in the middle of compacting when the metastore |
| * shut down. It does not reset entries from worker threads on other hosts as those may still |
| * be working. |
| * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, |
| * so that LIKE 'hostname%' will match the worker id. |
| */ |
| @Override |
| @RetrySemantics.Idempotent |
| public void revokeFromLocalWorkers(String hostname) throws MetaException { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" |
| + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_WORKER_ID\" LIKE '" |
| + hostname + "%'"; |
| LOG.debug("Going to execute update <" + s + ">"); |
| // It isn't an error if the following returns no rows, as the local workers could have died |
| // with nothing assigned to them. |
| stmt.executeUpdate(s); |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to change dead worker's records back to initiated state " + |
| e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| closeStmt(stmt); |
| closeDbConn(dbConn); |
| } |
| } catch (RetryException e) { |
| revokeFromLocalWorkers(hostname); |
| } |
| } |
| |
| /** |
| * This call will move all compaction queue |
| * entries that are assigned to a worker but have exceeded the timeout back to the INITIATED state. |
| * This should be called by the initiator on start up and occasionally when running to clean up |
| * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called |
| * first. |
| * @param timeout number of milliseconds since start time that should elapse before a worker is |
| * declared dead. |
| */ |
| @Override |
| @RetrySemantics.Idempotent |
| public void revokeTimedoutWorkers(long timeout) throws MetaException { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| long latestValidStart = getDbTime(dbConn) - timeout; |
| stmt = dbConn.createStatement(); |
| String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" |
| + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_START\" < " |
| + latestValidStart; |
| LOG.debug("Going to execute update <" + s + ">"); |
| // It isn't an error if the following returns no rows, as the local workers could have died |
| // with nothing assigned to them. |
| stmt.executeUpdate(s); |
| LOG.debug("Going to commit"); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.error("Unable to change dead worker's records back to initiated state " + |
| e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| closeStmt(stmt); |
| closeDbConn(dbConn); |
| } |
| } catch (RetryException e) { |
| revokeTimedoutWorkers(timeout); |
| } |
| } |
| |
| /** |
| * Queries metastore DB directly to find columns in the table which have statistics information. |
| * If {@code ci} includes partition info then per partition stats info is examined, otherwise |
| * table level stats are examined. |
| * @throws MetaException |
| */ |
| @Override |
| @RetrySemantics.ReadOnly |
| public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { |
| Connection dbConn = null; |
| PreparedStatement pStmt = null; |
| ResultSet rs = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| String quote = getIdentifierQuoteString(dbConn); |
| StringBuilder bldr = new StringBuilder(); |
| bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) |
| .append(" FROM ") |
| .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) |
| .append(quote) |
| .append(" WHERE ") |
| .append(quote).append("DB_NAME").append(quote).append(" = ?") |
| .append(" AND ").append(quote).append("TABLE_NAME").append(quote) |
| .append(" = ?"); |
| if (ci.partName != null) { |
| bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?"); |
| } |
| String s = bldr.toString(); |
| pStmt = dbConn.prepareStatement(s); |
| pStmt.setString(1, ci.dbname); |
| pStmt.setString(2, ci.tableName); |
| if (ci.partName != null) { |
| pStmt.setString(3, ci.partName); |
| } |
| |
| /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : |
| "PART_COL_STATS") |
| + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" |
| + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/ |
| LOG.debug("Going to execute <" + s + ">"); |
| rs = pStmt.executeQuery(); |
| List<String> columns = new ArrayList<>(); |
| while (rs.next()) { |
| columns.add(rs.getString(1)); |
| } |
| LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + |
| (ci.partName == null ? "" : "/" + ci.partName)); |
| dbConn.commit(); |
| return columns; |
| } catch (SQLException e) { |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName + |
| (ci.partName == null ? "" : "/" + ci.partName) + ")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, pStmt, dbConn); |
| } |
| } catch (RetryException ex) { |
| return findColumnsWithStats(ci); |
| } |
| } |
| @Override |
| public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException { |
| Connection dbConn = null; |
| Statement stmt = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " + |
| ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + |
| " WHERE \"CQ_ID\" = " + ci.id; |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("About to execute: " + sqlText); |
| } |
| int updCount = stmt.executeUpdate(sqlText); |
| if(updCount != 1) { |
| throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); |
| } |
| /*We make an entry in TXN_COMPONENTS for the partition/table that the compactor is |
| * working on in case this txn aborts, so that its TXNS entry is |
| * not removed until the Cleaner has removed all files that this txn may have written, i.e. |
| * it works the same way as any other write. TC_WRITEID is set to the highest |
| * WriteId that this compactor run considered, since the compactor doesn't allocate |
| * a new write id (so as not to invalidate result set caches/materialized views), but |
| * we need to set it to something so that markCleaned() only cleans TXN_COMPONENTS up to |
| * the level to which aborted files/data has been cleaned.*/ |
| sqlText = "INSERT INTO \"TXN_COMPONENTS\"(" + |
| "\"TC_TXNID\", " + |
| "\"TC_DATABASE\", " + |
| "\"TC_TABLE\", " + |
| (ci.partName == null ? "" : "\"TC_PARTITION\", ") + |
| "\"TC_WRITEID\", " + |
| "\"TC_OPERATION_TYPE\")" + |
| " VALUES(" + |
| compactionTxnId + "," + |
| quoteString(ci.dbname) + "," + |
| quoteString(ci.tableName) + "," + |
| (ci.partName == null ? "" : quoteString(ci.partName) + ",") + |
| ci.highestWriteId + ", " + |
| quoteChar(OperationType.COMPACT.getSqlConst()) + ")"; |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("About to execute: " + sqlText); |
| } |
| updCount = stmt.executeUpdate(sqlText); |
| if(updCount != 1) { |
| throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); |
| } |
| dbConn.commit(); |
| } catch (SQLException e) { |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "updateCompactorState(" + ci + "," + compactionTxnId +")"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(null, stmt, dbConn); |
| } |
| } catch (RetryException ex) { |
| updateCompactorState(ci, compactionTxnId); |
| } |
| } |
| private static class RetentionCounters { |
| int attemptedRetention = 0; |
| int failedRetention = 0; |
| int succeededRetention = 0; |
| RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) { |
| this.attemptedRetention = attemptedRetention; |
| this.failedRetention = failedRetention; |
| this.succeededRetention = succeededRetention; |
| } |
| } |
| private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) { |
| switch (ci.state) { |
| case ATTEMPTED_STATE: |
| if(--rc.attemptedRetention < 0) { |
| deleteSet.add(ci.id); |
| } |
| break; |
| case FAILED_STATE: |
| if(--rc.failedRetention < 0) { |
| deleteSet.add(ci.id); |
| } |
| break; |
| case SUCCEEDED_STATE: |
| if(--rc.succeededRetention < 0) { |
| deleteSet.add(ci.id); |
| } |
| break; |
| default: |
| // do nothing, to handle future RU/D where we may want to add new state types |
| } |
| } |
| |
| /** |
| * For any given compactable entity (partition; table if not partitioned) the history of compactions |
| * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the |
| * history such that a configurable number of each type of state is present. Any other entries |
| * can be purged. This scheme has the advantage of always retaining the last failure/success |
| * even if it's not recent. |
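| * <p>Worked example (hypothetical retention of 2 attempted / 3 failed / 3 succeeded): for the |
| * history "sssfffaaasffss" (oldest first), walking from the newest entry back keeps |
| * "faasffss" and purges the three oldest successes, the two oldest failures and the |
| * oldest attempt. |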
| * @throws MetaException |
| */ |
| @Override |
| @RetrySemantics.SafeToRetry |
| public void purgeCompactionHistory() throws MetaException { |
| Connection dbConn = null; |
| Statement stmt = null; |
| PreparedStatement pStmt = null; |
| ResultSet rs = null; |
| List<Long> deleteSet = new ArrayList<>(); |
| RetentionCounters rc = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| /*cc_id is monotonically increasing, so for any entity it sorts in order of compaction history; |
| thus this query groups by entity and within each group sorts most recent first*/ |
| rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\" " |
| + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_ID\" DESC"); |
| String lastCompactedEntity = null; |
| /*In each group, walk from most recent and count occurrences of each state type. Once you |
| * have counted enough (for each state) to satisfy the retention policy, delete all other |
| * instances of this status.*/ |
| while(rs.next()) { |
| CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0)); |
| if(!ci.getFullPartitionName().equals(lastCompactedEntity)) { |
| lastCompactedEntity = ci.getFullPartitionName(); |
| rc = new RetentionCounters(MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), |
| getFailedCompactionRetention(), |
| MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); |
| } |
| checkForDeletion(deleteSet, ci, rc); |
| } |
| close(rs); |
| |
| if (deleteSet.size() <= 0) { |
| return; |
| } |
| |
| List<String> queries = new ArrayList<>(); |
| |
| StringBuilder prefix = new StringBuilder(); |
| StringBuilder suffix = new StringBuilder(); |
| |
| prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE "); |
| suffix.append(""); |
| |
| List<String> questions = new ArrayList<>(deleteSet.size()); |
| for (int i = 0; i < deleteSet.size(); i++) { |
| questions.add("?"); |
| } |
| List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, |
| "\"CC_ID\"", false, false); |
| int totalCount = 0; |
| for (int i = 0; i < queries.size(); i++) { |
| String query = queries.get(i); |
| int insertCount = counts.get(i); |
| LOG.debug("Going to execute update <" + query + ">"); |
| pStmt = dbConn.prepareStatement(query); |
| for (int j = 0; j < insertCount; j++) { |
| pStmt.setLong(j + 1, deleteSet.get(totalCount + j)); |
| } |
| totalCount += insertCount; |
| int count = pStmt.executeUpdate(); |
| LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); |
| } |
| dbConn.commit(); |
| } catch (SQLException e) { |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "purgeCompactionHistory()"); |
| throw new MetaException("Unable to connect to transaction database " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, stmt, dbConn); |
| closeStmt(pStmt); |
| } |
| } catch (RetryException ex) { |
| purgeCompactionHistory(); |
| } |
| } |
| /** |
| * This ensures that the number of failed compaction entries retained is at least the failed |
| * compaction threshold that blocks new compactions from being scheduled; otherwise |
| * {@link #checkFailedCompactions(CompactionInfo)} could never see enough consecutive failures. |
| */ |
| private int getFailedCompactionRetention() { |
| int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); |
| int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED); |
| if(failedRetention < failedThreshold) { |
| LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + |
| "=" + failedRetention + " < " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" + |
| failedRetention + ". Will use " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + |
| "=" + failedRetention); |
| failedRetention = failedThreshold; |
| } |
| return failedRetention; |
| } |
| /** |
| * Returns {@code true} if there already exists a sufficient number of consecutive failures for |
| * this table/partition so that no new automatic compactions will be scheduled. |
| * User initiated compactions don't do this check. |
| * |
| * Do we allow compacting a whole table (when it's partitioned)? No, though perhaps we should. |
| * That would be a meta operation, i.e. first find all partitions for this table (which have |
| * txn info) and schedule each compaction separately. This avoids complications in this logic. |
| */ |
| @Override |
| @RetrySemantics.ReadOnly |
| public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { |
| Connection dbConn = null; |
| PreparedStatement pStmt = null; |
| ResultSet rs = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\" FROM \"COMPLETED_COMPACTIONS\" WHERE " + |
| "\"CC_DATABASE\" = ? AND " + |
| "\"CC_TABLE\" = ? " + |
| (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") + |
| " AND \"CC_STATE\" != " + quoteChar(ATTEMPTED_STATE) + " ORDER BY \"CC_ID\" DESC"); |
| pStmt.setString(1, ci.dbname); |
| pStmt.setString(2, ci.tableName); |
| if (ci.partName != null) { |
| pStmt.setString(3, ci.partName); |
| } |
| rs = pStmt.executeQuery(); |
| int numFailed = 0; |
| int numTotal = 0; |
| int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); |
| while(rs.next() && ++numTotal <= failedThreshold) { |
| if(rs.getString(1).charAt(0) == FAILED_STATE) { |
| numFailed++; |
| } |
| else { |
| numFailed--; |
| } |
| } |
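| // The most recent <failedThreshold> completed (non-attempted) compactions must all have |
| // failed for this method to return true: any success in the window decrements the counter. |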
| return numFailed == failedThreshold; |
| } |
| catch (SQLException e) { |
| LOG.error("Unable to check for failed compactions " + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); |
| LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); |
| return false;//weren't able to check |
| } finally { |
| close(rs, pStmt, dbConn); |
| } |
| } catch (RetryException e) { |
| return checkFailedCompactions(ci); |
| } |
| } |
| /** |
| * If there is an entry in compaction_queue with ci.id, remove it and |
| * make an entry in completed_compactions with status 'f' (failed). |
| * If there is no entry in compaction_queue, it means the Initiator failed to even schedule a compaction, |
| * which we record as an ATTEMPTED_STATE entry in history. |
| */ |
| @Override |
| @RetrySemantics.CannotRetry |
| public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw |
| //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| PreparedStatement pStmt = null; |
| ResultSet rs = null; |
| // the error message related to the failure is wrapped inside CompactionInfo |
| // fetch this info, since ci will be reused in subsequent queries |
| String errorMessage = ci.errorMessage; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " |
| + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " |
| + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " |
| + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); |
| pStmt.setLong(1, ci.id); |
| rs = pStmt.executeQuery(); |
| if(rs.next()) { |
| ci = CompactionInfo.loadFullFromCompactionQueue(rs); |
| String s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"; |
| pStmt = dbConn.prepareStatement(s); |
| pStmt.setLong(1, ci.id); |
| LOG.debug("Going to execute update <" + s + ">"); |
| pStmt.executeUpdate(); |
| } |
| else { |
| if(ci.id > 0) { |
| //the record with valid CQ_ID has disappeared - this is a sign of something wrong |
| throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); |
| } |
| } |
| if(ci.id == 0) { |
| //The failure occurred before we even made an entry in COMPACTION_QUEUE |
| //generate ID so that we can make an entry in COMPLETED_COMPACTIONS |
| ci.id = generateCompactionQueueId(stmt); |
| //mostly this indicates that the Initiator is paying attention to some table even though |
| //compactions are not happening. |
| ci.state = ATTEMPTED_STATE; |
| //this is not strictly accurate, but 'type' cannot be null. |
| if(ci.type == null) { ci.type = CompactionType.MINOR; } |
| ci.start = getDbTime(dbConn); |
| } |
| else { |
| ci.state = FAILED_STATE; |
| } |
| close(rs, stmt, null); |
| closeStmt(pStmt); |
| |
| pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\" " |
| + "(\"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", " |
| + "\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " |
| + "\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\") " |
| + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); |
| if (errorMessage != null) { |
| ci.errorMessage = errorMessage; |
| } |
| CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); |
| int updCount = pStmt.executeUpdate(); |
| LOG.debug("Going to commit"); |
| closeStmt(pStmt); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.warn("markFailed(" + ci.id + "):" + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| try { |
| checkRetryable(dbConn, e, "markFailed(" + ci + ")"); |
| } |
| catch(MetaException ex) { |
| LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); |
| } |
| LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e); |
| } finally { |
| close(rs, stmt, null); |
| close(null, pStmt, dbConn); |
| } |
| } catch (RetryException e) { |
| markFailed(ci); |
| } |
| } |
| @Override |
| @RetrySemantics.Idempotent |
| public void setHadoopJobId(String hadoopJobId, long id) { |
| try { |
| Connection dbConn = null; |
| Statement stmt = null; |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + quoteString(hadoopJobId) |
| + " WHERE \"CQ_ID\" = " + id; |
| LOG.debug("Going to execute <" + s + ">"); |
| int updateCount = stmt.executeUpdate(s); |
| LOG.debug("Going to commit"); |
| closeStmt(stmt); |
| dbConn.commit(); |
| } catch (SQLException e) { |
| LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage()); |
| LOG.debug("Going to rollback"); |
| rollbackDBConn(dbConn); |
| try { |
| checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); |
| } |
| catch(MetaException ex) { |
| LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); |
| } |
| LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e); |
| } finally { |
| close(null, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| setHadoopJobId(hadoopJobId, id); |
| } |
| } |
| |
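| /** |
| * Returns the transaction id below which the Cleaner can safely operate (derived from the |
| * queries below): the minimum of the lowest open txn id and the largest CQ_NEXT_TXN_ID of |
| * any queue entry that is ready for cleaning; when no transaction is open, |
| * max(TXN_ID) + 1 is used. |
| */ |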
| @Override |
| @RetrySemantics.Idempotent |
| public long findMinOpenTxnIdForCleaner() throws MetaException{ |
| Connection dbConn = null; |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| try { |
| dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); |
| stmt = dbConn.createStatement(); |
| String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); |
| LOG.debug("Going to execute query <" + query + ">"); |
| rs = stmt.executeQuery(query); |
| if (!rs.next()) { |
| throw new MetaException("Transaction tables not properly initialized."); |
| } |
| long numOpenTxns = rs.getLong(1); |
| if (numOpenTxns > 0) { |
| query = "SELECT MIN(\"RES\".\"ID\") FROM (" + |
| "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + |
| " UNION " + |
| "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " |
| + quoteChar(READY_FOR_CLEANING) + |
| ") \"RES\""; |
| } else { |
| query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\""; |
| } |
| LOG.debug("Going to execute query <" + query + ">"); |
| rs = stmt.executeQuery(query); |
| if (!rs.next()) { |
| throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); |
| } |
| return rs.getLong(1); |
| } catch (SQLException e) { |
| LOG.error("Unable to getMinOpenTxnIdForCleaner", e); |
| rollbackDBConn(dbConn); |
| checkRetryable(dbConn, e, "getMinOpenTxnForCleaner"); |
| throw new MetaException("Unable to execute getMinOpenTxnIfForCleaner() " + |
| StringUtils.stringifyException(e)); |
| } finally { |
| close(rs, stmt, dbConn); |
| } |
| } catch (RetryException e) { |
| return findMinOpenTxnIdForCleaner(); |
| } |
| } |
| } |
| |
| |