package org.apache.falcon.hive.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.exception.HiveReplicationException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class HiveDRStatusStore extends DRStatusStore {
/**
 * Logger for this store. Created for HiveDRStatusStore.class (not the parent DRStatusStore) so that
 * log output is attributed to the Hive status store and can be configured independently.
 */
private static final Logger LOG = LoggerFactory.getLogger(HiveDRStatusStore.class);
/** File system holding the replication status files; assigned by init(FileSystem). */
private FileSystem fileSystem;
/**
 * Hive replication sub-directory of the base status store: BASE_DEFAULT_STORE_PATH with any trailing
 * separator removed, followed by "hiveReplicationStatusStore" and a trailing separator.
 * NOTE(review): built with the local java.io.File separator although it is used as a Hadoop path,
 * whose separator is always "/" — confirm this is only run on platforms where the two agree.
 */
private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd(
        DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator)
        + File.separator + "hiveReplicationStatusStore" + File.separator;
/** Permission for per-database/per-job status directories and status files: rwxrwx--- (770). */
private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
/** File name of a job's most recent status; older copies are renamed to "<modification time>.json". */
private static final String LATEST_FILE = "latest.json";
/** Retention count passed to rotateStatusFiles as numFiles. */
private static final int FILE_ROTATION_LIMIT = 10;
/** Minimum age in milliseconds before a status file may be rotated away (passed as maxFileAge). */
private static final int FILE_ROTATION_TIME = 86400000; // 1 day
/**
 * Creates a Hive DR status store on the given file system.
 * NOTE(review): the constructor body is not visible in this view; it presumably delegates to
 * init(targetFileSystem) — confirm.
 *
 * @param targetFileSystem file system on which status files are stored
 * @throws IOException if the store cannot be initialized
 */
public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException {
/**
 * Creates a Hive DR status store on the given file system for the given group.
 * NOTE(review): the constructor body is not visible in this view; presumably it records the group
 * (see the DRStatusStore.storeGroup remark in init) and calls init(targetFileSystem) — confirm.
 *
 * @param targetFileSystem file system on which status files are stored
 * @param group group name passed by the caller (usage not visible here)
 * @throws IOException if the store cannot be initialized
 */
public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException {
/**
 * Binds this store to targetFileSystem and prepares the Hive replication status directory.
 * Validates BASE_DEFAULT_STORE_PATH with FileUtils.validatePath. If DEFAULT_STORE_PATH does not exist,
 * creates it with DEFAULT_STORE_PERMISSION. Presumably, when it already exists, it requires the existing
 * directory's permission to equal DEFAULT_STORE_PERMISSION.
 * NOTE(review): closing braces appear to be missing from this view; the "else" below is presumed to
 * pair with the exists() check — confirm.
 * NOTE(review): DEFAULT_STORE_PERMISSION is not among this class's visible fields; presumably it is
 * inherited from DRStatusStore — confirm its value.
 *
 * @param targetFileSystem file system on which status files are kept
 * @throws IOException if the directory cannot be created or has unexpected permissions
 */
private void init(FileSystem targetFileSystem) throws IOException {
this.fileSystem = targetFileSystem;
Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
FileUtils.validatePath(fileSystem, basePath);
// Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
// BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
Path storePath = new Path(DEFAULT_STORE_PATH);
if (!fileSystem.exists(storePath)) {
if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH);
} else {
// NOTE(review): the message below has no space before "does not have" and asks for 777, while the
// comment above describes 770 — confirm it against DEFAULT_STORE_PERMISSION.
if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) {
throw new IOException("Base dir " + DEFAULT_STORE_PATH + "does not have correct permissions. "
+ "Please set to 777");
/**
 * Records a batch of replication statuses for one job, grouped per database.
 * Processing of each status:
 * - it is rejected if its job name differs from jobName;
 * - the first time a database is seen, its DBReplicationStatus is loaded (or initialized) via
 *   getDbReplicationStatus;
 * - the status is applied at database level when getTable() is empty, otherwise at table level.
 * Finally, the per-database results are iterated for persisting.
 * NOTE(review): the db/table update statements and the body of the final loop are not visible in this
 * view; persistence presumably goes through writeStatusFile — confirm.
 *
 * @param jobName name of the job; every entry in statusList must report this job name
 * @param statusList database- and table-level statuses to record
 * @throws HiveReplicationException if a status belongs to another job, or loading/storing status fails
 */
public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
throws HiveReplicationException {
// database name -> that database's status for this job
Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>();
for (ReplicationStatus status : statusList) {
if (!status.getJobName().equals(jobName)) {
String error = "JobName for status does not match current job \"" + jobName
+ "\". Status is " + status.toJsonString();
throw new HiveReplicationException(error);
// init dbStatusMap and tableStatusMap from existing statuses.
if (!dbStatusMap.containsKey(status.getDatabase())) {
DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(),
status.getJobName(), status.getDatabase());
dbStatusMap.put(status.getDatabase(), dbStatus);
// update existing statuses with new status for db/tables
if (StringUtils.isEmpty(status.getTable())) { // db level replication status.
} else { // table level replication status
// write to disk
for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) {
/**
 * Returns the database-level replication status of a job.
 * Equivalent to getReplicationStatus(source, target, jobName, database, null).
 *
 * @param source source URI, used if the status has to be initialized
 * @param target target URI, used if the status has to be initialized
 * @param jobName replication job name
 * @param database database name
 * @return the database-level ReplicationStatus
 * @throws HiveReplicationException if the status cannot be read or initialized
 */
public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database)
throws HiveReplicationException {
return getReplicationStatus(source, target, jobName, database, null);
/**
 * Returns the stored replication status of a job for a database or for one of its tables.
 * Loads the job's DBReplicationStatus via getDbReplicationStatus (which initializes it when nothing is
 * stored). It then returns the database status when table is null or empty, and getTableStatus(table)
 * otherwise.
 * NOTE(review): what getTableStatus returns for an unknown table is defined in DBReplicationStatus — confirm.
 *
 * @param source source URI, used if the status has to be initialized
 * @param target target URI, used if the status has to be initialized
 * @param jobName replication job name
 * @param database database name
 * @param table table name, or null/empty for the database-level status
 * @return the matching ReplicationStatus
 * @throws HiveReplicationException if the status cannot be read or initialized
 */
public ReplicationStatus getReplicationStatus(String source, String target,
String jobName, String database,
String table) throws HiveReplicationException {
if (StringUtils.isEmpty(table)) {
return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus();
} else {
return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table);
/**
 * Returns an iterator over the table-level statuses recorded for a job in one database.
 * The job's DBReplicationStatus is loaded (or initialized) via getDbReplicationStatus.
 *
 * @param source source URI, used if the status has to be initialized
 * @param target target URI, used if the status has to be initialized
 * @param jobName replication job name
 * @param database database name
 * @return iterator from DBReplicationStatus.getTableStatusIterator()
 * @throws HiveReplicationException if the status cannot be read or initialized
 */
public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
String jobName, String database)
throws HiveReplicationException {
DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database);
return dbReplicationStatus.getTableStatusIterator();
/**
 * Removes all stored status files of a job for one database.
 * Recursively deletes getStatusDirPath(database, jobName) if it exists. The per-database parent
 * directory is left in place.
 * NOTE(review): the boolean result of delete() is ignored, so a delete that returns false goes unreported.
 *
 * @param jobName replication job name
 * @param database database name
 * @throws HiveReplicationException wrapping any IOException raised by the file system
 */
public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException {
Path deletePath = getStatusDirPath(database, jobName);
try {
if (fileSystem.exists(deletePath)) {
fileSystem.delete(deletePath, true);
} catch (IOException e) {
throw new HiveReplicationException("Failed to delete status for Job "
+ jobName + " and DB "+ database, e);
/**
 * Loads the DBReplicationStatus of a job for one database, initializing it when none is stored.
 * If the job's status directory exists, its latest status file is read via readStatusFile. If nothing was
 * read, a new database-level ReplicationStatus is built with status INIT and a last constructor argument
 * of -1 (presumably "no event id yet" — confirm). The per-database directory (if missing) and the
 * per-job directory are then created with DEFAULT_STATUS_DIR_PERMISSION.
 * NOTE(review): no write of the initialized status is visible in this view.
 * NOTE(review): the caught IOException is not passed as the cause of the rethrown exception, which
 * hides the underlying failure.
 *
 * @param source source URI for an initialized status
 * @param target target URI for an initialized status
 * @param jobName replication job name
 * @param database database name
 * @return the stored or newly initialized DBReplicationStatus
 * @throws HiveReplicationException if reading the status or creating the directories fails
 */
private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
String database) throws HiveReplicationException{
DBReplicationStatus dbReplicationStatus = null;
Path statusDbDirPath = getStatusDbDirPath(database);
Path statusDirPath = getStatusDirPath(database, jobName);
// check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
// if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
// NOTE(review): no md5 fallback is visible here; names are used as-is (database lowercased by
// getStatusDbDirPath).
try {
if (fileSystem.exists(statusDirPath)) {
dbReplicationStatus = readStatusFile(statusDirPath);
if (null == dbReplicationStatus) {
// Init replication state for this database
ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
database, null, ReplicationStatus.Status.INIT, -1);
dbReplicationStatus = new DBReplicationStatus(initDbStatus);
// Create parent dir first with default status store permissions. FALCON-2057
if (!fileSystem.exists(statusDbDirPath)) {
if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDbDirPath.toString();
throw new HiveReplicationException(error);
if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDirPath.toString();
throw new HiveReplicationException(error);
return dbReplicationStatus;
} catch (IOException e) {
String error = "Failed to get ReplicationStatus for job " + jobName;
throw new HiveReplicationException(error);
/**
 * Returns the status directory of the job and database named in the database-level status of
 * dbReplicationStatus.
 *
 * @param dbReplicationStatus status whose getDatabaseStatus() supplies the database and job name
 * @return getStatusDirPath(database, jobName)
 */
private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
ReplicationStatus status = dbReplicationStatus.getDatabaseStatus();
return getStatusDirPath(status.getDatabase(), status.getJobName());
/**
 * Returns the directory holding one job's status files for a database:
 * getStatusDbDirPath(database)/jobName.
 *
 * @param database database name
 * @param jobName replication job name (used unchanged as the directory name)
 * @return the per-job status directory
 */
public Path getStatusDirPath(String database, String jobName) {
return new Path(getStatusDbDirPath(database), jobName);
/**
 * Returns the per-database status directory: BASE_DEFAULT_STORE_PATH/dbName, with dbName lowercased.
 * NOTE(review): toLowerCase() without a Locale is locale-sensitive (e.g. Turkish dotted/dotless i);
 * consider Locale.ROOT.
 * NOTE(review): this is rooted at BASE_DEFAULT_STORE_PATH, not at DEFAULT_STORE_PATH (the
 * "hiveReplicationStatusStore" directory that init creates) — confirm which is intended.
 *
 * @param dbName database name
 * @return the per-database status directory
 */
public Path getStatusDbDirPath(String dbName) {
return new Path(new Path(BASE_DEFAULT_STORE_PATH), dbName.toLowerCase());
/**
 * Stores dbReplicationStatus as the job's latest status file and rotates older files.
 * Steps:
 * - an existing latest.json is renamed to "<its modification time>.json";
 * - a new latest.json is created with DEFAULT_STATUS_DIR_PERMISSION;
 * - rotateStatusFiles is called with FILE_ROTATION_LIMIT and FILE_ROTATION_TIME.
 * NOTE(review): the statements that write the status to the stream and close it are not visible in this
 * view — confirm that the stream is closed on every path.
 * NOTE(review): the boolean result of rename() is ignored. The caught IOException is also not chained as
 * the cause of the rethrown exception.
 *
 * @param dbReplicationStatus status to store; its database status determines the target directory
 * @throws HiveReplicationException if writing or rotating the status files fails
 */
private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
String statusDir = getStatusDirPath(dbReplicationStatus).toString();
try {
Path latestFile = new Path(statusDir + "/" + LATEST_FILE);
if (fileSystem.exists(latestFile)) {
// keep the previous latest status under a timestamped name
Path renamedFile = new Path(statusDir + "/"
+ String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json");
fileSystem.rename(latestFile, renamedFile);
FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION);
} catch (IOException e) {
String error = "Failed to write latest Replication status into dir " + statusDir;
throw new HiveReplicationException(error);
rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME);
/**
 * Prunes old status files from a job's status directory.
 * Lists the files directly inside statusDir (non-recursive). When more than numFiles + 1 paths were
 * collected, the first (size - numFiles + 1) entries of the list are deletion candidates. Each candidate
 * whose modification time is more than maxFileAge milliseconds before now is deleted.
 * NOTE(review): the loop body that fills fileList (and any sorting of it) is not visible in this view;
 * candidate selection only makes sense if the list is ordered oldest first — confirm.
 * NOTE(review): the threshold (numFiles + 1) and the sublist end (size - numFiles + 1) look inconsistent.
 * When every candidate is old enough, only numFiles - 1 entries remain — confirm the intended retention count.
 *
 * @param statusDir directory holding a job's status files
 * @param numFiles retention count (see note above)
 * @param maxFileAge minimum age in milliseconds before a file may be deleted
 * @throws HiveReplicationException if listing, inspecting or deleting files fails
 */
public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException {
List<String> fileList = new ArrayList<String>();
long now = System.currentTimeMillis();
try {
RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false);
while (fileIterator.hasNext()) {
if (fileList.size() > (numFiles+1)) {
// delete some files, as long as they are older than the time.
for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) {
long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime();
if ((now - modTime) > maxFileAge) {
Path deleteFilePath = new Path(file);
if (fileSystem.exists(deleteFilePath)) {
fileSystem.delete(deleteFilePath, false);
} catch (IOException e) {
String error = "Failed to rotate status files in dir " + statusDir.toString();
throw new HiveReplicationException(error);
/**
 * Reads a job's latest status file (latest.json under statusDirPath) into a DBReplicationStatus.
 * Returns null when either the directory or latest.json does not exist.
 * NOTE(review): the argument to IOUtils.toString is truncated in this view. Presumably it opens
 * statusFile, and that stream must be closed — confirm.
 * NOTE(review): the caught IOException is not chained as the cause of the rethrown exception.
 *
 * @param statusDirPath the job's status directory
 * @return the parsed status, or null if no latest status file exists
 * @throws HiveReplicationException if the file exists but cannot be read
 */
private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException {
try {
Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE);
if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) {
return null;
} else {
return new DBReplicationStatus(IOUtils.toString(;
} catch (IOException e) {
String error = "Failed to read latest Replication status from dir " + statusDirPath.toString();
throw new HiveReplicationException(error);
/**
 * Rejects a replication job that would conflict with jobs already recorded for the same target database.
 * Reads every "<db status dir>/<job>/latest.json" and, for each existing job's database-level status,
 * applies these rules:
 * - a source URI different from newSource is always a conflict;
 * - the same job name is not a conflict;
 * - otherwise, a database-level request (table null or empty) conflicts with any other job;
 * - a table-level request conflicts only if the other job already records a status for the same table.
 * NOTE(review): the statement that skips to the next file for the same job (presumably "continue") is
 * not visible in this view — confirm.
 * NOTE(review): the glob is built with java.io.File.separator, but Hadoop paths use "/". Also confirm
 * that globStatus cannot return null here, since a null result would make the loop throw a
 * NullPointerException.
 *
 * @param newSource source URI of the job being checked
 * @param jobName name of the job being checked
 * @param database target database name
 * @param table target table, or null/empty for database-level replication
 * @throws HiveReplicationException on a conflict, or wrapping an IOException raised while reading status files
 */
public void checkForReplicationConflict(String newSource, String jobName,
String database, String table) throws HiveReplicationException {
try {
Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json");
FileStatus[] files = fileSystem.globStatus(globPath);
for(FileStatus file : files) {
DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(;
ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
if (!(newSource.equals(existingJob.getSourceUri()))) {
throw new HiveReplicationException("Two different sources are attempting to replicate to same db "
+ database + ". New Source = " + newSource
+ ", Existing Source = " + existingJob.getSourceUri());
} // two different sources replicating to same DB. Conflict
if (jobName.equals(existingJob.getJobName())) {
} // same job, no conflict.
if (StringUtils.isEmpty(table)) {
// When it is DB level replication, two different jobs cannot replicate to same DB
throw new HiveReplicationException("Two different jobs are attempting to replicate to same db "
+ database.toLowerCase() + ". New Job = " + jobName
+ ", Existing Job = " + existingJob.getJobName());
At this point, it is different table level jobs replicating from same newSource to same target. This is
allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and
job2 can replicate db1.table2. Both jobs cannot replicate to same table.
// Table-level request: conflict only if the existing job already tracks the same table name.
for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) {
if (table.equals(entry.getKey())) {
throw new HiveReplicationException("Two different jobs are trying to replicate to same table "
+ entry.getKey() + ". New job = " + jobName
+ ", Existing job = " + existingJob.getJobName());
} catch (IOException e) {
throw new HiveReplicationException("Failed to read status files for DB "
+ database, e);