/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.repl;
import com.google.common.collect.Collections2;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
import org.apache.hadoop.hive.ql.ddl.view.create.CreateViewDesc;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.FSTableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.HiveTableName;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
private static final long serialVersionUID = 1L;
private static final int ZERO_TASKS = 0;
@Override
public String getName() {
return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD");
}
@Override
public StageType getType() {
return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD;
}
/**
* Holds the root Tasks created during a single run of this load task, which are then executed
* by the driver. It does not track state across multiple runs of the load task.
*/
private static class Scope {
boolean database = false, table = false;
List<Task<?>> rootTasks = new ArrayList<>();
}
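/**
* Entry point for both bootstrap and incremental REPL LOAD. Resets the root task of the work to
* this task, optionally adds Atlas and Ranger metadata load tasks, and then dispatches to the
* bootstrap or incremental execution path. Failures are reported to the metric collector; error
* codes above 40000 additionally write a non-recoverable marker.
*/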
@Override
public int execute() {
try {
SecurityUtils.reloginExpiringKeytabUser();
Task<?> rootTask = work.getRootTask();
if (rootTask != null) {
rootTask.setChildTasks(null);
}
work.setRootTask(this);
this.parentTasks = null;
if (shouldLoadAtlasMetadata()) {
addAtlasLoadTask();
}
if (shouldLoadAuthorizationMetadata()) {
initiateAuthorizationLoadTask();
}
LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET));
if (work.isIncrementalLoad()) {
return executeIncrementalLoad();
} else {
return executeBootStrapLoad();
}
} catch (RuntimeException e) {
LOG.error("replication failed with run time exception", e);
try {
work.getMetricCollector().reportEnd(Status.FAILED);
} catch (SemanticException ex) {
LOG.error("Failed to collect Metrics ", ex);
}
throw e;
} catch (Exception e) {
LOG.error("replication failed", e);
setException(e);
int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
try {
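// Error codes above 40000 are treated as non-recoverable: persist the stack trace as a
// non-recoverable marker alongside the dump directory and report the stage as FAILED_ADMIN.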
if (errorCode > 40000) {
Path nonRecoverableMarker = new Path(new Path(work.dumpDirectory).getParent(),
ReplAck.NON_RECOVERABLE_MARKER.toString());
Utils.writeStackTrace(e, nonRecoverableMarker, conf);
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString());
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
}
}
private boolean shouldLoadAuthorizationMetadata() {
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA);
}
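/**
* Adds a child task that imports authorization (Ranger) metadata from the dump directory.
* Only the Ranger authorizer is supported; any other configured authorization provider
* results in a SemanticException.
*/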
private void initiateAuthorizationLoadTask() throws SemanticException {
if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) {
Path rangerLoadRoot = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR);
LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot);
RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn,
work.getMetricCollector());
Task<RangerLoadWork> rangerLoadTask = TaskFactory.get(rangerLoadWork, conf);
if (childTasks == null) {
childTasks = new ArrayList<>();
}
childTasks.add(rangerLoadTask);
} else {
throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " +
conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+ " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE));
}
}
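/**
* Adds a child task that loads Atlas metadata from the Atlas base directory of the dump.
*/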
private void addAtlasLoadTask() throws HiveException {
Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir,
work.getMetricCollector());
Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
if (childTasks == null) {
childTasks = new ArrayList<>();
}
childTasks.add(atlasLoadTask);
}
private boolean shouldLoadAtlasMetadata() {
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
}
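/**
* Builds and schedules the bootstrap load DAG. Iterates over the bootstrap events (database,
* table, partition, function, constraint) in the dump directory, creating load tasks until the
* approximate task limit (REPL_APPROX_MAX_LOAD_TASKS) is reached; if more work remains, a
* follow-up ReplLoadTask is chained to continue in the next run.
*/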
private int executeBootStrapLoad() throws Exception {
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
Context loadContext = new Context(work.dumpDirectory, conf, getHive(),
work.sessionStateLineageState, context);
TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
addLazyDataCopyTask(loadTaskTracker);
/*
For now, for simplicity, we process just one directory (one database). Revisit handling
multiple databases once the basic flow for chaining task creation for a single
database (directory) is in place.
*/
BootstrapEventsIterator iterator = work.bootstrapIterator();
ConstraintEventsIterator constraintIterator = work.constraintsIterator();
/*
These trackers only exist to hold a reference during the current round of task creation. They are
initialized with zero tasks so that they have no effect when composed with other task trackers.
*/
TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
Scope scope = new Scope();
boolean loadingConstraint = false;
if (!iterator.hasNext() && constraintIterator.hasNext()) {
loadingConstraint = true;
}
while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()))
&& loadTaskTracker.canAddMoreTasks()) {
BootstrapEvent next;
if (!loadingConstraint) {
next = iterator.next();
} else {
next = constraintIterator.next();
}
switch (next.eventType()) {
case Database:
DatabaseEvent dbEvent = (DatabaseEvent) next;
dbTracker = new LoadDatabase(loadContext, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks();
loadTaskTracker.update(dbTracker);
if (work.hasDbState()) {
loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
} else {
// Scope might have been set to database in a previous iteration of the loop, so reset it to false
// if the database tracker has no tasks.
scope.database = false;
}
work.updateDbEventState(dbEvent.toState());
if (dbTracker.hasTasks()) {
scope.rootTasks.addAll(dbTracker.tasks());
scope.database = true;
}
dbTracker.debugLog("database");
break;
case Table:
/*
The implicit assumption here is that the database level is processed before the table level.
This depends on the iterator, which must provide the higher-level directory listing before
the lower-level one. It is also required so that the dbTracker / tableTracker are always set up
correctly.
*/
TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
FSTableEvent tableEvent = (FSTableEvent) next;
if (TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
tableTracker = new TaskTracker(1);
tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf));
} else {
LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
loadTaskTracker, work.getMetricCollector());
tableTracker = loadTable.tasks(work.isIncrementalLoad());
}
setUpDependencies(dbTracker, tableTracker);
if (!scope.database && tableTracker.hasTasks()) {
scope.rootTasks.addAll(tableTracker.tasks());
scope.table = true;
} else {
// Scope might have been set to table in a previous iteration of the loop, so reset it to false
// if the table tracker has no tasks.
scope.table = false;
}
if (!TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
/*
For table replication, if we reach the maximum number of tasks then the next run will try to
reload the same table again. This is mainly for ease of understanding the code, since it avoids
having to distinguish between resuming partition loads for a table whose creation hit the task
limit and moving on to the next table because the current one has no partitions.
*/
// For a table we explicitly try to load partitions, as there are no separate partition events.
LoadPartitions loadPartitions =
new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
work.dbNameToLoadIn, tableContext, work.getMetricCollector());
TaskTracker partitionsTracker = loadPartitions.tasks();
partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
partitionsTracker);
tableTracker.debugLog("table");
partitionsTracker.debugLog("partitions for table");
}
break;
case Partition:
/*
This happens only when we hit the limit on the number of tasks we can create while loading tables;
hence we know the table already exists and there is a lastPartitionName.
*/
addLoadPartitionTasks(loadContext, next, dbTracker, iterator, scope, loadTaskTracker, tableTracker);
break;
case Function:
loadTaskTracker.update(addLoadFunctionTasks(loadContext, iterator, next, dbTracker, scope));
break;
case Constraint:
loadTaskTracker.update(addLoadConstraintsTasks(loadContext, next, dbTracker, scope));
break;
default:
break;
}
if (!loadingConstraint && !iterator.currentDbHasNext()) {
createEndReplLogTask(loadContext, scope, iterator.replLogger());
}
}
boolean addAnotherLoadTask = iterator.hasNext()
|| loadTaskTracker.hasReplicationState()
|| constraintIterator.hasNext();
if (addAnotherLoadTask) {
createBuilderTask(scope.rootTasks);
}
// Update the last repl ID of the database only if the current dump is not incremental. If bootstrap
// is combined with an incremental dump, it contains only tables to bootstrap, so there is no need
// to change the last repl ID of the database.
if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
work.updateDbEventState(null);
}
if (childTasks == null) {
childTasks = new ArrayList<>();
}
childTasks.addAll(scope.rootTasks);
/*
Since there can be multiple rounds of this run, all tied to the same query id generated in the
compile phase, an additional UUID is appended so that each run is logged to a separate file.
*/
LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
// Populate the driver context with the scratch dir info from the repl context, so that the
// temp dirs will be cleaned up later
context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs());
if (!HiveConf.getBoolVar(conf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
createReplLoadCompleteAckTask();
}
LOG.info("completed load task run : {}", work.executedLoadTask());
return 0;
}
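/**
* When data copy is configured to run on the target (REPL_RUN_DATA_COPY_TASKS_ON_TARGET), adds
* the external table data copy tasks, backed by the external table file list written during dump.
*/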
private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException {
boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
if (dataCopyAtLoad) {
if (work.getExternalTableDataCopyItr() == null) {
Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILE_LIST_EXTERNAL);
try(FileList fileList = new FileList(extTableBackingFile, 0, conf)) {
work.setExternalTableDataCopyItr(fileList);
}
}
if (childTasks == null) {
childTasks = new ArrayList<>();
}
childTasks.addAll(work.externalTableCopyTasks(loadTaskTracker, conf));
}
}
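/**
* Creates tasks to resume loading partitions of a table, continuing after the last partition
* replicated in the previous run.
*/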
private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker,
BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker,
TaskTracker tableTracker) throws Exception {
PartitionEvent event = (PartitionEvent) next;
TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
LoadPartitions loadPartitions =
new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector());
/*
The tableTracker here should be a new instance and not an existing one, as this path is taken
only when we stopped part-way through loading partitions.
*/
TaskTracker partitionsTracker = loadPartitions.tasks();
partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
partitionsTracker);
partitionsTracker.debugLog("partitions");
return partitionsTracker;
}
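/**
* Creates tasks to load table constraints and adds them as root tasks of the current scope.
*/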
private TaskTracker addLoadConstraintsTasks(Context loadContext,
BootstrapEvent next,
TaskTracker dbTracker,
Scope scope) throws IOException, SemanticException {
LoadConstraint loadConstraint =
new LoadConstraint(loadContext, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
TaskTracker constraintTracker = loadConstraint.tasks();
scope.rootTasks.addAll(constraintTracker.tasks());
constraintTracker.debugLog("constraints");
return constraintTracker;
}
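/**
* Creates tasks to load a function. The tasks become root tasks unless a database-level task
* already exists in the scope, in which case they are chained after the database tasks.
*/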
private TaskTracker addLoadFunctionTasks(Context loadContext, BootstrapEventsIterator iterator, BootstrapEvent next,
TaskTracker dbTracker, Scope scope) throws IOException, SemanticException {
LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(),
(FunctionEvent) next, work.dbNameToLoadIn, dbTracker, work.getMetricCollector());
TaskTracker functionsTracker = loadFunction.tasks();
if (!scope.database) {
scope.rootTasks.addAll(functionsTracker.tasks());
} else {
setUpDependencies(dbTracker, functionsTracker);
}
functionsTracker.debugLog("functions");
return functionsTracker;
}
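/**
* Builds a CREATE VIEW task from the dumped table metadata, retargeting the view to the
* database being loaded into when one is specified.
*/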
public static Task<?> createViewTask(MetaData metaData, String dbNameToLoadIn, HiveConf conf)
throws SemanticException {
Table table = new Table(metaData.getTable());
String dbName = dbNameToLoadIn == null ? table.getDbName() : dbNameToLoadIn;
TableName tableName = HiveTableName.ofNullable(table.getTableName(), dbName);
String dbDotView = tableName.getNotEmptyDbTable();
String viewOriginalText = table.getViewOriginalText();
String viewExpandedText = table.getViewExpandedText();
if (!dbName.equals(table.getDbName())) {
// TODO: If the DB name doesn't match with the metadata from dump, then need to rewrite the original and expanded
// texts using new DB name. Currently it refers to the source database name.
}
CreateViewDesc desc = new CreateViewDesc(dbDotView, table.getAllCols(), null, table.getParameters(),
table.getPartColNames(), false, false, viewOriginalText, viewExpandedText, table.getPartCols());
desc.setReplicationSpec(metaData.getReplicationSpec());
desc.setOwnerName(table.getOwner());
return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), desc), conf);
}
/**
* If the replication policy changed between the previous and the current load, then the tables
* excluded by the new replication policy are dropped.
*
* @throws HiveException Failed to list or drop the tables.
*/
private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveException {
// If all tables are included in replication scope, then nothing to be dropped.
if ((replScope == null) || replScope.includeAllTables()) {
return;
}
Hive db = getHive();
String dbName = replScope.getDbName();
// List all the tables that are excluded in the current repl scope.
Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
tableName -> {
assert (tableName != null);
return !tableName.toLowerCase().startsWith(
SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
&& !replScope.tableIncludedInReplScope(tableName);
});
for (String table : tableNames) {
db.dropTable(dbName + "." + table, true);
}
LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.",
dbName);
}
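/**
* Once all load tasks for this policy have been created (no pending incremental or bootstrap
* work remains), appends a task that writes the load acknowledgement file to the dump directory.
*/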
private void createReplLoadCompleteAckTask() {
if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
|| (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
// All repl load tasks have been executed with status 0; create the task that writes the load acknowledgement.
AckWork replLoadAckWork = new AckWork(
new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
if (childTasks.isEmpty()) {
childTasks.add(loadAckWorkTask);
} else {
DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
}
}
}
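/**
* Appends a repl-state logging task at the end of the current database's task graph, recording
* the replication state (last event id for incremental, database parameters for bootstrap).
*/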
private void createEndReplLogTask(Context context, Scope scope,
ReplLogger replLogger) throws SemanticException {
Map<String, String> dbProps;
if (work.isIncrementalLoad()) {
dbProps = new HashMap<>();
dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
work.incrementalLoadTasksBuilder().eventTo().toString());
} else {
Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
dbProps = dbInMetadata.getParameters();
}
ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps, work.getMetricCollector());
Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, conf);
if (scope.rootTasks.isEmpty()) {
scope.rootTasks.add(replLogTask);
} else {
DAGTraversal.traverse(scope.rootTasks, new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
}
}
/**
* A database update was done earlier, and we want to make sure the last repl id on that database
* is updated before we switch to processing a new database.
* This has to be the last task in the graph: if it were a root-level task alongside intermediate
* tasks, it would be executed first, and then a failure in any child task of the bootstrap load
* would leave the target database reporting a valid last repl status even though the bootstrap
* failed, misrepresenting the state of the database.
*/
private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
throws SemanticException {
/*
We don't want to put any limit on this task, as it must run before we start processing
events of the next database.
*/
TaskTracker taskTracker =
new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
new TaskTracker(maxTasks)).tasks();
AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks());
DAGTraversal.traverse(scope.rootTasks, function);
return taskTracker;
}
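/**
* Wires partition tasks after their table tasks, registers them with the load tracker, and
* records any partition-level replication state on the iterator so the next run can resume.
*/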
private void partitionsPostProcessing(BootstrapEventsIterator iterator,
Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
TaskTracker partitionsTracker) {
setUpDependencies(tableTracker, partitionsTracker);
if (!scope.database && !scope.table) {
scope.rootTasks.addAll(partitionsTracker.tasks());
}
loadTaskTracker.update(tableTracker);
loadTaskTracker.update(partitionsTracker);
if (partitionsTracker.hasReplicationState()) {
iterator.setReplicationState(partitionsTracker.replicationState());
}
}
/*
Sets up dependencies such that each child task depends on the parent tasks completing first.
*/
private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
if (parentTasks.hasTasks()) {
for (Task<?> parentTask : parentTasks.tasks()) {
for (Task<?> childTask : childTasks.tasks()) {
parentTask.addDependentTask(childTask);
}
}
} else {
for (Task<?> childTask : childTasks.tasks()) {
parentTasks.addTask(childTask);
}
}
}
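/**
* Chains another ReplLoadTask after the leaves of the current DAG so that the remaining events
* are processed in a subsequent run.
*/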
private void createBuilderTask(List<Task<?>> rootTasks) {
// Use loadTask as dependencyCollection
Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
}
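/**
* Builds and schedules the incremental load DAG: drops tables excluded by a changed replication
* policy, applies the incremental events, updates the target database's last repl id once all
* events are applied, and switches to bootstrap mode if the dump also contains tables to
* bootstrap.
*/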
private int executeIncrementalLoad() throws Exception {
// If the replication policy changed between the previous and current repl load, drop the tables
// that are excluded by the new replication policy.
dropTablesExcludedInReplScope(work.currentReplScope);
IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
// If the incremental events have already been applied, check whether any tables need to be bootstrapped.
if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
if (work.hasBootstrapLoadTasks()) {
LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
+ "mode after applying all events.");
return executeBootStrapLoad();
}
}
List<Task<?>> childTasks = new ArrayList<>();
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
TaskTracker tracker = new TaskTracker(maxTasks);
addLazyDataCopyTask(tracker);
childTasks.add(builder.build(context, getHive(), LOG, tracker));
// If there are no more events to be applied, add a task to update the last.repl.id of the
// target database to the event id of the last event considered by the dump. Next
// incremental cycle won't consider the events in this dump again if it starts from this id.
if (!builder.hasMoreWork()) {
// The name of the database to load into is either specified directly in the REPL LOAD
// command (i.e. when dbNameToLoadIn has a valid db name) or is available through the dump
// metadata during table-level replication.
String dbName = work.dbNameToLoadIn;
if (dbName == null || StringUtils.isBlank(dbName)) {
if (work.currentReplScope != null) {
String replScopeDbName = work.currentReplScope.getDbName();
if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
dbName = replScopeDbName;
}
}
}
// If we are replicating to multiple databases at a time, it is not possible to know which
// databases we are replicating into, and hence we cannot update the repl id in all of them.
if (StringUtils.isNotBlank(dbName)) {
String lastEventid = builder.eventTo().toString();
Map<String, String> mapProp = new HashMap<>();
mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
AlterDatabaseSetPropertiesDesc alterDbDesc =
new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
new ReplicationSpec(lastEventid, lastEventid));
Task<?> updateReplIdTask =
TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
work.setLastReplIDUpdated(true);
LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
}
}
// Once all the incremental events are applied, enable bootstrap of tables if any exist.
if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
}
if (this.childTasks == null) {
this.childTasks = new ArrayList<>();
}
this.childTasks.addAll(childTasks);
createReplLoadCompleteAckTask();
return 0;
}
}