blob: 3d3badf26c2ec88e0f8af4fdc5a8d9560076cf99 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.hive.util;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.hive.ReplicationEventMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * Utility methods for the HiveDR event sourcer.
 *
 * Manages the per-job HDFS staging directory for serialized replication
 * events and persists event commands and their metadata index files.
 * Not thread-safe: instances hold mutable directory state.
 */
public class EventSourcerUtils {

    private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class);

    private static final String METAFILE_EXTENSION = ".meta";
    private static final String SRCFILE_EXTENSION = ".src";
    private static final String TGTFILE_EXTENSION = ".tgt";

    // Per-job directory under the default event store; set to null after a
    // successful cleanup in cleanUpEventInputDir().
    private Path eventsInputDirPath;
    // When true, cleanUpEventInputDir() leaves the staged event files in place.
    private final boolean shouldKeepHistory;
    private final FileSystem jobFS;

    /**
     * Creates the utils bound to the cluster described by {@code conf} and
     * initializes the per-job event staging directory.
     *
     * @param conf              Hadoop configuration used to obtain the job FileSystem
     * @param shouldKeepHistory if true, event files are retained after cleanup
     * @param jobName           replication job name; used as the staging directory name
     * @throws Exception if the event store directories cannot be validated or created
     */
    public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory,
                             final String jobName) throws Exception {
        this.shouldKeepHistory = shouldKeepHistory;
        jobFS = FileSystem.get(conf);
        init(jobName);
    }

    /**
     * Validates the base store path and creates the base event store directory
     * and the per-job subdirectory if they do not already exist.
     */
    private void init(final String jobName) throws Exception {
        // Create base dir to store events on the cluster where the job is running.
        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
        // Validate base path
        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));

        if (!jobFS.exists(dir)) {
            if (!FileSystem.mkdirs(jobFS, dir, FileUtils.DEFAULT_DIR_PERMISSION)) {
                throw new IOException("Creating directory failed: " + dir);
            }
        }

        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
        if (!jobFS.exists(eventsInputDirPath)) {
            if (!jobFS.mkdirs(eventsInputDirPath)) {
                // Was "throw new Exception(...)": use IOException for consistency
                // with the identical failure mode above (still within "throws Exception").
                throw new IOException("Creating directory failed: " + eventsInputDirPath);
            }
        }
    }

    /**
     * Opens an output stream at the given path with 700 permissions.
     *
     * @param path filesystem path of the file to create
     * @return an open stream the caller must close (see {@link #closeOutputStream})
     * @throws Exception if the file cannot be created
     */
    public OutputStream getFileOutputStream(final String path) throws Exception {
        return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700);
    }

    /**
     * Flushes and quietly closes the given stream; a null stream is a no-op.
     * Flush failures propagate as IOException, close failures are swallowed.
     */
    public void closeOutputStream(OutputStream out) throws IOException {
        if (out != null) {
            try {
                out.flush();
            } finally {
                IOUtils.closeQuietly(out);
            }
        }
    }

    /**
     * Serializes each replication command to the stream, one command per line.
     *
     * @param out  destination stream (not closed by this method)
     * @param cmds commands to serialize, in iteration order
     */
    public void persistReplicationEvents(final OutputStream out,
                                         final Iterable<? extends Command> cmds) throws Exception {
        for (Command cmd : cmds) {
            persistReplicationEvents(out, cmd);
        }
    }

    /**
     * Serializes a single replication command to the stream followed by the
     * newline delimiter. Bytes are written as UTF-8 so event files are
     * portable across nodes with different platform default charsets.
     */
    public void persistReplicationEvents(final OutputStream out,
                                         final Command cmd) throws Exception {
        out.write(utf8(ReplicationUtils.serializeCommand(cmd)));
        LOG.debug("HiveDR Serialized Repl Command : {}", cmd);
        out.write(utf8(DelimiterUtils.NEWLINE_DELIM));
    }

    /**
     * Writes the event-file metadata map to {@code <identifier>.meta} in the
     * per-job directory, one {@code key<FIELD_DELIM>value} entry per line.
     *
     * @param data       metadata to persist; ignored when null or empty
     * @param identifier base name for the meta file
     * @return fully-qualified path of the written meta file, or null when
     *         there was nothing to persist
     * @throws IOException if the file cannot be created or written
     */
    public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException {
        if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) {
            Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION);
            OutputStream out = null;
            try {
                out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700);
                for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) {
                    out.write(utf8(entry.getKey()));
                    out.write(utf8(DelimiterUtils.FIELD_DELIM));
                    out.write(utf8(entry.getValue()));
                    out.write(utf8(DelimiterUtils.NEWLINE_DELIM));
                }
                out.flush();
            } finally {
                IOUtils.closeQuietly(out);
            }
            return jobFS.makeQualified(metaFilename).toString();
        } else {
            return null;
        }
    }

    /**
     * Adds one event entry to the metadata map. The key is
     * {@code base64(dbName)<FIELD_DELIM>base64(tableName)} (names lowercased),
     * the value is {@code srcFilename<FIELD_DELIM>tgtFilename}; either side of
     * each pair may be empty. No-op when {@code data} has no metadata map.
     */
    public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName,
                                           final String srcFilename, final String tgtFilename) {
        if (data == null || data.getEventFileMetadata() == null) {
            return;
        }
        StringBuilder key = new StringBuilder();
        if (StringUtils.isNotEmpty(dbName)) {
            // NOTE(review): toLowerCase() uses the default locale; left as-is so keys
            // stay compatible with existing readers — confirm whether Locale.ROOT is safe.
            key.append(Base64.encodeBase64URLSafeString(utf8(dbName.toLowerCase())));
        }
        key.append(DelimiterUtils.FIELD_DELIM);
        if (StringUtils.isNotEmpty(tableName)) {
            key.append(Base64.encodeBase64URLSafeString(utf8(tableName.toLowerCase())));
        }

        StringBuilder value = new StringBuilder();
        if (StringUtils.isNotEmpty(srcFilename)) {
            value.append(srcFilename);
        }
        value.append(DelimiterUtils.FIELD_DELIM);
        if (StringUtils.isNotEmpty(tgtFilename)) {
            value.append(tgtFilename);
        }

        data.getEventFileMetadata().put(key.toString(), value.toString());
    }

    /**
     * Merges all entries from {@code inputData} into {@code data}. No-op when
     * either side is null/absent or the input map is empty.
     */
    public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) {
        if (data == null || data.getEventFileMetadata() == null || inputData == null
                || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) {
            return;
        }
        data.getEventFileMetadata().putAll(inputData.getEventFileMetadata());
    }

    /** Returns the fully-qualified {@code <identifier>.src} path in the per-job directory. */
    public Path getSrcFileName(final String identifier) {
        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION));
    }

    /** Returns the fully-qualified {@code <identifier>.tgt} path in the per-job directory. */
    public Path getTargetFileName(final String identifier) {
        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION));
    }

    /**
     * Recursively deletes the per-job event directory unless history retention
     * was requested. Deletion failures are logged, not thrown.
     */
    public void cleanUpEventInputDir() {
        if (!shouldKeepHistory) {
            try {
                jobFS.delete(eventsInputDirPath, true);
                eventsInputDirPath = null;
            } catch (IOException e) {
                LOG.error("Unable to cleanup: {}", eventsInputDirPath, e);
            }
        }
    }

    // Centralizes the charset choice: all on-disk text is UTF-8, independent of
    // the JVM's platform default charset.
    private static byte[] utf8(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}