/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.hive.util;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.HiveDRArgs;
import org.apache.falcon.hive.exception.HiveReplicationException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Utility class to handle Hive events for data-mirroring.
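 *
 * <p>A typical call sequence, sketched here for illustration (the driver loop
 * and the {@code events} collection are hypothetical):
 * <pre>
 *     EventUtils eventUtils = new EventUtils(conf);
 *     eventUtils.setupConnection();
 *     eventUtils.initializeFS();
 *     for (String event : events) {
 *         eventUtils.processEvents(event);
 *     }
 *     eventUtils.cleanEventsDirectory();
 *     eventUtils.closeConnection();
 * </pre>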
*/
public class EventUtils {
private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
private static final int TIMEOUT_IN_SECS = 300;
private static final String JDBC_PREFIX = "jdbc:";
private static final int RETRY_ATTEMPTS = 3;
private Configuration conf = null;
private String sourceHiveServer2Uri = null;
private String sourceDatabase = null;
private String sourceNN = null;
private String sourceNNKerberosPrincipal = null;
private String jobNN = null;
private String jobNNKerberosPrincipal = null;
private String targetHiveServer2Uri = null;
private String sourceStagingPath = null;
private String targetStagingPath = null;
private String targetNN = null;
private String targetNNKerberosPrincipal = null;
private String sourceStagingUri = null;
private String targetStagingUri = null;
private List<Path> sourceCleanUpList = null;
private List<Path> targetCleanUpList = null;
private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
private FileSystem sourceFileSystem = null;
private FileSystem jobFileSystem = null;
private FileSystem targetFileSystem = null;
private Connection sourceConnection = null;
private Connection targetConnection = null;
private Statement sourceStatement = null;
private Statement targetStatement = null;
private Map<String, Long> countersMap = null;
private List<ReplicationStatus> listReplicationStatus;
public EventUtils(Configuration conf) {
this.conf = conf;
sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName());
sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName());
jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName());
targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
sourceCleanUpList = new ArrayList<>();
targetCleanUpList = new ArrayList<>();
countersMap = new HashMap<>();
}
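
    /**
     * Sets up the HiveServer2 JDBC connection for the current execution stage:
     * the source HS2 URI for the EXPORT stage, otherwise the target HS2 URI.
     * When a Kerberos principal is configured for the corresponding HS2,
     * delegation-token authentication is appended to the connection string.
     */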
public void setupConnection() throws Exception {
Class.forName(DRIVER_NAME);
DriverManager.setLoginTimeout(TIMEOUT_IN_SECS);
String authTokenString = ";auth=delegationToken";
        // To bypass the findbugs check, the empty password must be stored in a Properties object.
Properties password = new Properties();
password.put("password", "");
String user = "";
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (currentUser != null) {
user = currentUser.getShortUserName();
}
if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase;
if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
connString += authTokenString;
}
sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
sourceStatement = sourceConnection.createStatement();
} else {
String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase;
if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
connString += authTokenString;
}
targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
targetStatement = targetConnection.createStatement();
}
}
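
    /**
     * Initializes the source, job and target FileSystem handles and resolves
     * the fully qualified source and target staging URIs.
     */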
public void initializeFS() throws IOException {
LOG.info("Initializing staging directory");
sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString();
targetStagingUri = new Path(targetNN, targetStagingPath).toString();
sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(conf, sourceNN, sourceNNKerberosPrincipal));
jobFileSystem = FileSystem.get(FileUtils.getConfiguration(conf, jobNN, jobNNKerberosPrincipal));
targetFileSystem = FileSystem.get(FileUtils.getConfiguration(conf, targetNN, targetNNKerberosPrincipal));
}
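
    /**
     * Reads the serialized replication commands from the given event file on
     * the job cluster, one command per line.
     */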
private String readEvents(Path eventFileName) throws IOException {
StringBuilder eventString = new StringBuilder();
BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName)));
try {
String line;
            while ((line = in.readLine()) != null) {
eventString.append(line);
eventString.append(DelimiterUtils.NEWLINE_DELIM);
}
} catch (Exception e) {
throw new IOException(e);
} finally {
IOUtils.closeQuietly(in);
}
return eventString.toString();
}
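
    /**
     * Processes a single event record. The record is field-delimited: a
     * base64-encoded database name and table name, followed by the paths of
     * the export and import event files. Export statements are run against the
     * source and followed by a DistCp of the staging data; import statements
     * are run against the target.
     */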
public void processEvents(String event) throws Exception {
listReplicationStatus = new ArrayList<>();
String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
String exportEventStr;
String importEventStr;
if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
exportEventStr = readEvents(new Path(eventSplit[2]));
if (StringUtils.isNotEmpty(exportEventStr)) {
LOG.info("Process the export statements for db {} table {}", dbName, tableName);
processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
if (!sourceCleanUpList.isEmpty()) {
invokeCopy();
}
}
} else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
.equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
importEventStr = readEvents(new Path(eventSplit[3]));
if (StringUtils.isNotEmpty(importEventStr)) {
LOG.info("Process the import statements for db {} table {}", dbName, tableName);
processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true);
}
}
}
public List<ReplicationStatus> getListReplicationStatus() {
return listReplicationStatus;
}
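
    /**
     * Deserializes the commands in the event string, registers their cleanup
     * locations, and executes them. On failure, the registered staging
     * locations on both clusters are cleaned up before rethrowing.
     */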
private void processCommands(String eventStr, String dbName, String tableName, Statement sqlStmt,
List<Path> cleanUpList, boolean isImportStatements)
throws SQLException, HiveReplicationException, IOException {
String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
List<Command> deserializeCommand = new ArrayList<>();
for (String command : commandList) {
Command cmd = ReplicationUtils.deserializeCommand(command);
deserializeCommand.add(cmd);
List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent();
cleanUpList.addAll(getCleanUpPaths(cleanupLocations));
}
for (Command cmd : deserializeCommand) {
try {
                LOG.debug("Executing command: {}: {}", cmd.getEventId(), cmd);
executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0);
} catch (Exception e) {
// clean up locations before failing.
cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
cleanupEventLocations(targetCleanUpList, targetFileSystem);
throw new HiveReplicationException("Could not process replication command for "
+ " DB Name:" + dbName + ", Table Name:" + tableName, e);
}
}
}
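
    /**
     * Executes all SQL statements of a replication command and, for import
     * statements, records a SUCCESS replication status on completion.
     */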
private void executeCommand(Command cmd, String dbName, String tableName,
Statement sqlStmt, boolean isImportStatements, int attempt)
throws HiveReplicationException, SQLException, IOException {
for (final String stmt : cmd.get()) {
executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt);
}
if (isImportStatements) {
addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId());
}
}
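
    /**
     * Executes a single SQL statement, retrying retriable commands up to
     * RETRY_ATTEMPTS times after cleaning up per-retry locations. When retries
     * are exhausted, the command is undone and the failure is recorded.
     */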
private void executeSqlStatement(Command cmd, String dbName, String tableName,
Statement sqlStmt, String stmt, boolean isImportStatements, int attempt)
throws HiveReplicationException, SQLException, IOException {
try {
sqlStmt.execute(stmt);
} catch (SQLException sqeOuter) {
// Retry if command is retriable.
if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) {
if (isImportStatements) {
try {
cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), targetFileSystem);
} catch (IOException ioe) {
// Clean up failed before retry on target. Update failure status and return
addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
tableName, cmd.getEventId());
throw ioe;
}
} else {
cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem);
}
executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt);
return; // Retry succeeded, return without throwing an exception.
}
// If we reached here, retries have failed.
            LOG.error("SQL Exception", sqeOuter);
undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements);
if (isImportStatements) {
addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId());
}
throw sqeOuter;
}
}
private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
List<Path> cleanupLocationPaths = new ArrayList<>();
for (String cleanupLocation : cleanupLocations) {
cleanupLocationPaths.add(new Path(cleanupLocation));
}
return cleanupLocationPaths;
}
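
    /**
     * Runs the undo statements of an undoable command to roll back its effects.
     */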
private void undoCommand(Command cmd, String dbName,
String tableName, Statement sqlStmt, boolean isImportStatements)
throws SQLException, HiveReplicationException {
if (cmd.isUndoable()) {
try {
List<String> undoCommands = cmd.getUndo();
LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray()));
                if (!undoCommands.isEmpty()) {
for (final String undoStmt : undoCommands) {
sqlStmt.execute(undoStmt);
}
}
} catch (SQLException sqeInner) {
if (isImportStatements) {
addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
tableName, cmd.getEventId());
}
                LOG.error("SQL Exception", sqeInner);
throw sqeInner;
}
}
}
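
    /**
     * Appends a replication status entry for the given event to the list that
     * is later persisted to the replication status store.
     */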
private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId)
throws HiveReplicationException {
try {
String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName());
ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()),
conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId);
listReplicationStatus.add(rs);
} catch (HiveReplicationException hre) {
throw new HiveReplicationException("Could not update replication status store for "
+ " EventId:" + eventId
+ " DB Name:" + dbName
+ " Table Name:" + tableName
+ hre.toString());
}
}
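
    /**
     * Runs a blocking DistCp job from the source staging URI to the target
     * staging URI and, on success, captures the job's replication counters.
     */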
public void invokeCopy() throws Exception {
DistCpOptions options = getDistCpOptions();
DistCp distCp = new DistCp(conf, options);
LOG.info("Started DistCp with source Path: {} \ttarget path: {}", sourceStagingUri, targetStagingUri);
Job distcpJob = distCp.execute();
LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
LOG.info("Completed DistCp");
if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) {
countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob);
}
}
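
    /**
     * Builds the DistCp options for the staging-data copy from the job
     * configuration (sync folder, optional CRC skip for TDE, max maps and
     * per-map bandwidth).
     */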
public DistCpOptions getDistCpOptions() {
// DistCpOptions expects the first argument to be a file OR a list of Paths
List<Path> sourceUris=new ArrayList<>();
sourceUris.add(new Path(sourceStagingUri));
DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
// setSyncFolder(true) ensures directory structure is maintained when source is copied to target
distcpOptions.setSyncFolder(true);
        // Skip the CRC check when TDE encryption is enabled, since checksums do not match across encryption zones.
if (Boolean.parseBoolean(conf.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED.getName()))) {
distcpOptions.setSkipCRC(true);
}
distcpOptions.setBlocking(true);
distcpOptions.setMaxMaps(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
distcpOptions.setMapBandwidth(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
return distcpOptions;
}
public Long getCounterValue(String counterKey) {
return countersMap.get(counterKey);
}
public boolean isCountersMapEmpty() {
        return countersMap.isEmpty();
}
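
    /**
     * Deletes the staging locations registered during event processing on both
     * the source and the target cluster.
     */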
public void cleanEventsDirectory() throws IOException {
LOG.info("Cleaning staging directory");
cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
cleanupEventLocations(targetCleanUpList, targetFileSystem);
}
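
    /**
     * Recursively deletes each path in the cleanup list from the given file system.
     */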
private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem)
throws IOException {
for (Path cleanUpPath : cleanupList) {
try {
fileSystem.delete(cleanUpPath, true);
} catch (IOException ioe) {
LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString());
throw ioe;
}
}
}
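
    /**
     * Closes any open JDBC statements and connections.
     */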
public void closeConnection() throws SQLException {
if (sourceStatement != null) {
sourceStatement.close();
}
if (targetStatement != null) {
targetStatement.close();
}
if (sourceConnection != null) {
sourceConnection.close();
}
if (targetConnection != null) {
targetConnection.close();
}
}
}