/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.hive.util;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.exception.HiveReplicationException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* DRStatusStore implementation for Hive.
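*
* <p>Illustrative usage (a minimal sketch; {@code targetFs}, {@code statusList}, the URIs,
* and the job and database names below are placeholders, not values defined by this class):
* <pre>
*     DRStatusStore store = new HiveDRStatusStore(targetFs);
*     store.updateReplicationStatus("hiveDRJob", statusList);
*     ReplicationStatus dbStatus = store.getReplicationStatus(
*             "hive://source:10000", "hive://target:10000", "hiveDRJob", "salesdb");
* </pre>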
*/
public class HiveDRStatusStore extends DRStatusStore {
private static final Logger LOG = LoggerFactory.getLogger(HiveDRStatusStore.class);
private FileSystem fileSystem;
private static final String DEFAULT_STORE_PATH =
        StringUtils.removeEnd(DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator)
                + File.separator + "hiveReplicationStatusStore" + File.separator;
private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
private static final String LATEST_FILE = "latest.json";
private static final int FILE_ROTATION_LIMIT = 10;
private static final int FILE_ROTATION_TIME = 86400000; // 1 day, in milliseconds
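
// Status store layout (see getStatusDirPath and writeStatusFile):
//   <DEFAULT_STORE_PATH>/<dbname>/<jobname>/latest.json    - latest status for a DB/job
//   <DEFAULT_STORE_PATH>/<dbname>/<jobname>/<modTime>.json - rotated older status files
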
public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException {
init(targetFileSystem);
}
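
/**
* Creates the store after setting the store group (setStoreGroup); as noted in init(),
* only users belonging to this group can submit HiveDR jobs.
*/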
public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException {
HiveDRStatusStore.setStoreGroup(group);
init(targetFileSystem);
}

private void init(FileSystem targetFileSystem) throws IOException {
this.fileSystem = targetFileSystem;
Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
FileUtils.validatePath(fileSystem, basePath);
// Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
// BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
Path storePath = new Path(DEFAULT_STORE_PATH);
if (!fileSystem.exists(storePath)) {
if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH);
}
} else {
if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) {
throw new IOException("Base dir " + DEFAULT_STORE_PATH + "does not have correct permissions. "
+ "Please set to 777");
}
}
}

/**
* Gets all DBs updated by the job and the current table statuses for each DB, then merges
* the latest replication status with the previous table replication statuses. If all
* statuses are success, stores the DB status as success with the largest eventId;
* otherwise stores the DB status as failure with the lowest eventId.
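* <p>For example (illustrative values): if table1 last succeeded at eventId 100 but table2
* failed at eventId 90, the DB status is stored as a failure with eventId 90.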
*/
@Override
public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
throws HiveReplicationException {
Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>();
for (ReplicationStatus status : statusList) {
if (!status.getJobName().equals(jobName)) {
String error = "JobName for status does not match current job \"" + jobName
+ "\". Status is " + status.toJsonString();
LOG.error(error);
throw new HiveReplicationException(error);
}
// init dbStatusMap and tableStatusMap from existing statuses.
if (!dbStatusMap.containsKey(status.getDatabase())) {
DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(),
status.getJobName(), status.getDatabase());
dbStatusMap.put(status.getDatabase(), dbStatus);
}
// update existing statuses with new status for db/tables
if (StringUtils.isEmpty(status.getTable())) { // db level replication status.
dbStatusMap.get(status.getDatabase()).updateDbStatus(status);
} else { // table level replication status
dbStatusMap.get(status.getDatabase()).updateTableStatus(status);
}
}
// write to disk
for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) {
writeStatusFile(entry.getValue());
}
}

@Override
public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database)
throws HiveReplicationException {
return getReplicationStatus(source, target, jobName, database, null);
}
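
/**
* Returns the DB-level replication status when {@code table} is empty; otherwise returns
* the status of the given table within the database.
*/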
public ReplicationStatus getReplicationStatus(String source, String target,
String jobName, String database,
String table) throws HiveReplicationException {
if (StringUtils.isEmpty(table)) {
return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus();
} else {
return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table);
}
}

@Override
public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
String jobName, String database)
throws HiveReplicationException {
DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database);
return dbReplicationStatus.getTableStatusIterator();
}

@Override
public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException {
Path deletePath = getStatusDirPath(database, jobName);
try {
if (fileSystem.exists(deletePath)) {
fileSystem.delete(deletePath, true);
}
} catch (IOException e) {
throw new HiveReplicationException("Failed to delete status for Job "
+ jobName + " and DB " + database, e);
}
}

private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
                                                   String database) throws HiveReplicationException {
DBReplicationStatus dbReplicationStatus = null;
Path statusDbDirPath = getStatusDbDirPath(database);
Path statusDirPath = getStatusDirPath(database, jobName);
// Database names or job names may contain characters not allowed in HDFS dir/file names;
// if so, use an MD5 hash of the name for the dir name. Prefer actual DB names for readability.
try {
if (fileSystem.exists(statusDirPath)) {
dbReplicationStatus = readStatusFile(statusDirPath);
}
if (null == dbReplicationStatus) {
// Init replication state for this database
ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
database, null, ReplicationStatus.Status.INIT, -1);
dbReplicationStatus = new DBReplicationStatus(initDbStatus);
// Create parent dir first with default status store permissions. FALCON-2057
if (!fileSystem.exists(statusDbDirPath)) {
if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDbDirPath.toString();
LOG.error(error);
throw new HiveReplicationException(error);
}
}
if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
String error = "mkdir failed for " + statusDirPath.toString();
LOG.error(error);
throw new HiveReplicationException(error);
}
writeStatusFile(dbReplicationStatus);
}
return dbReplicationStatus;
} catch (IOException e) {
String error = "Failed to get ReplicationStatus for job " + jobName;
LOG.error(error, e);
throw new HiveReplicationException(error, e);
}
}

private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
ReplicationStatus status = dbReplicationStatus.getDatabaseStatus();
return getStatusDirPath(status.getDatabase(), status.getJobName());
}

public Path getStatusDirPath(String database, String jobName) {
return new Path(getStatusDbDirPath(database), jobName);
}

public Path getStatusDbDirPath(String dbName) {
return new Path(new Path(DEFAULT_STORE_PATH), dbName.toLowerCase());
}

private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
dbReplicationStatus.updateDbStatusFromTableStatuses();
String statusDir = getStatusDirPath(dbReplicationStatus).toString();
try {
Path latestFile = new Path(statusDir + "/" + LATEST_FILE);
if (fileSystem.exists(latestFile)) {
Path renamedFile = new Path(statusDir + "/"
+ String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json");
fileSystem.rename(latestFile, renamedFile);
}
FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION);
try {
    stream.write(dbReplicationStatus.toJsonString().getBytes());
} finally {
    IOUtils.closeQuietly(stream);
}
} catch (IOException e) {
String error = "Failed to write latest Replication status into dir " + statusDir;
LOG.error(error, e);
throw new HiveReplicationException(error, e);
}
rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME);
}
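
/**
* Retains latest.json plus the newest {@code numFiles} rotated status files under
* {@code statusDir}, deleting surplus files once they are older than {@code maxFileAge}
* milliseconds. Invoked from writeStatusFile after every status update.
*/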
public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException {
List<String> fileList = new ArrayList<String>();
long now = System.currentTimeMillis();
try {
RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false);
while (fileIterator.hasNext()) {
fileList.add(fileIterator.next().getPath().toString());
}
// Keep latest.json plus the newest numFiles rotated files. Timestamped names sort
// before "latest.json", so after sorting the oldest candidates are at the front.
if (fileList.size() > (numFiles + 1)) {
    Collections.sort(fileList);
    // delete the surplus oldest files, as long as they have also aged out.
    for (String file : fileList.subList(0, fileList.size() - numFiles - 1)) {
long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime();
if ((now - modTime) > maxFileAge) {
Path deleteFilePath = new Path(file);
if (fileSystem.exists(deleteFilePath)) {
fileSystem.delete(deleteFilePath, false);
}
}
}
}
} catch (IOException e) {
String error = "Failed to rotate status files in dir " + statusDir.toString();
LOG.error(error, e);
throw new HiveReplicationException(error, e);
}
}

private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException {
try {
Path statusFile = new Path(statusDirPath, LATEST_FILE);
if (!fileSystem.exists(statusDirPath) || !fileSystem.exists(statusFile)) {
    return null;
}
FSDataInputStream in = fileSystem.open(statusFile);
try {
    return new DBReplicationStatus(IOUtils.toString(in));
} finally {
    IOUtils.closeQuietly(in);
}
} catch (IOException e) {
String error = "Failed to read latest Replication status from dir " + statusDirPath.toString();
LOG.error(error, e);
throw new HiveReplicationException(error, e);
}
}
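
/**
* Checks whether a new job would conflict with jobs already recorded for this DB:
* a different source replicating to the same DB is always a conflict; the same job never
* conflicts with itself; a second DB-level job on the same DB is a conflict; and
* table-level jobs conflict only when they target the same table.
*/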
public void checkForReplicationConflict(String newSource, String jobName,
String database, String table) throws HiveReplicationException {
try {
Path globPath = new Path(getStatusDbDirPath(database), "*" + Path.SEPARATOR + LATEST_FILE);
// Defensive: globStatus may return null when nothing matches the pattern.
FileStatus[] files = fileSystem.globStatus(globPath);
if (files == null) {
    return;
}
for (FileStatus file : files) {
DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
fileSystem.open(file.getPath())));
ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
if (!newSource.equals(existingJob.getSourceUri())) {
    // two different sources replicating into the same DB: conflict.
    throw new HiveReplicationException("Two different sources are attempting to replicate to same db "
            + database + ". New Source = " + newSource
            + ", Existing Source = " + existingJob.getSourceUri());
}
if (jobName.equals(existingJob.getJobName())) {
    // same job: no conflict.
    continue;
}
if (StringUtils.isEmpty(table)) {
// When it is DB level replication, two different jobs cannot replicate to same DB
throw new HiveReplicationException("Two different jobs are attempting to replicate to same db "
+ database.toLowerCase() + ". New Job = " + jobName
+ ", Existing Job = " + existingJob.getJobName());
}
/*
At this point, different table-level jobs are replicating from the same newSource to the
same target. This is allowed as long as the target tables differ: for example, job1 can
replicate db1.table1 and job2 can replicate db1.table2, but two jobs cannot replicate to
the same table.
*/
for (Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) {
if (table.equals(entry.getKey())) {
throw new HiveReplicationException("Two different jobs are trying to replicate to same table "
+ entry.getKey() + ". New job = " + jobName
+ ", Existing job = " + existingJob.getJobName());
}
}
}
} catch (IOException e) {
throw new HiveReplicationException("Failed to read status files for DB "
+ database, e);
}
}
}