blob: f0af8dc6abc8b0d816ad5aaffb0152b62ae66bfa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import com.datatorrent.stram.util.FSUtil;
/**
 * {@link StreamingContainerManager.RecoveryHandler} that keeps its recovery state in files
 * below the {@code recovery} sub-directory of an application directory.
 *
 * <p>The following files are maintained in that directory:</p>
 * <ul>
 * <li>{@code log} / {@code log0} - the current log and its backup created on rotation</li>
 * <li>{@code snapshot} / {@code snapshot0} - the serialized checkpoint and its backup</li>
 * <li>{@code heartbeatUri} - the connect address written by {@link #writeConnectUri(String)}</li>
 * </ul>
 *
 * @since 0.9.2
 */
public class FSRecoveryHandler implements StreamingContainerManager.RecoveryHandler
{
  private static final Logger LOG = LoggerFactory.getLogger(FSRecoveryHandler.class);
  private final Path basedir;
  private final Path logPath;
  private final Path logBackupPath;
  private final FileSystem fs;
  private final Path snapshotPath;
  private final Path snapshotBackupPath;
  private final Path heartbeatPath;
  public static final String FILE_LOG = "log";
  public static final String FILE_LOG_BACKUP = "log0";
  public static final String FILE_SNAPSHOT = "snapshot";
  public static final String FILE_SNAPSHOT_BACKUP = "snapshot0";
  private static final String DIRECTORY_RECOVERY = "recovery";
  private static final String FILE_HEARTBEATURI = "heartbeatUri";

  /**
   * Creates a handler rooted at {@code <appDir>/recovery}.
   *
   * @param appDir application directory that contains (or will contain) the recovery directory
   * @param conf configuration used to obtain the file system for the recovery directory
   * @throws IOException if the file system cannot be obtained
   */
  public FSRecoveryHandler(String appDir, Configuration conf) throws IOException
  {
    this.basedir = new Path(appDir, DIRECTORY_RECOVERY);
    // newInstance (rather than get) so that closing this instance in finalize() does not
    // affect a FileSystem object used elsewhere
    fs = FileSystem.newInstance(this.basedir.toUri(), conf);
    logPath = new Path(basedir, FILE_LOG);
    logBackupPath = new Path(basedir, FILE_LOG_BACKUP);
    snapshotPath = new Path(basedir, FILE_SNAPSHOT);
    snapshotBackupPath = new Path(basedir, FILE_SNAPSHOT_BACKUP);
    heartbeatPath = new Path(basedir, FILE_HEARTBEATURI);
  }

  /**
   * Returns the recovery directory as a URI string.
   *
   * @return string form of the recovery directory URI
   */
  public String getDir()
  {
    return basedir.toUri().toString();
  }

  /**
   * Moves the current log (if any) to the backup location and opens a new, empty log.
   *
   * <p>{@code flush()} on the returned stream additionally calls {@code hflush()} on the
   * underlying file system stream.</p>
   *
   * @return stream writing to the new log file
   * @throws IOException if the existing log cannot be renamed or the new log cannot be created
   * @throws AssertionError if a log backup already exists
   */
  @Override
  public DataOutputStream rotateLog() throws IOException
  {
    if (fs.exists(logBackupPath)) {
      // log backup is purged on snapshot/restore
      throw new AssertionError("Snapshot state prior to log rotation: " + logBackupPath);
    }

    if (fs.exists(logPath)) {
      LOG.debug("Creating log backup {}", logBackupPath);
      if (!fs.rename(logPath, logBackupPath)) {
        throw new IOException("Failed to rotate log: " + logPath);
      }
    }

    LOG.info("Creating {}", logPath);
    final FSDataOutputStream fsOutputStream;
    String scheme = null;
    try {
      scheme = fs.getScheme();
    } catch (UnsupportedOperationException e) {
      // scheme stays null, which selects the generic fs.create() path below
      LOG.warn("{} doesn't implement getScheme() method", fs.getClass().getName());
    }
    if ("file".equals(scheme)) {
      // local FS does not support hflush and does not flush native stream
      FSUtil.mkdirs(fs, logPath.getParent());
      fsOutputStream = new FSDataOutputStream(new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(logPath).toString()), null);
    } else {
      fsOutputStream = fs.create(logPath);
    }

    DataOutputStream osWrapper = new DataOutputStream(fsOutputStream)
    {
      @Override
      public void flush() throws IOException
      {
        super.flush();
        fsOutputStream.hflush();
      }

      @Override
      public void close() throws IOException
      {
        LOG.debug("Closing {}", logPath);
        super.close();
      }
    };
    return osWrapper;
  }

  /**
   * Opens the current log for reading.
   *
   * @return stream over the existing log, or an empty stream when no log file exists
   * @throws IOException if the log cannot be opened
   * @throws AssertionError if a log backup exists, i.e. {@link #restore()} has to run first
   */
  @Override
  public DataInputStream getLog() throws IOException
  {
    if (fs.exists(logBackupPath)) {
      // restore state prior to log replay
      throw new AssertionError("Restore state prior to reading log: " + logBackupPath);
    }

    if (fs.exists(logPath)) {
      LOG.debug("Opening existing log ({})", logPath);
      return fs.open(logPath);
    } else {
      LOG.debug("No existing log ({})", logPath);
      return new DataInputStream(new ByteArrayInputStream(new byte[] {}));
    }
  }

  /**
   * Serializes {@code state} to the snapshot file.
   *
   * <p>An existing snapshot is first moved to the snapshot backup; once the new snapshot has been
   * written, the snapshot backup and the log backup are removed.</p>
   *
   * @param state object to write using Java serialization
   * @throws IOException if the backup cannot be created, the snapshot cannot be written or a
   *         backup file cannot be removed
   * @throws IllegalStateException if a snapshot backup from a previous run is still present
   */
  @Override
  public void save(Object state) throws IOException
  {
    if (fs.exists(snapshotBackupPath)) {
      throw new IllegalStateException("Found previous backup " + snapshotBackupPath);
    }

    if (fs.exists(snapshotPath)) {
      LOG.debug("Backup {} to {}", snapshotPath, snapshotBackupPath);
      // a failed rename must abort: fs.create below would otherwise overwrite the only snapshot
      if (!fs.rename(snapshotPath, snapshotBackupPath)) {
        throw new IOException("Failed to backup " + snapshotPath + " to " + snapshotBackupPath);
      }
    }

    LOG.debug("Writing checkpoint to {}", snapshotPath);
    try (FSDataOutputStream fsOutputStream = fs.create(snapshotPath);
        ObjectOutputStream oos = new ObjectOutputStream(fsOutputStream)) {
      oos.writeObject(state);
    }

    // remove snapshot backup
    if (fs.exists(snapshotBackupPath) && !fs.delete(snapshotBackupPath, false)) {
      throw new IOException("Failed to remove " + snapshotBackupPath);
    }

    // remove log backup
    if (fs.exists(logBackupPath) && !fs.delete(logBackupPath, false)) {
      throw new IOException("Failed to remove " + logBackupPath);
    }
  }

  /**
   * Brings the recovery directory back to a consistent state and reads the snapshot.
   *
   * <p>If a snapshot backup exists it replaces the snapshot, and the log backup and the current
   * log are concatenated (in that order) into the log. If only a log backup exists it replaces
   * the log.</p>
   *
   * @return the deserialized snapshot, or {@code null} when no snapshot file exists
   * @throws IOException if a file operation fails or a class of the snapshot cannot be loaded
   */
  @Override
  public Object restore() throws IOException
  {
    // NOTE(review): this FileContext is created from the URI only, not from the Configuration
    // that was passed to the constructor - confirm that this is intended
    FileContext fc = FileContext.getFileContext(fs.getUri());

    // recover from wherever it was left
    if (fc.util().exists(snapshotBackupPath)) {
      LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
      fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);

      // combine logs (w/o append, create new file)
      Path tmpLogPath = new Path(basedir, "log.combined");
      try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
        try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
          IOUtils.copy(fsIn, fsOut);
        }
        try (FSDataInputStream fsIn = fc.open(logPath)) {
          IOUtils.copy(fsIn, fsOut);
        }
      }

      fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
      fc.delete(logBackupPath, false);
    } else {
      // we have log backup, but no checkpoint backup
      // failure between log rotation and writing checkpoint
      if (fc.util().exists(logBackupPath)) {
        LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
        fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
      }
    }

    if (!fc.util().exists(snapshotPath)) {
      LOG.debug("No existing checkpoint.");
      return null;
    }

    LOG.debug("Reading checkpoint {}", snapshotPath);
    // indeterministic class loading behavior
    // http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
    // the raw stream is a resource of its own so that it is closed even when the
    // ObjectInputStream constructor fails while reading the stream header
    try (InputStream is = fc.open(snapshotPath);
        ObjectInputStream ois = new ObjectInputStream(is)
        {
          @Override
          protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
              throws IOException, ClassNotFoundException
          {
            return Class.forName(objectStreamClass.getName(), true, loader);
          }
        }) {
      return ois.readObject();
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("Failed to read checkpointed state", cnfe);
    }
  }

  /**
   * Writes the connect address to the heartbeat file, replacing any previous content.
   *
   * @param uri connect address to store; encoded as UTF-8
   * @throws IOException if the file cannot be written
   */
  public void writeConnectUri(String uri) throws IOException
  {
    try (DataOutputStream out = fs.create(heartbeatPath, true)) {
      // explicit charset: the platform default may differ between the writing and reading JVM
      out.write(uri.getBytes(StandardCharsets.UTF_8));
    }
    LOG.debug("Connect address: {} written to {} ", uri, heartbeatPath);
  }

  /**
   * Reads the connect address stored by {@link #writeConnectUri(String)}.
   *
   * @return content of the heartbeat file decoded as UTF-8
   * @throws IOException if the file cannot be opened or read
   */
  public String readConnectUri() throws IOException
  {
    byte[] bytes;
    try (DataInputStream in = fs.open(heartbeatPath)) {
      bytes = IOUtils.toByteArray(in);
    }
    String uri = new String(bytes, StandardCharsets.UTF_8);
    LOG.debug("Connect address: {} from {} ", uri, heartbeatPath);
    return uri;
  }

  /**
   * Closes the file system instance that was created in the constructor.
   *
   * @throws Throwable if closing the file system or the superclass finalizer fails
   */
  @Override
  @SuppressWarnings("FinalizeDeclaration")
  protected void finalize() throws Throwable
  {
    try {
      fs.close();
    } finally {
      super.finalize();
    }
  }
}