blob: 02cad69ad54bdb701e62106adf8cbdd5f97ea67e [file] [log] [blame]
/*
* Copyright © 2012-2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra.persist;
import co.cask.tephra.TxConstants;
import co.cask.tephra.metrics.MetricsCollector;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import co.cask.tephra.util.ConfigurationFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import javax.annotation.Nullable;
/**
* Handles persistence of transaction snapshot and logs to a directory in HDFS.
*
* The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
* Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering. Snapshot files
* are written with the filename "snapshot.<timestamp>". Transaction log files are written with the filename
* "txlog.<timestamp>".
*/
public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
private static final String LOG_FILE_PREFIX = "txlog.";
private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
}
};
// buffer size used for HDFS reads and writes
private static final int BUFFER_SIZE = 16384;
private final Configuration hConf;
private final String configuredSnapshotDir;
private final MetricsCollector metricsCollector;
private FileSystem fs;
private Path snapshotDir;
/**
 * Creates a new storage instance backed by HDFS.
 *
 * @param hConf Hadoop configuration; {@code data.tx.snapshot.dir} must be set before startup
 * @param codecProvider codec used to encode/decode snapshots (passed to the superclass)
 * @param metricsCollector collector handed to each {@link HDFSTransactionLog} that is opened
 */
@Inject
public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
                                   MetricsCollector metricsCollector) {
  super(codecProvider);
  this.hConf = hConf;
  // may be null here; validated in startUp() so injection itself never fails
  this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
  this.metricsCollector = metricsCollector;
}
/**
 * Opens the file system client and ensures the configured snapshot directory exists.
 *
 * @throws IllegalStateException if {@code data.tx.snapshot.dir} is not configured
 * @throws IOException if the snapshot directory does not exist and cannot be created
 */
@Override
protected void startUp() throws Exception {
  Preconditions.checkState(configuredSnapshotDir != null,
      "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR +
      " in configuration.");
  String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
  if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
    if (hdfsUser != null && LOG.isDebugEnabled()) {
      LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
          TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
    }
    // NOTE: this storage can be started multiple times. HDFS keeps a per-JVM FileSystem cache,
    // so request a fresh instance instead of a possibly already-closed cached one.
    fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
  } else {
    fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
  }
  snapshotDir = new Path(configuredSnapshotDir);
  LOG.info("Using snapshot dir " + snapshotDir);
  if (!fs.exists(snapshotDir)) {
    LOG.info("Creating snapshot dir at {}", snapshotDir);
    // mkdirs() signals failure via its return value rather than an exception; check it so a
    // misconfigured or unwritable directory fails fast here instead of on the first write
    if (!fs.mkdirs(snapshotDir)) {
      throw new IOException("Failed to create snapshot dir at " + snapshotDir);
    }
  }
}
/**
 * Copies the given local transaction log file into the snapshot directory as
 * {@code txlog.<timestamp>} and returns a {@link TransactionLog} for reading it.
 *
 * @param path local file containing the transaction log
 * @param timestamp the timestamp to associate with the copied log file
 * @throws IOException if copying the file to HDFS fails
 */
public TransactionLog getTransactionLog(Path path, long timestamp) throws IOException {
  Path destination = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
  fs.copyFromLocalFile(path, destination);
  // pass the caller-supplied timestamp through: previously a hard-coded epoch value
  // (1443792213636L) was used, so every returned log carried the wrong timestamp
  return openLog(destination, timestamp);
}
@Override
protected void shutDown() throws Exception {
  // Close the dedicated FileSystem created in startUp(); because it was obtained via
  // FileSystem.newInstance(), closing it does not disturb the JVM-wide cached instance.
  fs.close();
}
/**
 * Persists the given snapshot by writing it to a temporary file and then renaming it into place
 * as {@code snapshot.<timestamp>}, so readers never observe a partially-written snapshot.
 *
 * @param snapshot the transaction state snapshot to persist
 * @throws IOException if writing the snapshot or renaming it into place fails
 */
@Override
public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
  // create a temporary file, and save the snapshot
  Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
  LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
  FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE);
  // encode the snapshot and stream the serialized version to the file
  try {
    codecProvider.encode(out, snapshot);
  } finally {
    out.close();
  }
  // move the temporary file into place with the correct filename
  Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
  // rename() reports failure through its return value, not an exception; surface it so callers
  // do not mistakenly believe the snapshot is visible under its final name
  if (!fs.rename(snapshotTmpFile, finalFile)) {
    throw new IOException("Failed to rename temporary snapshot file " + snapshotTmpFile +
        " to " + finalFile);
  }
  LOG.debug("Completed snapshot to file {}", finalFile);
}
/**
 * Reads and decodes the most recent snapshot file, or returns {@code null} when the snapshot
 * directory contains no snapshot files.
 *
 * @throws IOException if opening or decoding the snapshot fails
 */
@Override
public TransactionSnapshot getLatestSnapshot() throws IOException {
  TransactionSnapshot result = null;
  InputStream input = getLatestSnapshotInputStream();
  if (input != null) {
    try {
      result = readSnapshotInputStream(input);
    } finally {
      input.close();
    }
  }
  return result;
}
/**
 * Opens a stream over the newest snapshot file, or returns {@code null} when none exist.
 * Snapshot recency is determined by the timestamp suffix in the filename.
 */
private InputStream getLatestSnapshotInputStream() throws IOException {
  TimestampedFilename[] candidates = listSnapshotFiles();
  if (candidates.length == 0) {
    LOG.info("No snapshot files found in {}", snapshotDir);
    return null;
  }
  // natural ordering sorts by timestamp ascending, so the newest file sorts last
  Arrays.sort(candidates);
  TimestampedFilename newest = candidates[candidates.length - 1];
  return fs.open(newest.getPath(), BUFFER_SIZE);
}
/**
 * Decodes a snapshot from the given stream, logging how many encoded bytes were consumed.
 * The caller retains ownership of the stream and is responsible for closing it.
 */
private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
  // wrap the stream purely so the consumed byte count can be reported
  CountingInputStream counted = new CountingInputStream(in);
  TransactionSnapshot decoded = codecProvider.decode(counted);
  LOG.info("Read encoded transaction snapshot of {} bytes", counted.getCount());
  return decoded;
}
/**
 * Opens the given HDFS snapshot file and decodes its contents, always closing the stream.
 */
private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
  FSDataInputStream input = fs.open(filePath, BUFFER_SIZE);
  try {
    return readSnapshotInputStream(input);
  } finally {
    input.close();
  }
}
/**
 * Lists all snapshot files in the snapshot directory, parsed into timestamped filenames.
 * The returned array is in no particular order.
 */
private TimestampedFilename[] listSnapshotFiles() throws IOException {
  FileStatus[] statuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
  TimestampedFilename[] parsed = new TimestampedFilename[statuses.length];
  int idx = 0;
  for (FileStatus status : statuses) {
    parsed[idx++] = new TimestampedFilename(status.getPath());
  }
  return parsed;
}
/**
 * Deletes all but the most recent {@code numberToKeep} snapshot files.
 *
 * @param numberToKeep how many of the newest snapshots to retain
 * @return the timestamp of the oldest retained snapshot, or -1 if no snapshot files exist
 * @throws IOException if listing the snapshot files fails
 */
@Override
public long deleteOldSnapshots(int numberToKeep) throws IOException {
  TimestampedFilename[] snapshots = listSnapshotFiles();
  if (snapshots.length == 0) {
    return -1;
  }
  // sort newest first, so indexes [0, numberToKeep) are the files to retain
  Arrays.sort(snapshots, Collections.reverseOrder());
  if (snapshots.length <= numberToKeep) {
    // nothing to remove, oldest timestamp is the last snapshot
    return snapshots[snapshots.length - 1].getTimestamp();
  }
  int toRemoveCount = snapshots.length - numberToKeep;
  for (int i = numberToKeep; i < snapshots.length; i++) {
    Path snapshotPath = snapshots[i].getPath();
    LOG.debug("Removing old snapshot file {}", snapshotPath);
    // delete() reports failure via its return value; log failures (as deleteLogsOlderThan()
    // already does) instead of silently ignoring them
    if (!fs.delete(snapshotPath, false)) {
      LOG.error("Failed to delete snapshot file {}", snapshotPath);
    }
  }
  long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp();
  LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp);
  return oldestTimestamp;
}
/**
 * Returns the filenames of all snapshot files currently in the snapshot directory.
 */
@Override
public List<String> listSnapshots() throws IOException {
  FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
  List<String> names = Lists.newArrayListWithCapacity(files.length);
  for (FileStatus status : files) {
    names.add(status.getPath().getName());
  }
  return names;
}
/**
 * Returns a {@link TransactionLog} for every log file whose timestamp is greater than or equal
 * to the given timestamp.
 *
 * @param timestamp inclusive lower bound on log file timestamps
 * @throws IOException if listing the log files fails
 */
@Override
public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
  FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE));
  List<TransactionLog> logs = Lists.newArrayListWithCapacity(statuses.length);
  for (FileStatus status : statuses) {
    TimestampedFilename logFile = new TimestampedFilename(status.getPath());
    logs.add(openLog(logFile.getPath(), logFile.getTimestamp()));
  }
  return logs;
}
/**
 * Creates a new transaction log named {@code txlog.<timestamp>} in the snapshot directory.
 */
@Override
public TransactionLog createLog(long timestamp) throws IOException {
  return openLog(new Path(snapshotDir, LOG_FILE_PREFIX + timestamp), timestamp);
}
// Constructs an HDFSTransactionLog for the given file and timestamp; the underlying file is not
// touched until the log is actually read or written.
private TransactionLog openLog(Path path, long timestamp) {
  return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
}
/**
 * Deletes every transaction log file whose timestamp is strictly less than the given timestamp.
 * Individual deletion failures are logged but do not abort the sweep.
 *
 * @param timestamp exclusive upper bound on log file timestamps to delete
 * @throws IOException if listing the log files fails
 */
@Override
public void deleteLogsOlderThan(long timestamp) throws IOException {
  FileStatus[] logFiles = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp));
  int deleted = 0;
  for (FileStatus logFile : logFiles) {
    Path logPath = logFile.getPath();
    LOG.debug("Removing old transaction log {}", logPath);
    if (fs.delete(logPath, false)) {
      deleted++;
    } else {
      LOG.error("Failed to delete transaction log file {}", logPath);
    }
  }
  LOG.debug("Removed {} transaction logs older than {}", deleted, timestamp);
}
/**
 * Returns the filenames of all transaction log files currently in the snapshot directory.
 */
@Override
public List<String> listLogs() throws IOException {
  FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE));
  List<String> names = Lists.newArrayListWithCapacity(files.length);
  for (FileStatus status : files) {
    names.add(status.getPath().getName());
  }
  return names;
}
@Override
public String getLocation() {
  // The storage "location" is simply the configured snapshot directory path.
  return snapshotDir.toString();
}
/**
 * Accepts transaction log files ({@code txlog.<timestamp>}) whose timestamp falls within
 * {@code [startTime, endTime)}. Files with a malformed timestamp are rejected with a warning.
 */
private static class LogFileFilter implements PathFilter {
  // inclusive lower bound on accepted file timestamps
  private final long startTime;
  // exclusive upper bound on accepted file timestamps
  private final long endTime;

  public LogFileFilter(long startTime, long endTime) {
    this.startTime = startTime;
    this.endTime = endTime;
  }

  @Override
  public boolean accept(Path path) {
    String name = path.getName();
    if (!name.startsWith(LOG_FILE_PREFIX)) {
      return false;
    }
    String[] parts = name.split("\\.");
    if (parts.length != 2) {
      return false;
    }
    try {
      long fileTime = Long.parseLong(parts[1]);
      return fileTime >= startTime && fileTime < endTime;
    } catch (NumberFormatException ignored) {
      // non-numeric suffix: warn and fall through to rejection
      LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", name);
      return false;
    }
  }
}
/**
 * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both
 * snapshot and transaction log filenames.
 */
private static class TimestampedFilename implements Comparable<TimestampedFilename> {
  // all fields are assigned exactly once in the constructor; final makes the class immutable
  private final Path path;
  private final String prefix;
  private final long timestamp;

  /**
   * Parses the given path's filename into a prefix and timestamp.
   *
   * @throws IllegalArgumentException if the filename is not of the form {@code prefix.<timestamp>}
   *         (a non-numeric timestamp surfaces as a {@link NumberFormatException} subclass)
   */
  public TimestampedFilename(Path path) {
    this.path = path;
    String[] parts = path.getName().split("\\.");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Filename " + path.getName() +
          " did not match the expected pattern prefix.timestamp");
    }
    prefix = parts[0];
    timestamp = Long.parseLong(parts[1]);
  }

  public Path getPath() {
    return path;
  }

  public String getPrefix() {
    return prefix;
  }

  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public int compareTo(TimestampedFilename other) {
    // order by prefix first (groups snapshots apart from logs), then by timestamp ascending
    int res = prefix.compareTo(other.getPrefix());
    if (res == 0) {
      res = Longs.compare(timestamp, other.getTimestamp());
    }
    return res;
  }
}
// TODO move this out as a separate command line tool
// Operating modes for the main() debugging tool: dump a snapshot or dump a transaction log.
private enum CLIMode { SNAPSHOT, TXLOG };
/**
 * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout.
 *
 * Supports the following options:
 *   -s         read snapshot state (defaults to the latest if no file is given)
 *   -l         read a transaction log
 *   -h         print usage and exit
 *   [filename] reads the given file(s)
 * @param args command line arguments as described above
 */
public static void main(String[] args) {
  CLIMode mode = null;
  List<String> files = Lists.newArrayList();
  // any argument that is not a recognized flag is treated as a filename
  for (String arg : args) {
    if ("-s".equals(arg)) {
      mode = CLIMode.SNAPSHOT;
    } else if ("-l".equals(arg)) {
      mode = CLIMode.TXLOG;
    } else if ("-h".equals(arg)) {
      printUsage(null);
    } else {
      files.add(arg);
    }
  }
  if (mode == null) {
    printUsage("ERROR: Either -s or -l is required to set mode.", 1);
  }
  Configuration config = new ConfigurationFactory().get();
  // Use the no-op metrics collector. We are being run as a command line tool, so there are no relevant metrics
  // to report
  HDFSTransactionStateStorage storage =
    new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
  storage.startAndWait();
  try {
    if (mode == CLIMode.SNAPSHOT) {
      try {
        if (files.isEmpty()) {
          // no filename given: fall back to the most recent snapshot
          printSnapshot(storage.getLatestSnapshot());
        }
        for (String file : files) {
          printSnapshot(storage.readSnapshotFile(new Path(file)));
          System.out.println();
        }
      } catch (IOException ioe) {
        System.err.println("Error reading snapshot files: " + ioe.getMessage());
        ioe.printStackTrace();
        System.exit(1);
      }
    } else if (mode == CLIMode.TXLOG) {
      if (files.isEmpty()) {
        printUsage("ERROR: At least one transaction log filename is required!", 1);
      }
      for (String file : files) {
        TimestampedFilename parsed = new TimestampedFilename(new Path(file));
        printLog(storage.openLog(parsed.getPath(), parsed.getTimestamp()));
        System.out.println();
      }
    }
  } finally {
    storage.stop();
  }
}
// Convenience overload: print usage (with optional message) and exit with status 0.
private static void printUsage(String message) {
  printUsage(message, 0);
}
/**
 * Prints an optional message followed by the usage text, then terminates the JVM.
 *
 * @param message optional message to print first; ignored when {@code null}
 * @param exitCode process exit status to terminate with
 */
private static void printUsage(String message, int exitCode) {
  if (message != null) {
    System.out.println(message);
  }
  String className = HDFSTransactionStateStorage.class.getName();
  System.out.println("Usage: java " + className + " (-s|-l) file1 [file2...]");
  System.out.println();
  System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)");
  System.out.println("\t-l\tRead files as transaction logs [filename is required]");
  System.out.println("\t-h\tPrint this message");
  System.exit(exitCode);
}
/**
 * Prints a human-readable summary of the snapshot: its timestamp as a date, then its contents.
 */
private static void printSnapshot(TransactionSnapshot snapshot) {
  System.out.println("TransactionSnapshot at " + new Date(snapshot.getTimestamp()).toString());
  System.out.println("\t" + snapshot.toString());
}
/**
 * Prints every edit in the given transaction log, numbered in read order. Read errors are
 * reported to stderr rather than propagated.
 */
private static void printLog(TransactionLog log) {
  try {
    System.out.println("TransactionLog " + log.getName());
    TransactionLogReader reader = log.getReader();
    long seq = 0;
    for (TransactionEdit edit = reader.next(); edit != null; edit = reader.next()) {
      System.out.println(String.format(" %d: %s", seq++, edit.toString()));
    }
  } catch (IOException ioe) {
    System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage());
    ioe.printStackTrace();
  }
}
}