/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.hive;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.exception.HiveReplicationException;
import org.apache.falcon.hive.util.DRStatusStore;
import org.apache.falcon.hive.util.DelimiterUtils;
import org.apache.falcon.hive.util.FileUtils;
import org.apache.falcon.hive.util.HiveDRUtils;
import org.apache.falcon.hive.util.HiveMetastoreUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatTable;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.common.HCatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Obtains the last replicated event id for each source DB or table from the DR
 * status store (falling back to the target Hive metastore when no status has been
 * recorded yet) and persists the ids to a file for the replication job.
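 * <p>
 * A minimal usage sketch (illustrative only; assumes a populated {@link org.apache.hadoop.conf.Configuration}
 * and configured {@link HiveDROptions} and {@link DRStatusStore}):
 * <pre>
 * LastReplicatedEvents lastEvents = new LastReplicatedEvents(conf, targetMetastoreUri,
 *         targetMetastorePrincipal, targetHive2Principal, drStore, options);
 * try {
 *     String eventsFile = lastEvents.getLastEvents(options);
 *     // eventsFile: TAB-delimited lines of dbOrTable and lastEventId
 * } finally {
 *     lastEvents.cleanUp();
 * }
 * </pre>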
 */
public class LastReplicatedEvents {
    private static final Logger LOG = LoggerFactory.getLogger(LastReplicatedEvents.class);

    private final HCatClient targetMetastoreClient;
    private final DRStatusStore drStore;
    private final FileSystem jobFS;
    private Path eventsInputDirPath;

    /* TODO: handle the case where there are no events; the events file and lists will be empty. */
    public LastReplicatedEvents(Configuration conf, String targetMetastoreUri,
                                String targetMetastoreKerberosPrincipal,
                                String targetHive2KerberosPrincipal,
                                DRStatusStore drStore, HiveDROptions inputOptions) throws Exception {
        targetMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(targetMetastoreUri,
                targetMetastoreKerberosPrincipal, targetHive2KerberosPrincipal);
        jobFS = FileSystem.get(conf);
        this.drStore = drStore;
        init(inputOptions.getJobName());
    }
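
    /**
     * Creates the per-job staging directory under {@code FileUtils.DEFAULT_EVENT_STORE_PATH}
     * for the last-event files, after validating the base store path.
     *
     * @param jobName name of the replication job; used as the directory name
     */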
    private void init(final String jobName) throws Exception {
        // Create the base dir to store events on the cluster where the job runs
        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
        // Validate base path
        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));

        if (!jobFS.exists(dir)) {
            if (!FileSystem.mkdirs(jobFS, dir, FileUtils.DEFAULT_DIR_PERMISSION)) {
                throw new IOException("Creating directory failed: " + dir);
            }
        }

        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);

        if (!jobFS.exists(eventsInputDirPath)) {
            if (!jobFS.mkdirs(eventsInputDirPath)) {
                throw new IOException("Creating directory failed: " + eventsInputDirPath);
            }
        }
    }
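
    /**
     * Looks up the last replicated event id for every source DB (DB-level replication)
     * or every source table (table-level replication) and persists the ids to a file.
     *
     * @return fully qualified path of the persisted events file, or null if there were no events
     */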
    public String getLastEvents(HiveDROptions inputOptions) throws Exception {
        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
        LOG.info("Obtaining last events for replicationType: {}", replicationType);
        Map<String, Long> lastEvents = new HashMap<>();

        if (replicationType == HiveDRUtils.ReplicationType.DB) {
            List<String> dbNames = inputOptions.getSourceDatabases();
            for (String db : dbNames) {
                lastEvents.put(db, getLastSavedEventId(inputOptions, db, null));
            }
        } else {
            List<String> tableNames = inputOptions.getSourceTables();
            String db = inputOptions.getSourceDatabases().get(0);
            for (String tableName : tableNames) {
                lastEvents.put(db + "." + tableName, getLastSavedEventId(inputOptions, db, tableName));
            }
        }

        return persistLastEventsToFile(lastEvents, inputOptions.getJobName());
    }
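
    /**
     * Returns the last event id recorded in the {@link DRStatusStore} for the given
     * DB or table; if none has been recorded yet, falls back to the last replication
     * id held in the target metastore.
     */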
    private long getLastSavedEventId(HiveDROptions inputOptions, final String dbName,
                                     final String tableName) throws Exception {
        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
        String jobName = inputOptions.getJobName();
        String sourceMetastoreUri = inputOptions.getSourceMetastoreUri();
        String targetMetastoreUri = inputOptions.getTargetMetastoreUri();

        long eventId = 0;
        if (HiveDRUtils.ReplicationType.DB == replicationType) {
            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
                    jobName, dbName).getEventId();
        } else if (HiveDRUtils.ReplicationType.TABLE == replicationType) {
            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
                    jobName, dbName, tableName).getEventId();
        }
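
        // An event id of -1 appears to indicate that no replication status has been
        // recorded yet for this job in the DR store; fall back to the target metastore.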
        if (eventId == -1) {
            if (HiveDRUtils.ReplicationType.DB == replicationType) {
                /*
                 * The API to get the last repl id for a DB is very expensive, so Hive
                 * does not expose it publicly. HiveDrTool instead derives the DB's last
                 * repl id as the minimum last repl id across all of its tables.
                 */
                eventId = getLastReplicationIdForDatabase(dbName);
            } else {
                HCatTable table = targetMetastoreClient.getTable(dbName, tableName);
                eventId = ReplicationUtils.getLastReplicationId(table);
            }
        }

        if (StringUtils.isEmpty(tableName)) {
            LOG.info("Last replicated eventId for DB: {} is {}", dbName, eventId);
        } else {
            LOG.info("Last replicated eventId for DB: {} Table: {} is {}", dbName, tableName, eventId);
        }
        return eventId;
    }
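
    /**
     * Derives the last replication id for a database as the minimum of the last
     * replication ids of all its tables, or 0 if the database has no tables.
     */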
    private long getLastReplicationIdForDatabase(String databaseName) throws HiveReplicationException {
        /*
         * This is a very expensive call and should only be made for the first
         * DB replication instance, i.e. when no status has been stored yet.
         */
        long eventId = Long.MAX_VALUE;
        try {
            List<String> tableList = targetMetastoreClient.listTableNamesByPattern(databaseName, "*");
            for (String tableName : tableList) {
                long temp = ReplicationUtils.getLastReplicationId(
                        targetMetastoreClient.getTable(databaseName, tableName));
                if (temp < eventId) {
                    eventId = temp;
                }
            }
            return (eventId == Long.MAX_VALUE) ? 0 : eventId;
        } catch (HCatException e) {
            throw new HiveReplicationException("Unable to find last replication id for database "
                    + databaseName, e);
        }
    }
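
    /**
     * Persists the collected last-event ids as TAB-delimited
     * {@code <dbOrTable> <lastEventId>} lines to {@code <identifier>.id} under
     * the job's events directory.
     *
     * @return the fully qualified path of the written file, or null if the map is empty
     */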
    public String persistLastEventsToFile(final Map<String, Long> lastEvents,
                                          final String identifier) throws IOException {
        if (lastEvents.isEmpty()) {
            return null;
        }

        Path eventsFile = new Path(eventsInputDirPath.toString(), identifier + ".id");
        OutputStream out = null;
        try {
            out = FileSystem.create(jobFS, eventsFile, FileUtils.FS_PERMISSION_700);
            for (Map.Entry<String, Long> entry : lastEvents.entrySet()) {
                // Use an explicit charset so the output does not depend on the JVM default
                out.write(entry.getKey().getBytes(StandardCharsets.UTF_8));
                out.write(DelimiterUtils.TAB_DELIM.getBytes(StandardCharsets.UTF_8));
                out.write(String.valueOf(entry.getValue()).getBytes(StandardCharsets.UTF_8));
                out.write(DelimiterUtils.NEWLINE_DELIM.getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } finally {
            IOUtils.closeQuietly(out);
        }
        return jobFS.makeQualified(eventsFile).toString();
    }
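
    /**
     * Closes the target metastore client; call once the last-event lookup is complete.
     */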
    public void cleanUp() throws Exception {
        if (targetMetastoreClient != null) {
            targetMetastoreClient.close();
        }
    }
}