/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.parse.repl.dump.events;

import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
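
/**
 * Event handler that dumps metastore COMMIT_TXN notification events during an
 * incremental REPL DUMP. For every table written by the committed transaction it
 * dumps the table metadata and the list of files to be replicated, and it records
 * the commit message itself in the dump metadata so the transaction can be closed
 * on the target.
 */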
class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {

  CommitTxnHandler(NotificationEvent event) {
    super(event);
  }

  @Override
  CommitTxnMessage eventMessage(String stringRepresentation) {
    return deserializer.getCommitTxnMessage(stringRepresentation);
  }
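
  /**
   * Writes the data files of a single table/partition under the given data path:
   * either as encoded path/checksum entries in a _files listing (when the data copy
   * runs on the target at load time) or as individual file entries (when the data is
   * copied as part of the dump).
   */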
  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
      Path dataPath)
      throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
    if (copyAtLoad) {
      // Write the encoded filename/checksum of each file into the _files listing.
      writeEncodedDumpFiles(withinContext, files, dataPath);
    } else {
      for (String file : files) {
        writeFileEntry(qlMdTable, ptn, file, withinContext);
      }
    }
  }
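
  /**
   * Dumps the table (and optional partition) metadata under the event root and then
   * writes the corresponding data file lists: a single data path for an unpartitioned
   * table, or one data path per partition otherwise.
   */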
  private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
      List<Partition> qlPtns, List<List<String>> fileListArray)
      throws IOException, SemanticException, LoginException, MetaException, HiveFatalException {
    if (fileListArray == null || fileListArray.isEmpty()) {
      return;
    }

    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
    // For ACID operations, the same directory may contain other subdirectories for different
    // write id / statement id combinations, so isReplace cannot be set to true.
    withinContext.replicationSpec.setIsReplace(false);
    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
        qlMdTable, qlPtns,
        withinContext.replicationSpec,
        withinContext.hiveConf);

    if ((null == qlPtns) || qlPtns.isEmpty()) {
      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
    } else {
      for (int idx = 0; idx < qlPtns.size(); idx++) {
        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator
            + qlPtns.get(idx).getName());
        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext, dataPath);
      }
    }
  }
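
  /**
   * Creates a per-table dump directory under the event root and dumps the table
   * metadata and file lists into it.
   */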
  private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
      List<Partition> qlPtns, List<List<String>> fileListArray)
      throws IOException, SemanticException, LoginException, MetaException, HiveFatalException {
    Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName());
    Context context = new Context(withinContext);
    context.setEventRoot(newPath);
    createDumpFile(context, qlMdTable, qlPtns, fileListArray);
  }
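
  /**
   * Fetches the write event info recorded in the metastore for this transaction,
   * filtered to tables that are covered by both the old and the current replication
   * policy and are not pending bootstrap.
   */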
  private List<WriteEventInfo> getAllWriteEventInfo(Context withinContext) throws Exception {
    String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
    RawStore rawStore = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf);
    List<WriteEventInfo> writeEventInfoList
        = rawStore.getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, null);
    return ((writeEventInfoList == null)
        ? null
        : new ArrayList<>(Collections2.filter(writeEventInfoList,
            writeEventInfo -> {
              assert(writeEventInfo != null);
              // If the replication policy has been replaced with a new included/excluded tables list,
              // only events for tables included in both the old and the new policy should be dumped.
              // A table that is included in the new policy but not in the old one is skipped here,
              // since it will be bootstrapped along with the current incremental replication dump.
              // Likewise, a table already present in the list of tables to be bootstrapped is skipped.
              return (ReplUtils.tableIncludedInReplScope(withinContext.replScope, writeEventInfo.getTable())
                  && ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope, writeEventInfo.getTable())
                  && !withinContext.getTablesForBootstrap().contains(writeEventInfo.getTable().toLowerCase()));
            })));
  }
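
  /**
   * Dumps the commit txn event: collects the write event info of the transaction,
   * writes per-table metadata and file lists under the event directory, and records
   * the (possibly enriched) commit message in the dump metadata.
   */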
  @Override
  public void handle(Context withinContext) throws Exception {
    if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
      return;
    }

    LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON);
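
    // The original commit message is dumped as-is; it is re-serialized below only when
    // write event info gets added to it.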
    String payload = eventMessageAsJSON;

    if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
      boolean replicatingAcidEvents = true;
      if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
        // ACID-table-related events are not dumped while a bootstrap dump of ACID tables is taken as
        // part of an incremental dump, so no changes to ACID tables should be dumped with this commit.
        // The commit transaction event itself still has to be dumped so that replication can close the
        // transaction opened while replaying the corresponding open transaction event.
        LOG.debug("writeEventsInfoList will be removed from commit message because we are " +
            "bootstrapping acid tables.");
        replicatingAcidEvents = false;
      } else if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
        // Similar to the condition above: if the config (used only for testing) does not allow ACID
        // tables to be replicated, no changes to ACID tables are dumped as part of the commit.
        LOG.debug("writeEventsInfoList will be removed from commit message because we are " +
            "not dumping acid tables.");
        replicatingAcidEvents = false;
      }
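
      // Fetch the per-table write event info of this transaction only when ACID changes are
      // being replicated as part of this dump.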
      List<WriteEventInfo> writeEventInfoList = null;
      if (replicatingAcidEvents) {
        writeEventInfoList = getAllWriteEventInfo(withinContext);
      }

      int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
      if (numEntry != 0) {
        eventMessage.addWriteEventInfo(writeEventInfoList);
        payload = jsonMessageEncoder.getSerializer().serialize(eventMessage);
        LOG.debug("payload for commit txn event : " + payload);
      }
      org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
      org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null;
      List<Partition> qlPtns = new ArrayList<>();
      List<List<String>> filesTobeAdded = new ArrayList<>();

      // The loop below creates a dump directory for each table. It reads through the list of write
      // notification events, groups the entries per table and builds the lists of files to be
      // replicated. The event directory in the dump path has a subdirectory per table holding the
      // table metadata and the list of files to be replicated. Entries are stored with the
      // (txn id, db name, table name, partition name) combination as the primary key, so entries
      // for the same table come together. Only basic table metadata is used during import, so the
      // latest table metadata need not be dumped.
      for (int idx = 0; idx < numEntry; idx++) {
        qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(eventMessage.getTableObj(idx));
        if (qlMdTablePrev == null) {
          qlMdTablePrev = qlMdTable;
        }

        // One dump directory per table.
        if (!qlMdTablePrev.getCompleteName().equals(qlMdTable.getCompleteName())) {
          createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
          qlPtns = new ArrayList<>();
          filesTobeAdded = new ArrayList<>();
          qlMdTablePrev = qlMdTable;
        }

        if (qlMdTable.isPartitioned() && (null != eventMessage.getPartitionObj(idx))) {
          qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable,
              eventMessage.getPartitionObj(idx)));
        }

        filesTobeAdded.add(Lists.newArrayList(
            ReplChangeManager.getListFromSeparatedString(eventMessage.getFiles(idx))));
      }

      // Dump the last table in the list.
      if (qlMdTablePrev != null) {
        createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
      }
    }

    DumpMetaData dmd = withinContext.createDmd(this);
    dmd.setPayload(payload);
    dmd.write();
  }
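
  // Identifies this event in the dump metadata as a commit-transaction event.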
  @Override
  public DumpType dumpType() {
    return DumpType.EVENT_COMMIT_TXN;
  }
}