| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql.exec.repl; |
| |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.ValidTxnList; |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.conf.Constants; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.ReplChangeManager; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.metastore.api.Database; |
| import org.apache.hadoop.hive.metastore.api.Function; |
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; |
| import org.apache.hadoop.hive.metastore.api.NotificationEvent; |
| import org.apache.hadoop.hive.metastore.api.SQLForeignKey; |
| import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; |
| import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; |
| import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; |
| import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; |
| import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; |
| import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; |
| import org.apache.hadoop.hive.metastore.api.TxnType; |
| import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; |
| import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter; |
| import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; |
| import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; |
| import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; |
| import org.apache.hadoop.hive.metastore.utils.SecurityUtils; |
| import org.apache.hadoop.hive.ql.ErrorMsg; |
| import org.apache.hadoop.hive.ql.exec.Task; |
| import org.apache.hadoop.hive.ql.exec.TaskFactory; |
| import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; |
| import org.apache.hadoop.hive.ql.exec.repl.util.FileList; |
| import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; |
| import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; |
| import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; |
| import org.apache.hadoop.hive.ql.exec.util.Retryable; |
| import org.apache.hadoop.hive.ql.io.AcidUtils; |
| import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; |
| import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; |
| import org.apache.hadoop.hive.ql.lockmgr.LockException; |
| import org.apache.hadoop.hive.ql.metadata.Hive; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.metadata.InvalidTableException; |
| import org.apache.hadoop.hive.ql.metadata.Table; |
| import org.apache.hadoop.hive.ql.metadata.events.EventUtils; |
| import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; |
| import org.apache.hadoop.hive.ql.parse.EximUtil; |
| import org.apache.hadoop.hive.ql.parse.ReplicationSpec; |
| import org.apache.hadoop.hive.ql.parse.SemanticException; |
| import org.apache.hadoop.hive.ql.parse.repl.DumpType; |
| import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.io.ConstraintsSerializer; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector; |
| import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector; |
| import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; |
| import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; |
| import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; |
| import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; |
| import org.apache.hadoop.hive.ql.plan.api.StageType; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.Serializable; |
| import java.io.UnsupportedEncodingException; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Set; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Base64; |
| import java.util.UUID; |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.HashMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT; |
| import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; |
| import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; |
| import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; |
| |
| public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { |
| private static final long serialVersionUID = 1L; |
| private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; |
| private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; |
| private static final long SLEEP_TIME = 60000; |
| private Set<String> tablesForBootstrap = new HashSet<>(); |
| |
| public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); |
| private final String name; |
| private final String prefix; |
| ConstraintFileType(String name, String prefix) { |
| this.name = name; |
| this.prefix = prefix; |
| } |
| public String getName() { |
| return this.name; |
| } |
| |
| public String getPrefix() { |
| return prefix; |
| } |
| } |
| |
| private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); |
| private ReplLogger replLogger; |
| |
| @Override |
| public String getName() { |
| return "REPL_DUMP"; |
| } |
| |
| @Override |
| public int execute() { |
| try { |
| SecurityUtils.reloginExpiringKeytabUser(); |
| if (work.dataCopyIteratorsInitialized()) { |
| initiateDataCopyTasks(); |
| } else { |
| Path dumpRoot = ReplUtils.getEncodedDumpRootPath(conf, work.dbNameOrPattern.toLowerCase()); |
| if (ReplUtils.failedWithNonRecoverableError(ReplUtils.getLatestDumpPath(dumpRoot, conf), conf)) { |
| LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. "); |
| setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format())); |
| return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); |
| } |
| Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); |
| boolean isBootstrap = (previousValidHiveDumpPath == null); |
| //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. |
| if (shouldDump(previousValidHiveDumpPath)) { |
| Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); |
| Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); |
| work.setCurrentDumpPath(currentDumpPath); |
| work.setMetricCollector(initMetricCollection(isBootstrap, hiveDumpRoot)); |
| if (shouldDumpAtlasMetadata()) { |
| addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath); |
| LOG.info("Added task to dump atlas metadata."); |
| } |
| if (shouldDumpAuthorizationMetadata()) { |
| initiateAuthorizationDumpTask(); |
| } |
| DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); |
| // Initialize ReplChangeManager instance since we will require it to encode file URI. |
| ReplChangeManager.getInstance(conf); |
| Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); |
| Long lastReplId; |
| LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)); |
| if (isBootstrap) { |
| lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive()); |
| } else { |
| work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); |
| lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive()); |
| } |
| work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); |
| initiateDataCopyTasks(); |
| } else { |
| LOG.info("Previous Dump is not yet loaded"); |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("failed", e); |
| setException(e); |
| int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); |
| try { |
| if (errorCode > 40000) { |
| Path nonRecoverableMarker = new Path(work.getCurrentDumpPath(), |
| ReplAck.NON_RECOVERABLE_MARKER.toString()); |
| Utils.writeStackTrace(e, nonRecoverableMarker, conf); |
| work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); |
| } else { |
| work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); |
| } |
| } catch (Exception ex) { |
| LOG.error("Failed to collect Metrics", ex); |
| } |
| return errorCode; |
| } |
| return 0; |
| } |
| |
| private void initiateAuthorizationDumpTask() throws SemanticException { |
| if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { |
| Path rangerDumpRoot = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_RANGER_BASE_DIR); |
| LOG.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot); |
| RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern, |
| work.getMetricCollector()); |
| Task<RangerDumpWork> rangerDumpTask = TaskFactory.get(rangerDumpWork, conf); |
| if (childTasks == null) { |
| childTasks = new ArrayList<>(); |
| } |
| childTasks.add(rangerDumpTask); |
| } else { |
| throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " |
| + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) |
| + " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE)); |
| } |
| } |
| |
| private boolean shouldDumpAuthorizationMetadata() { |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); |
| } |
| |
| private boolean shouldDumpAtlasMetadata() { |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); |
| } |
| |
| private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOException { |
| Path lastDumpPath = ReplUtils.getLatestDumpPath(dumpRoot, conf); |
| if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, isBootstrap)) { |
| //Resume previous dump |
| LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath); |
| work.setShouldOverwrite(true); |
| return lastDumpPath; |
| } else { |
| return new Path(dumpRoot, getNextDumpDir()); |
| } |
| } |
| |
| private void initiateDataCopyTasks() throws SemanticException { |
| TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); |
| if (childTasks == null) { |
| childTasks = new ArrayList<>(); |
| } |
| childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf)); |
| childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf)); |
| childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf)); |
| if (childTasks.isEmpty()) { |
| //All table data copy work finished. |
| finishRemainingTasks(); |
| } else { |
| DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); |
| } |
| } |
| |
| private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) { |
| Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR); |
| Path prevAtlasDumpDir = prevHiveDumpDir == null ? null |
| : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); |
| AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir, |
| work.getMetricCollector()); |
| Task<?> atlasDumpTask = TaskFactory.get(atlasDumpWork, conf); |
| childTasks = new ArrayList<>(); |
| childTasks.add(atlasDumpTask); |
| } |
| |
| |
| private void finishRemainingTasks() throws SemanticException { |
| Path dumpAckFile = new Path(work.getCurrentDumpPath(), |
| ReplUtils.REPL_HIVE_BASE_DIR + File.separator |
| + ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); |
| Utils.create(dumpAckFile, conf); |
| prepareReturnValues(work.getResultValues()); |
| work.getMetricCollector().reportEnd(Status.SUCCESS); |
| deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); |
| } |
| |
| private void prepareReturnValues(List<String> values) throws SemanticException { |
| LOG.debug("prepareReturnValues : " + dumpSchema); |
| for (String s : values) { |
| LOG.debug(" > " + s); |
| } |
| Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf); |
| } |
| |
| private void deleteAllPreviousDumpMeta(Path currentDumpPath) { |
| try { |
| Path dumpRoot = getDumpRoot(currentDumpPath); |
| FileSystem fs = dumpRoot.getFileSystem(conf); |
| if (fs.exists(dumpRoot)) { |
| FileStatus[] statuses = fs.listStatus(dumpRoot, |
| path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString())); |
| |
| int retainPrevDumpDirCount = conf.getIntVar(HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT); |
| int numDumpDirs = statuses.length; |
| if(shouldRetainPrevDumpDirs()) { |
| Arrays.sort(statuses, (Comparator.<FileStatus> |
| comparingLong(fileStatus1 -> fileStatus1.getModificationTime()) |
| .thenComparingLong(fileStatus2 -> fileStatus2.getModificationTime()))); |
| } |
| for (FileStatus status : statuses) { |
| //based on config, either delete all previous dump-dirs |
| //or delete a minimum number of oldest dump-directories |
| if(!shouldRetainPrevDumpDirs() || numDumpDirs > retainPrevDumpDirCount){ |
| fs.delete(status.getPath(), true); |
| numDumpDirs--; |
| |
| } |
| } |
| } |
| } catch (Exception ex) { |
| LOG.warn("Possible leak on disk, could not delete the previous dump directory:" + currentDumpPath, ex); |
| } |
| } |
| |
| private Path getDumpRoot(Path currentDumpPath) { |
| if (ReplDumpWork.testDeletePreviousDumpMetaPath |
| && (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) |
| || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL))) { |
| //testDeleteDumpMetaDumpPath to be used only for test. |
| return null; |
| } else { |
| return currentDumpPath.getParent(); |
| } |
| } |
| |
| private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException { |
| if (previousDumpPath != null) { |
| DumpMetaData dmd = new DumpMetaData(previousDumpPath, conf); |
| if (dmd.isIncrementalDump()) { |
| return dmd.getEventTo(); |
| } |
| //bootstrap case return event from |
| return dmd.getEventFrom(); |
| } |
| return 0L; |
| } |
| |
| private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException { |
| FileStatus latestValidStatus = null; |
| FileSystem fs = dumpRoot.getFileSystem(conf); |
| if (fs.exists(dumpRoot)) { |
| FileStatus[] statuses = fs.listStatus(dumpRoot); |
| for (FileStatus status : statuses) { |
| LOG.info("Evaluating previous dump dir path:{}", status.getPath()); |
| if (latestValidStatus == null) { |
| latestValidStatus = validDump(status.getPath()) ? status : null; |
| } else if (validDump(status.getPath()) |
| && status.getModificationTime() > latestValidStatus.getModificationTime()) { |
| latestValidStatus = status; |
| } |
| } |
| } |
| Path latestDumpDir = (latestValidStatus == null) |
| ? null : new Path(latestValidStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); |
| LOG.info("Selecting latest valid dump dir as {}", (latestDumpDir == null) ? "null" : latestDumpDir.toString()); |
| return latestDumpDir; |
| } |
| |
| private boolean validDump(Path dumpDir) throws IOException { |
| //Check if it was a successful dump |
| if (dumpDir != null) { |
| FileSystem fs = dumpDir.getFileSystem(conf); |
| Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); |
| return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString())); |
| } |
| return false; |
| } |
| |
| private boolean shouldDump(Path previousDumpPath) throws IOException { |
| //If no previous dump means bootstrap. So return true as there was no |
| //previous dump to load |
| if (previousDumpPath == null) { |
| return true; |
| } else { |
| FileSystem fs = previousDumpPath.getFileSystem(conf); |
| return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString())); |
| } |
| } |
| |
| /** |
| * Decide whether to examine all the tables to dump. We do this if |
| * 1. External tables are going to be part of the dump : In which case we need to list their |
| * locations. |
| * 2. External or ACID tables are being bootstrapped for the first time : so that we can dump |
| * those tables as a whole. |
| * 3. If replication policy is changed/replaced, then need to examine all the tables to see if |
| * any of them need to be bootstrapped as old policy doesn't include it but new one does. |
| * 4. Some tables are renamed and the new name satisfies the table list filter while old name was not. |
| * @return true if need to examine tables for dump and false if not. |
| */ |
| private boolean shouldExamineTablesToDump() { |
| return (work.oldReplScope != null) |
| || !tablesForBootstrap.isEmpty() |
| || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) |
| || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); |
| } |
| |
| /** |
| * Decide whether to dump external tables data. If external tables are enabled for replication, |
| * then need to dump it's data in all the incremental dumps. |
| * @return true if need to dump external table data and false if not. |
| */ |
| private boolean shouldDumpExternalTableLocation() { |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) |
| && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && |
| !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); |
| } |
| |
| /** |
| * Decide whether to dump external tables. |
| * @param tableName - Name of external table to be replicated |
| * @return true if need to bootstrap dump external table and false if not. |
| */ |
| private boolean shouldBootstrapDumpExternalTable(String tableName) { |
| // Note: If repl policy is replaced, then need to dump external tables if table is getting replicated |
| // for the first time in current dump. So, need to check if table is included in old policy. |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) |
| && (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) |
| || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName)); |
| } |
| |
| /** |
| * Decide whether to dump materialized views. |
| */ |
| private boolean isMaterializedViewsReplEnabled() { |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_MATERIALIZED_VIEWS); |
| } |
| |
| /** |
| * Decide whether to retain previous dump-directories after repl-dump |
| */ |
| private boolean shouldRetainPrevDumpDirs() { |
| return conf.getBoolVar(HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR); |
| } |
| |
| /** |
| * Decide whether to dump ACID tables. |
| * @param tableName - Name of ACID table to be replicated |
| * @return true if need to bootstrap dump ACID table and false if not. |
| */ |
| private boolean shouldBootstrapDumpAcidTable(String tableName) { |
| // Note: If repl policy is replaced, then need to dump ACID tables if table is getting replicated |
| // for the first time in current dump. So, need to check if table is included in old policy. |
| return ReplUtils.includeAcidTableInDump(conf) |
| && (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) |
| || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName)); |
| } |
| |
| private boolean shouldBootstrapDumpTable(Table table) { |
| // Note: If control reaches here, it means, table is already included in new replication policy. |
| if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) |
| && shouldBootstrapDumpExternalTable(table.getTableName())) { |
| return true; |
| } |
| |
| if (AcidUtils.isTransactionalTable(table) |
| && shouldBootstrapDumpAcidTable(table.getTableName())) { |
| return true; |
| } |
| |
| // If the table is renamed and the new name satisfies the filter but the old name does not then the table needs to |
| // be bootstrapped. |
| if (tablesForBootstrap.contains(table.getTableName().toLowerCase())) { |
| return true; |
| } |
| |
| // If replication policy is changed with new included/excluded tables list, then tables which |
| // are not included in old policy but included in new policy should be bootstrapped along with |
| // the current incremental replication dump. |
| // Control reaches for Non-ACID tables. |
| return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, table.getTableName()); |
| } |
| |
| private boolean isTableSatifiesConfig(Table table) { |
| if (table == null) { |
| return false; |
| } |
| |
| if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) |
| && !conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)) { |
| return false; |
| } |
| |
| if (AcidUtils.isTransactionalTable(table) |
| && !ReplUtils.includeAcidTableInDump(conf)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { |
| Long lastReplId;// get list of events matching dbPattern & tblPattern |
| // go through each event, and dump out each event to a event-level dump dir inside dumproot |
| String validTxnList = null; |
| long waitUntilTime = 0; |
| long bootDumpBeginReplId = -1; |
| |
| List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); |
| |
| // If we are bootstrapping ACID tables, we need to perform steps similar to a regular |
| // bootstrap (See bootstrapDump() for more details. Only difference here is instead of |
| // waiting for the concurrent transactions to finish, we start dumping the incremental events |
| // and wait only for the remaining time if any. |
| if (needBootstrapAcidTablesDuringIncrementalDump()) { |
| bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); |
| assert (bootDumpBeginReplId >= 0); |
| LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", |
| work.dbNameOrPattern); |
| long timeoutInMs = HiveConf.getTimeVar(conf, |
| HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); |
| waitUntilTime = System.currentTimeMillis() + timeoutInMs; |
| } |
| // TODO : instead of simply restricting by message format, we should eventually |
| // move to a jdbc-driver-stype registering of message format, and picking message |
| // factory per event to decode. For now, however, since all messages have the |
| // same factory, restricting by message format is effectively a guard against |
| // older leftover data that would cause us problems. |
| work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId); |
| IMetaStoreClient.NotificationFilter evFilter = new AndFilter( |
| new ReplEventFilter(work.replScope), |
| new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)), |
| new EventBoundaryFilter(work.eventFrom, work.eventTo)); |
| EventUtils.MSClientNotificationFetcher evFetcher |
| = new EventUtils.MSClientNotificationFetcher(hiveDb); |
| int maxEventLimit = getMaxEventAllowed(work.maxEventLimit()); |
| EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( |
| evFetcher, work.eventFrom, maxEventLimit, evFilter); |
| lastReplId = work.eventTo; |
| Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString()); |
| long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile) : work.eventFrom; |
| |
| // Right now the only pattern allowed to be specified is *, which matches all the database |
| // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter |
| // on database name when it's *. In future, if we support more elaborate patterns, we will |
| // have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get |
| // correct event count. |
| String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) |
| ? work.dbNameOrPattern |
| : "?"; |
| long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, |
| maxEventLimit); |
| replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, |
| work.eventFrom, work.eventTo, maxEventLimit); |
| replLogger.startLog(); |
| Map<String, Long> metricMap = new HashMap<>(); |
| metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents); |
| work.getMetricCollector().reportStageStart(getName(), metricMap); |
| long dumpedCount = resumeFrom - work.eventFrom; |
| if (dumpedCount > 0) { |
| LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); |
| } |
| cleanFailedEventDirIfExists(dumpRoot, resumeFrom); |
| while (evIter.hasNext()) { |
| NotificationEvent ev = evIter.next(); |
| lastReplId = ev.getEventId(); |
| if (ev.getEventId() <= resumeFrom) { |
| continue; |
| } |
| |
| //disable materialized-view replication if not configured |
| if(!isMaterializedViewsReplEnabled()){ |
| String tblName = ev.getTableName(); |
| if(tblName != null) { |
| try { |
| Table table = hiveDb.getTable(dbName, tblName); |
| if (table != null && TableType.MATERIALIZED_VIEW.equals(table.getTableType())){ |
| LOG.info("Attempt to dump materialized view : " + tblName); |
| continue; |
| } |
| } catch (InvalidTableException te) { |
| LOG.debug(te.getMessage()); |
| } |
| } |
| } |
| |
| Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); |
| dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); |
| Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf); |
| } |
| replLogger.endLog(lastReplId.toString()); |
| LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); |
| long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); |
| dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId); |
| // If repl policy is changed (oldReplScope is set), then pass the current replication policy, |
| // so that REPL LOAD would drop the tables which are not included in current policy. |
| if (work.oldReplScope != null) { |
| dmd.setReplScope(work.replScope); |
| } |
| dmd.write(true); |
| int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); |
| try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); |
| FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { |
| // Examine all the tables if required. |
| if (shouldExamineTablesToDump() || (tableList != null)) { |
| // If required wait more for any transactions open at the time of starting the ACID bootstrap. |
| if (needBootstrapAcidTablesDuringIncrementalDump()) { |
| assert (waitUntilTime > 0); |
| validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); |
| } |
| /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata. |
| We need to rewrite the metadata as the write id list will be changed. |
| We can't reuse the previous write id as it might be invalid due to compaction. */ |
| Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); |
| Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME); |
| FileSystem fs = FileSystem.get(metadataPath.toUri(), conf); |
| try { |
| fs.delete(metadataPath, true); |
| } catch (FileNotFoundException e) { |
| // no worries |
| } |
| Path dbRootMetadata = new Path(metadataPath, dbName); |
| Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); |
| boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); |
| try (Writer writer = new Writer(dumpRoot, conf)) { |
| for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { |
| try { |
| Table table = hiveDb.getTable(dbName, tableName); |
| |
| // Dump external table locations if required. |
| if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) |
| && shouldDumpExternalTableLocation()) { |
| writer.dataLocationDump(table, extTableFileList, conf); |
| } |
| |
| // Dump the table to be bootstrapped if required. |
| if (shouldBootstrapDumpTable(table)) { |
| HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table); |
| dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, |
| hiveDb, tableTuple, managedTblList, dataCopyAtLoad); |
| } |
| if (tableList != null && isTableSatifiesConfig(table)) { |
| tableList.add(tableName); |
| } |
| } catch (InvalidTableException te) { |
| // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. |
| // Just log a debug message and skip it. |
| LOG.debug(te.getMessage()); |
| } |
| } |
| } |
| dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); |
| } |
| setDataCopyIterators(extTableFileList, managedTblList); |
| work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); |
| return lastReplId; |
| } |
| } |
| |
| private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) { |
| boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); |
| if (dataCopyAtLoad) { |
| work.setManagedTableCopyPathIterator(Collections.<String>emptyList().iterator()); |
| work.setExternalTblCopyPathIterator(Collections.<String>emptyList().iterator()); |
| LOG.info("Deferring table/partition data copy during dump. It should be done at load."); |
| } else { |
| work.setManagedTableCopyPathIterator(managedTableFileList); |
| work.setExternalTblCopyPathIterator(extTableFileList); |
| } |
| } |
| |
| private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) { |
| ReplicationMetricCollector collector; |
| if (isBootstrap) { |
| collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf); |
| } else { |
| collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf); |
| } |
| return collector; |
| } |
| |
| private int getMaxEventAllowed(int currentEventMaxLimit) { |
| int maxDirItems = Integer.parseInt(conf.get(ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG, "0")); |
| if (maxDirItems > 0) { |
| maxDirItems = maxDirItems - ReplUtils.RESERVED_DIR_ITEMS_COUNT; |
| if (maxDirItems < currentEventMaxLimit) { |
| LOG.warn("Changing the maxEventLimit from {} to {} as the '" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG |
| + "' limit encountered. Set this config appropriately to increase the maxEventLimit", |
| currentEventMaxLimit, maxDirItems); |
| currentEventMaxLimit = maxDirItems; |
| } |
| } |
| return currentEventMaxLimit; |
| } |
| |
| private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException { |
| Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); |
| Retryable retryable = Retryable.builder() |
| .withHiveConf(conf) |
| .withRetryOnException(IOException.class).build(); |
| try { |
| retryable.executeCallable((Callable<Void>) () -> { |
| FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf); |
| try { |
| fs.delete(nextEventRoot, true); |
| } catch (FileNotFoundException e) { |
| // no worries |
| } |
| return null; |
| }); |
| } catch (Exception e) { |
| throw new SemanticException(e); |
| } |
| } |
| |
| private long getResumeFrom(Path ackFile) throws SemanticException { |
| Retryable retryable = Retryable.builder() |
| .withHiveConf(conf) |
| .withRetryOnException(Exception.class).build(); |
| try { |
| return retryable.executeCallable(() -> { |
| BufferedReader br = null; |
| try { |
| FileSystem fs = ackFile.getFileSystem(conf); |
| br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset())); |
| long lastEventID = Long.parseLong(br.readLine()); |
| return lastEventID; |
| } finally { |
| if (br != null) { |
| try { |
| br.close(); |
| } catch (Exception e) { |
| //Do nothing |
| } |
| } |
| } |
| }); |
| } catch (Exception e) { |
| throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); |
| } |
| } |
| |
| private boolean needBootstrapAcidTablesDuringIncrementalDump() { |
| // If acid table dump is not enabled, then no neeed to check further. |
| if (!ReplUtils.includeAcidTableInDump(conf)) { |
| return false; |
| } |
| |
| // If old table level policy is available or the policy has filter based on table name then it is possible that some |
| // of the ACID tables might be included for bootstrap during incremental dump. For old policy, its because the table |
| // may not satisfying the old policy but satisfying the new policy. For filter, it may happen that the table |
| // is renamed and started satisfying the policy. |
| return ((!work.replScope.includeAllTables()) |
| || (work.oldReplScope != null) |
| || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); |
| } |
| |
| private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { |
| EventHandler.Context context = new EventHandler.Context( |
| evRoot, |
| dumpRoot, |
| cmRoot, |
| db, |
| conf, |
| getNewEventOnlyReplicationSpec(ev.getEventId()), |
| work.replScope, |
| work.oldReplScope, |
| tablesForBootstrap |
| ); |
| EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); |
| eventHandler.handle(context); |
| work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1); |
| replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); |
| } |
| |
| private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) { |
| ReplicationSpec rspec = |
| getNewReplicationSpec(eventId.toString(), eventId.toString(), conf.getBoolean( |
| HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, false)); |
| rspec.setReplSpecType(ReplicationSpec.Type.INCREMENTAL_DUMP); |
| return rspec; |
| } |
| |
| private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, String dbName, |
| HiveConf hiveConf) throws Exception { |
| // Empty list will create an empty file to distinguish it from db level replication. If no file is there, that means |
| // db level replication. If empty file is there, means no table satisfies the policy. |
| if (tableList == null) { |
| LOG.debug("Table list file is not created for db level replication."); |
| return; |
| } |
| |
| // The table list is dumped in _tables/dbname file |
| Retryable retryable = Retryable.builder() |
| .withHiveConf(conf) |
| .withRetryOnException(IOException.class).build(); |
| try { |
| retryable.executeCallable((Callable<Void>) () -> { |
| Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME); |
| tableListFile = new Path(tableListFile, dbName.toLowerCase()); |
| FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile); |
| for (String tableName : tableList) { |
| String line = tableName.toLowerCase().concat("\n"); |
| writer.write(line.getBytes(StandardCharsets.UTF_8)); |
| } |
| // Close is called explicitly as close also calls the actual file system write, |
| // so there is chance of i/o exception thrown by close. |
| writer.close(); |
| LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); |
| return null; |
| }); |
| } catch (Exception e) { |
| FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI()); |
| throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); |
| } |
| } |
| |
| Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) |
| throws Exception { |
| // bootstrap case |
| // Last repl id would've been captured during compile phase in queryState configs before opening txn. |
| // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from |
| // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore. |
| Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); |
| assert (bootDumpBeginReplId >= 0L); |
| List<String> tableList; |
| |
| LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); |
| long timeoutInMs = HiveConf.getTimeVar(conf, |
| HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); |
| long waitUntilTime = System.currentTimeMillis() + timeoutInMs; |
| String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); |
| Path metadataPath = new Path(dumpRoot, EximUtil.METADATA_PATH_NAME); |
| if (shouldResumePreviousDump(dmd)) { |
| //clear the metadata. We need to rewrite the metadata as the write id list will be changed |
| //We can't reuse the previous write id as it might be invalid due to compaction |
| metadataPath.getFileSystem(conf).delete(metadataPath, true); |
| } |
| List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = Collections.emptyList(); |
| int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); |
| try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); |
| FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { |
| for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { |
| LOG.debug("Dumping db: " + dbName); |
| // TODO : Currently we don't support separate table list for each database. |
| tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); |
| Database db = hiveDb.getDatabase(dbName); |
| if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) { |
| // For replicated (target) database, until after first successful incremental load, the database will not be |
| // in a consistent state. Avoid allowing replicating this database to a new target. |
| throw new HiveException("Replication dump not allowed for replicated database" + |
| " with first incremental dump pending : " + dbName); |
| } |
| int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size(); |
| int estimatedNumFunctions = hiveDb.getAllFunctions().size(); |
| replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), |
| estimatedNumTables, |
| estimatedNumFunctions); |
| replLogger.startLog(); |
| Map<String, Long> metricMap = new HashMap<>(); |
| metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables); |
| metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions); |
| work.getMetricCollector().reportStageStart(getName(), metricMap); |
| Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb); |
| Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName); |
| boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); |
| functionsBinaryCopyPaths = dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb, dataCopyAtLoad); |
| |
| String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); |
| Exception caught = null; |
| try (Writer writer = new Writer(dbRoot, conf)) { |
| for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { |
| Table table = null; |
| try { |
| HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); |
| table = tableTuple != null ? tableTuple.object : null; |
| |
| //disable materialized-view replication if not configured |
| if(tableTuple != null && !isMaterializedViewsReplEnabled() |
| && TableType.MATERIALIZED_VIEW.equals(tableTuple.object.getTableType())){ |
| LOG.info("Attempt to dump materialized view : " + tblName); |
| continue; |
| } |
| |
| LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); |
| if (shouldDumpExternalTableLocation() |
| && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { |
| LOG.debug("Adding table {} to external tables list", tblName); |
| writer.dataLocationDump(tableTuple.object, extTableFileList, conf); |
| } |
| dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, |
| bootDumpBeginReplId, |
| hiveDb, tableTuple, managedTblList, dataCopyAtLoad); |
| } catch (InvalidTableException te) { |
| // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. |
| // Just log a debug message and skip it. |
| LOG.debug(te.getMessage()); |
| } |
| dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb); |
| if (tableList != null && isTableSatifiesConfig(table)) { |
| tableList.add(tblName); |
| } |
| } |
| dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); |
| } catch (Exception e) { |
| caught = e; |
| } finally { |
| try { |
| Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); |
| } catch (Exception e) { |
| if (caught == null) { |
| throw e; |
| } else { |
| LOG.error("failed to reset the db state for " + uniqueKey |
| + " on failure of repl dump", e); |
| throw caught; |
| } |
| } |
| if (caught != null) { |
| throw caught; |
| } |
| } |
| replLogger.endLog(bootDumpBeginReplId.toString()); |
| work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId); |
| } |
| Long bootDumpEndReplId = currentNotificationId(hiveDb); |
| LOG.info("Preparing to return {},{}->{}", |
| dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); |
| long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); |
| dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); |
| dmd.write(true); |
| work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator()); |
| setDataCopyIterators(extTableFileList, managedTblList); |
| return bootDumpBeginReplId; |
| } |
| } |
| |
| private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) { |
| Path backingFile = new Path(dumpRoot, fileName); |
| return new FileList(backingFile, cacheSize, conf); |
| } |
| |
| |
| private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { |
| try { |
| return dumpMetaData.getEventFrom() != null; |
| } catch (Exception e) { |
| LOG.info("No previous dump present"); |
| return false; |
| } |
| } |
| |
| private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) throws IOException { |
| if (validDump(lastDumpPath)) { |
| return false; |
| } |
| Path hiveDumpPath = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); |
| if (isBootStrap) { |
| return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); |
| } |
| // In case of incremental we should resume if _events_dump file is present and is valid |
| Path lastEventFile = new Path(hiveDumpPath, ReplAck.EVENTS_DUMP.toString()); |
| long resumeFrom = 0; |
| try { |
| resumeFrom = getResumeFrom(lastEventFile); |
| } catch (SemanticException ex) { |
| LOG.info("Could not get last repl id from {}, because of:", lastEventFile, ex.getMessage()); |
| } |
| return resumeFrom > 0L; |
| } |
| |
| long currentNotificationId(Hive hiveDb) throws TException { |
| return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); |
| } |
| |
| Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception { |
| // TODO : instantiating FS objects are generally costly. Refactor |
| Path dbRoot = new Path(metadataRoot, dbName); |
| FileSystem fs = dbRoot.getFileSystem(conf); |
| Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); |
| HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database(); |
| EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); |
| return dbRoot; |
| } |
| |
| void dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, |
| Path dbRootData, long lastReplId, Hive hiveDb, |
| HiveWrapper.Tuple<Table> tuple, FileList managedTbleList, boolean dataCopyAtLoad) |
| throws Exception { |
| LOG.info("Bootstrap Dump for table " + tblName); |
| TableSpec tableSpec = new TableSpec(tuple.object); |
| TableExport.Paths exportPaths = |
| new TableExport.Paths(work.astRepresentationForErrorMsg, dbRootMetadata, dbRootData, tblName, conf, true); |
| String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); |
| tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false |
| if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { |
| tuple.replicationSpec.setValidTxnList(validTxnList); |
| tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); |
| |
| // For transactional table, data would be valid snapshot for current txn and doesn't include data |
| // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table |
| // as bootstrap dump's last repl Id. |
| tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId)); |
| } |
| MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); |
| tuple.replicationSpec.setRepl(true); |
| new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write( |
| false, managedTbleList, dataCopyAtLoad); |
| work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1); |
| replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); |
| } |
| |
| private boolean dataCopyRequired(TableSpec tableSpec) { |
| if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) |
| || Utils.shouldDumpMetaDataOnly(conf)) { |
| return false; |
| } |
| return true; |
| } |
| |
| private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { |
| if ((validTxnString == null) || validTxnString.isEmpty()) { |
| return null; |
| } |
| String fullTableName = AcidUtils.getFullTableName(dbName, tblName); |
| ValidWriteIdList validWriteIds = getTxnMgr() |
| .getValidWriteIds(Collections.singletonList(fullTableName), validTxnString) |
| .getTableValidWriteIdList(fullTableName); |
| return ((validWriteIds != null) ? validWriteIds.toString() : null); |
| } |
| |
| private List<Long> getOpenTxns(ValidTxnList validTxnList) { |
| long[] invalidTxns = validTxnList.getInvalidTransactions(); |
| List<Long> openTxns = new ArrayList<>(); |
| for (long invalidTxn : invalidTxns) { |
| if (!validTxnList.isTxnAborted(invalidTxn)) { |
| openTxns.add(invalidTxn); |
| } |
| } |
| return openTxns; |
| } |
| |
| List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException { |
| HiveLockManager lockManager = getTxnMgr().getLockManager(); |
| long[] invalidTxns = validTxnList.getInvalidTransactions(); |
| List<Long> openTxns = new ArrayList<>(); |
| Set<Long> dbTxns = new HashSet<>(); |
| if (lockManager instanceof DbLockManager) { |
| ShowLocksRequest request = new ShowLocksRequest(); |
| request.setDbname(dbName.toLowerCase()); |
| ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request); |
| for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) { |
| dbTxns.add(showLocksResponseElement.getTxnid()); |
| } |
| for (long invalidTxn : invalidTxns) { |
| if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) { |
| openTxns.add(invalidTxn); |
| } |
| } |
| } else { |
| for (long invalidTxn : invalidTxns) { |
| if (!validTxnList.isTxnAborted(invalidTxn)) { |
| openTxns.add(invalidTxn); |
| } |
| } |
| } |
| return openTxns; |
| } |
| |
| // Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the |
| // open transactions to finish. Abort any open transactions after the wait is over. |
| String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException { |
| // Key design point for REPL DUMP is to not have any txns older than current txn in which |
| // dump runs. This is needed to ensure that Repl dump doesn't copy any data files written by |
| // any open txns mainly for streaming ingest case where one delta file shall have data from |
| // committed/aborted/open txns. It may also have data inconsistency if the on-going txns |
| // doesn't have corresponding open/write events captured which means, catch-up incremental |
| // phase won't be able to replicate those txns. So, the logic is to wait for the given amount |
| // of time to see if all open txns < current txn is getting aborted/committed. If not, then |
| // we forcefully abort those txns just like AcidHouseKeeperService. |
| //Exclude readonly and repl created tranasactions |
| List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED); |
| ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns); |
| while (System.currentTimeMillis() < waitUntilTime) { |
| // If there are no txns which are open for the given ValidTxnList snapshot, then just return it. |
| if (getOpenTxns(validTxnList).isEmpty()) { |
| return validTxnList.toString(); |
| } |
| |
| // Wait for 1 minute and check again. |
| try { |
| Thread.sleep(SLEEP_TIME); |
| } catch (InterruptedException e) { |
| LOG.info("REPL DUMP thread sleep interrupted", e); |
| } |
| validTxnList = getTxnMgr().getValidTxns(excludedTxns); |
| } |
| |
| // After the timeout just force abort the open txns |
| if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) { |
| List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern); |
| if (!openTxns.isEmpty()) { |
| //abort only write transactions for the db under replication if abort transactions is enabled. |
| hiveDb.abortTransactions(openTxns); |
| validTxnList = getTxnMgr().getValidTxns(excludedTxns); |
| openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern); |
| if (!openTxns.isEmpty()) { |
| LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " + |
| "However, this is rare case that shouldn't happen.", openTxns); |
| throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons."); |
| } |
| } |
| } else { |
| LOG.warn("Force abort all the open txns is disabled after timeout"); |
| throw new IllegalStateException("REPL DUMP cannot proceed. Force abort all the open txns is disabled. Enable " + |
| "hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed."); |
| } |
| return validTxnList.toString(); |
| } |
| |
| private ReplicationSpec getNewReplicationSpec(String evState, String objState, |
| boolean isMetadataOnly) { |
| return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true); |
| } |
| |
| private String getNextDumpDir() { |
| if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { |
| // make it easy to write .q unit tests, instead of unique id generation. |
| // however, this does mean that in writing tests, we have to be aware that |
| // repl dump will clash with prior dumps, and thus have to clean up properly. |
| String nextDump = ReplDumpWork.getInjectNextDumpDirForTest(); |
| if (nextDump == null) { |
| return "next"; |
| } else { |
| return nextDump; |
| } |
| } else { |
| return UUID.randomUUID().toString(); |
| // TODO: time good enough for now - we'll likely improve this. |
| // We may also work in something the equivalent of pid, thrid and move to nanos to ensure |
| // uniqueness. |
| } |
| } |
| |
| List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot, |
| Hive hiveDb, boolean copyAtLoad) throws Exception { |
| List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = new ArrayList<>(); |
| Path functionsMetaRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); |
| Path functionsDataRoot = new Path(dbDataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); |
| List<String> functionNames = hiveDb.getFunctions(dbName, "*"); |
| for (String functionName : functionNames) { |
| HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb); |
| if (tuple == null) { |
| continue; |
| } |
| Path functionMetaRoot = new Path(functionsMetaRoot, functionName); |
| Path functionMetadataFile = new Path(functionMetaRoot, FUNCTION_METADATA_FILE_NAME); |
| Path functionDataRoot = new Path(functionsDataRoot, functionName); |
| try (JsonWriter jsonWriter = |
| new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) { |
| FunctionSerializer serializer = new FunctionSerializer(tuple.object, functionDataRoot, copyAtLoad, conf); |
| serializer.writeTo(jsonWriter, tuple.replicationSpec); |
| functionsBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths()); |
| } |
| work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1); |
| replLogger.functionLog(functionName); |
| } |
| return functionsBinaryCopyPaths; |
| } |
| |
| void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception { |
| try { |
| Path constraintsRoot = new Path(dbRoot, ReplUtils.CONSTRAINTS_ROOT_DIR_NAME); |
| Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName); |
| Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName); |
| List<SQLPrimaryKey> pks = hiveDb.getPrimaryKeyList(dbName, tblName); |
| List<SQLForeignKey> fks = hiveDb.getForeignKeyList(dbName, tblName); |
| List<SQLUniqueConstraint> uks = hiveDb.getUniqueConstraintList(dbName, tblName); |
| List<SQLNotNullConstraint> nns = hiveDb.getNotNullConstraintList(dbName, tblName); |
| if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty()) |
| || (nns != null && !nns.isEmpty())) { |
| try (JsonWriter jsonWriter = |
| new JsonWriter(commonConstraintsFile.getFileSystem(conf), commonConstraintsFile)) { |
| ConstraintsSerializer serializer = new ConstraintsSerializer(pks, null, uks, nns, conf); |
| serializer.writeTo(jsonWriter, null); |
| } |
| } |
| if (fks != null && !fks.isEmpty()) { |
| try (JsonWriter jsonWriter = |
| new JsonWriter(fkConstraintsFile.getFileSystem(conf), fkConstraintsFile)) { |
| ConstraintsSerializer serializer = new ConstraintsSerializer(null, fks, null, null, conf); |
| serializer.writeTo(jsonWriter, null); |
| } |
| } |
| } catch (NoSuchObjectException e) { |
| // Bootstrap constraint dump shouldn't fail if the table is dropped/renamed while dumping it. |
| // Just log a debug message and skip it. |
| LOG.debug(e.getMessage()); |
| } |
| } |
| |
| private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive hiveDb) { |
| try { |
| HiveWrapper.Tuple<Function> tuple = new HiveWrapper(hiveDb, dbName).function(functionName); |
| if (tuple.object.getResourceUris().isEmpty()) { |
| LOG.warn("Not replicating function: " + functionName + " as it seems to have been created " |
| + "without USING clause"); |
| return null; |
| } |
| return tuple; |
| } catch (HiveException e) { |
| //This can happen as we are querying the getFunctions before we are getting the actual function |
| //in between there can be a drop function by a user in which case our call will fail. |
| LOG.info("Function " + functionName |
| + " could not be found, we are ignoring it as it can be a valid state ", e); |
| return null; |
| } |
| } |
| |
| @Override |
| public StageType getType() { |
| return StageType.REPL_DUMP; |
| } |
| |
| @Override |
| public boolean canExecuteInParallel() { |
| return false; |
| } |
| } |