/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse.repl.dump;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.io.IOUtils;
import com.google.common.collect.Collections2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
private static final int DEF_BUF_SIZE = 8 * 1024;
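/**
 * State values recorded under a {@code bootstrap.dump.state.<uuid>} database parameter.
 * {@link #setDbBootstrapDumpState} stamps {@code ACTIVE} while a bootstrap dump is running;
 * {@link #resetDbBootstrapDumpState} removes the key again once the dump is done.
 */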
public enum ReplDumpState {
IDLE, ACTIVE
}
public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
throws SemanticException {
writeOutput(listValues, outputFile, hiveConf, false);
}
/**
 * Given a ReplChangeManager encoded URI, replaces the nameservice and returns the modified encoded URI.
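 *
 * <p>Illustrative sketch (hypothetical values; the exact encoding is owned by
 * ReplChangeManager.encodeFileUri/decodeFileUri). With the replacement nameservice configured,
 * both the data-path host and the CM-root host inside the encoded URI are swapped:
 * <pre>{@code
 * hiveConf.set("hive.repl.ha.datapath.replace.remote.nameservice.name", "ns-remote");
 * String modified = Utils.replaceNameserviceInEncodedURI(cmEncodedUri, hiveConf);
 * }</pre>
 */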
public static String replaceNameserviceInEncodedURI(String cmEncodedURI, HiveConf hiveConf) throws SemanticException {
String newNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
if (StringUtils.isEmpty(newNS)) {
throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
.format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid: "
+ (newNS == null ? "null" : newNS), ReplUtils.REPL_HIVE_SERVICE));
}
String[] decodedURISplits = ReplChangeManager.decodeFileUri(cmEncodedURI);
// Replace both the data path and the repl CM root path and construct the new URI. The checksum and subDir stay the same as before.
String modifiedURI = ReplChangeManager.encodeFileUri(replaceHost(decodedURISplits[0], newNS), decodedURISplits[1],
replaceHost(decodedURISplits[2], newNS), decodedURISplits[3]);
LOG.debug("Modified encoded uri {} to {}", cmEncodedURI, modifiedURI);
return modifiedURI;
}
private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
if (StringUtils.isEmpty(originalURIStr)) {
return originalURIStr;
}
URI origUri = URI.create(originalURIStr);
try {
return new URI(origUri.getScheme(),
origUri.getUserInfo(), newHost, origUri.getPort(),
origUri.getPath(), origUri.getQuery(),
origUri.getFragment()).toString();
} catch (URISyntaxException ex) {
throw new SemanticException(ex);
}
}
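/**
 * Writes the given rows to {@code outputFile} as tab-separated, newline-terminated lines,
 * retrying the whole write on IOException. {@code update} is handed to
 * {@link FileSystem#create(Path, boolean)} as the overwrite flag.
 *
 * <p>Illustrative usage (hypothetical row values and dump path):
 * <pre>{@code
 * List<List<String>> rows = Arrays.asList(Arrays.asList("srcDb", "10"));
 * Utils.writeOutput(rows, new Path(dumpRoot, "_dumpmetadata"), hiveConf, false);
 * }</pre>
 */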
public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
throws SemanticException {
Retryable retryable = Retryable.builder()
.withHiveConf(hiveConf)
.withRetryOnException(IOException.class).build();
try {
retryable.executeCallable((Callable<Void>) () -> {
DataOutputStream outStream = null;
try {
FileSystem fs = outputFile.getFileSystem(hiveConf);
outStream = fs.create(outputFile, update);
for (List<String> values : listValues) {
outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
for (int i = 1; i < values.size(); i++) {
outStream.write(Utilities.tabCode);
outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
}
outStream.write(Utilities.newLineCode);
}
} finally {
IOUtils.closeStream(outStream);
}
return null;
});
} catch (Exception e) {
throw new SemanticException(e);
}
}
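/**
 * Copies {@code is} to {@code exportFilePath} in 8 KB chunks and returns the number of bytes
 * written (the final file position). The copy is retried on IOException; the input stream is
 * owned by the caller and is not closed here.
 */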
public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is,
HiveConf conf) throws SemanticException {
Retryable retryable = Retryable.builder()
.withHiveConf(conf)
.withRetryOnException(IOException.class).build();
try {
return retryable.executeCallable(() -> {
FSDataOutputStream fos = null;
try {
long bytesWritten;
fos = fs.create(exportFilePath);
byte[] buffer = new byte[DEF_BUF_SIZE];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
bytesWritten = fos.getPos();
return bytesWritten;
} finally {
if (fos != null) {
fos.close();
}
}
});
} catch (Exception e) {
throw new SemanticException(e);
}
}
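/**
 * Dumps the stack trace of {@code e} into {@code outputFile}, retrying on IOException but
 * failing immediately on FileNotFoundException.
 */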
public static void writeStackTrace(Exception e, Path outputFile, HiveConf conf) throws SemanticException {
Retryable retryable = Retryable.builder()
.withHiveConf(conf)
.withRetryOnException(IOException.class).withFailOnException(FileNotFoundException.class).build();
try {
retryable.executeCallable((Callable<Void>) () -> {
PrintWriter pw = new PrintWriter(outputFile.getFileSystem(conf).create(outputFile));
e.printStackTrace(pw);
pw.close();
return null;
});
} catch (Exception ex) {
throw new SemanticException(ex);
}
}
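/**
 * Writes {@code content} plus a trailing newline to {@code outputFile}, overwriting any
 * existing file, with the same IOException retry policy as the other writers in this class.
 */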
public static void writeOutput(String content, Path outputFile, HiveConf hiveConf)
throws SemanticException {
Retryable retryable = Retryable.builder()
.withHiveConf(hiveConf)
.withRetryOnException(IOException.class).build();
try {
retryable.executeCallable((Callable<Void>) () -> {
DataOutputStream outStream = null;
try {
FileSystem fs = outputFile.getFileSystem(hiveConf);
outStream = fs.create(outputFile);
outStream.writeBytes(content);
outStream.write(Utilities.newLineCode);
} finally {
IOUtils.closeStream(outStream);
}
return null;
});
} catch (Exception e) {
throw new SemanticException(e);
}
}
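/**
 * Creates an empty file at {@code outputFile} (create-then-close), retrying on IOException,
 * e.g. for marker files.
 */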
public static void create(Path outputFile, HiveConf hiveConf)
throws SemanticException {
Retryable retryable = Retryable.builder()
.withHiveConf(hiveConf)
.withRetryOnException(IOException.class).build();
try {
retryable.executeCallable((Callable<Void>) () -> {
FileSystem fs = outputFile.getFileSystem(hiveConf);
fs.create(outputFile).close();
return null;
});
} catch (Exception e) {
throw new SemanticException(e);
}
}
public static boolean fileExists(Path filePath, HiveConf hiveConf) throws IOException {
FileSystem fs = filePath.getFileSystem(hiveConf);
return fs.exists(filePath);
}
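/**
 * Returns the database names matching {@code dbPattern}, or all databases when the pattern is
 * null. The pattern follows Hive's usual SHOW DATABASES LIKE rules (e.g. '*' as a wildcard).
 */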
public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
if (dbPattern == null) {
return db.getAllDatabases();
} else {
return db.getDatabasesByPattern(dbPattern);
}
}
public static Iterable<String> matchesTbl(Hive db, String dbName, String tblPattern)
throws HiveException {
if (tblPattern == null) {
return getAllTables(db, dbName, null);
} else {
return db.getTablesByPattern(dbName, tblPattern);
}
}
public static Iterable<String> matchesTbl(Hive db, String dbName, ReplScope replScope)
throws HiveException {
return getAllTables(db, dbName, replScope);
}
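/**
 * Lists all tables of {@code dbName}, filtering out the temporary tables created for VALUES
 * clauses (names starting with {@link SemanticAnalyzer#VALUES_TMP_TABLE_NAME_PREFIX}) and, when
 * {@code replScope} is non-null, any table outside the replication scope.
 */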
public static Collection<String> getAllTables(Hive db, String dbName, ReplScope replScope) throws HiveException {
return Collections2.filter(db.getAllTables(dbName),
tableName -> {
assert(tableName != null);
return !tableName.toLowerCase().startsWith(
SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
&& ((replScope == null) || replScope.tableIncludedInReplScope(tableName));
});
}
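/**
 * Stamps a unique {@code bootstrap.dump.state.<uuid>} parameter with value {@code ACTIVE} on the
 * database and returns the generated key, or null when the database does not exist. The caller
 * is expected to clear the flag again when the dump finishes.
 *
 * <p>Illustrative pairing (hypothetical database name):
 * <pre>{@code
 * String key = Utils.setDbBootstrapDumpState(hiveDb, "sales");
 * try {
 *   // ... run the bootstrap dump ...
 * } finally {
 *   if (key != null) {
 *     Utils.resetDbBootstrapDumpState(hiveDb, "sales", key);
 *   }
 * }
 * }</pre>
 */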
public static String setDbBootstrapDumpState(Hive hiveDb, String dbName) throws HiveException {
Database database = hiveDb.getDatabase(dbName);
if (database == null) {
return null;
}
Map<String, String> newParams = new HashMap<>();
String uniqueKey = BOOTSTRAP_DUMP_STATE_KEY_PREFIX + UUID.randomUUID().toString();
newParams.put(uniqueKey, ReplDumpState.ACTIVE.name());
Map<String, String> params = database.getParameters();
// if the database already carries parameters, merge the new key into them
if (params != null) {
params.putAll(newParams);
database.setParameters(params);
} else {
// otherwise just use the new params map
database.setParameters(newParams);
}
hiveDb.alterDatabase(dbName, database);
LOG.info("REPL DUMP:: Set property for Database: {}, Property: {}, Value: {}",
dbName, uniqueKey, Utils.ReplDumpState.ACTIVE.name());
return uniqueKey;
}
public static void resetDbBootstrapDumpState(Hive hiveDb, String dbName,
String uniqueKey) throws HiveException {
Database database = hiveDb.getDatabase(dbName);
if (database != null) {
Map<String, String> params = database.getParameters();
if ((params != null) && params.containsKey(uniqueKey)) {
params.remove(uniqueKey);
database.setParameters(params);
hiveDb.alterDatabase(dbName, database);
LOG.info("REPL DUMP:: Reset property for Database: {}, Property: {}", dbName, uniqueKey);
}
}
}
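/**
 * Returns true when any {@code bootstrap.dump.state.*} parameter on the database still reads
 * {@code ACTIVE}, i.e. another bootstrap dump appears to be in flight.
 */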
public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException {
Database database = hiveDb.getDatabase(dbName);
if (database == null) {
return false;
}
Map<String, String> params = database.getParameters();
if (params == null) {
return false;
}
for (String key : params.keySet()) {
if (key.startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
&& params.get(key).equals(ReplDumpState.ACTIVE.name())) {
return true;
}
}
return false;
}
/**
 * Validates whether a table can be exported; similar to EximUtil.shouldExport, but with a few
 * replication-specific checks.
 */
public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, boolean isEventDump,
Set<String> bootstrapTableList, ReplScope oldReplScope, HiveConf hiveConf) {
if (replicationSpec == null) {
replicationSpec = new ReplicationSpec();
}
if (replicationSpec.isNoop() || tableHandle == null) {
return false;
}
// If the dump is metadata only, dump the metadata of non-native tables too; otherwise skip non-native tables.
if (tableHandle.isNonNative() && !replicationSpec.isMetadataOnly()) {
return false;
}
if (replicationSpec.isInReplicationScope()) {
if (tableHandle.isTemporary()) {
return false;
}
if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) {
boolean shouldReplicateExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
|| replicationSpec.isMetadataOnly();
if (isEventDump) {
// Skip dumping events for external tables if bootstrap is enabled for them.
// Also skip if the current table is included only in the new policy but not in the old one.
shouldReplicateExternalTables = shouldReplicateExternalTables
&& !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
&& ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName());
}
return shouldReplicateExternalTables;
}
if (AcidUtils.isTransactionalTable(tableHandle.getTTable())) {
if (!ReplUtils.includeAcidTableInDump(hiveConf)) {
return false;
}
// Skip dumping events related to ACID tables if bootstrap is enabled for ACID tables.
if (isEventDump && hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
return false;
}
}
// Tables selected for bootstrap should be skipped: they will be bootstrapped along with the
// current incremental replication dump, so there is no need to dump events for them.
// Note: if any event dump (other than an alter-table event under table-level replication)
// reaches here, the table is included in the new replication policy.
if (isEventDump) {
// If the replication policy is replaced with a new included/excluded tables list, then events
// for tables that were not included in the old policy but are included in the new one
// should be skipped.
if (!ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) {
return false;
}
// Tables in the list of tables to be bootstrapped should be skipped.
return (bootstrapTableList == null || !bootstrapTableList.contains(tableHandle.getTableName().toLowerCase()));
}
}
return true;
}
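/**
 * Event-based variant: resolves the table named by the notification event and delegates to
 * {@link #shouldReplicate(ReplicationSpec, Table, boolean, Set, ReplScope, HiveConf)}. Returns
 * false when the table cannot be fetched (e.g. it was dropped after the event fired).
 */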
public static boolean shouldReplicate(NotificationEvent tableForEvent,
ReplicationSpec replicationSpec, Hive db,
boolean isEventDump, Set<String> bootstrapTableList,
ReplScope oldReplScope,
HiveConf hiveConf) {
Table table;
try {
table = db.getTable(tableForEvent.getDbName(), tableForEvent.getTableName());
} catch (HiveException e) {
LOG.info("error while getting table info for " + tableForEvent.getDbName() + "."
+ tableForEvent.getTableName(), e);
return false;
}
return shouldReplicate(replicationSpec, table, isEventDump, bootstrapTableList, oldReplScope, hiveConf);
}
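/**
 * For a transactional-table dump, publishes the valid transaction list into the conf and asks
 * AcidUtils for the data paths valid for the given write-ID list; for everything else, returns
 * {@code fromPath} itself as the only data path.
 */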
static List<Path> getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf)
throws IOException {
if (replicationSpec.isTransactionalTableDump()) {
try {
conf.set(ValidTxnList.VALID_TXNS_KEY, replicationSpec.getValidTxnList());
return AcidUtils.getValidDataPaths(fromPath, conf, replicationSpec.getValidWriteIdList());
} catch (FileNotFoundException e) {
throw new IOException(ErrorMsg.FILE_NOT_FOUND.format(e.getMessage()), e);
}
} else {
return Collections.singletonList(fromPath);
}
}
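/** Whether this is a metadata-only dump, as controlled by hive.repl.dump.metadata.only. */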
public static boolean shouldDumpMetaDataOnly(HiveConf conf) {
return conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
}
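/**
 * True when external tables are included in the replication policy but only their metadata
 * should be dumped: REPL_INCLUDE_EXTERNAL_TABLES and REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE
 * are both set and the table type is EXTERNAL_TABLE.
 */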
public static boolean shouldDumpMetaDataOnlyForExternalTables(Table table, HiveConf conf) {
return (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
table.getTableType().equals(TableType.EXTERNAL_TABLE) &&
conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE));
}
}