blob: b5a43fe8524ca54253008e711d712b7a84e50dad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.common.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Throwables;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
/**
* FSStorageAgent
*
* @since 0.3.2
*/
public class FSStorageAgent implements StorageAgent, Serializable
{
public static final String TMP_FILE = "_tmp";
protected static final String STATELESS_CHECKPOINT_WINDOW_ID = Long.toHexString(Stateless.WINDOW_ID);
public final String path;
protected final transient FileContext fileContext;
protected static final transient Kryo kryo;
static {
kryo = new Kryo();
}
protected FSStorageAgent()
{
path = null;
fileContext = null;
}
public FSStorageAgent(String path, Configuration conf)
{
this.path = path;
try {
logger.debug("Initialize storage agent with {}.", path);
Path lPath = new Path(path);
URI pathUri = lPath.toUri();
if (pathUri.getScheme() != null) {
fileContext = FileContext.getFileContext(pathUri, conf == null ? new Configuration() : conf);
} else {
fileContext = FileContext.getFileContext(conf == null ? new Configuration() : conf);
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@SuppressWarnings("ThrowFromFinallyBlock")
@Override
public void save(Object object, int operatorId, long windowId) throws IOException
{
String operatorIdStr = String.valueOf(operatorId);
Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + TMP_FILE);
String window = Long.toHexString(windowId);
boolean stateSaved = false;
FSDataOutputStream stream = null;
try {
stream = fileContext.create(lPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
Options.CreateOpts.CreateParent.createParent());
store(stream, object);
stateSaved = true;
} catch (Throwable t) {
logger.debug("while saving {} {}", operatorId, window, t);
stateSaved = false;
throw Throwables.propagate(t);
} finally {
try {
if (stream != null) {
stream.close();
}
} catch (IOException ie) {
stateSaved = false;
throw new RuntimeException(ie);
} finally {
if (stateSaved) {
logger.debug("Saving {}: {}", operatorId, window);
fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window),
Options.Rename.OVERWRITE);
}
}
}
}
@Override
public Object load(int operatorId, long windowId) throws IOException
{
Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId) + Path.SEPARATOR + Long.toHexString(windowId));
logger.debug("Loading: {}", lPath);
FSDataInputStream stream = fileContext.open(lPath);
try {
return retrieve(stream);
} finally {
stream.close();
}
}
@Override
public void delete(int operatorId, long windowId) throws IOException
{
Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId) + Path.SEPARATOR + Long.toHexString(windowId));
logger.debug("Deleting: {}", lPath);
fileContext.delete(lPath, false);
}
@Override
public long[] getWindowIds(int operatorId) throws IOException
{
Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
try {
FileStatus status = fileContext.getFileStatus(lPath);
if (!status.isDirectory()) {
throw new IOException("Checkpoint location is not a directory ");
}
} catch (FileNotFoundException ex) {
// During initialization this directory may not exists.
// return an empty array.
return new long[0];
}
RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
if (!fileStatusRemoteIterator.hasNext()) {
throw new IOException("Storage Agent has not saved anything yet!");
}
List<Long> lwindows = new ArrayList<>();
do {
FileStatus fileStatus = fileStatusRemoteIterator.next();
String name = fileStatus.getPath().getName();
if (name.equals(TMP_FILE)) {
continue;
}
lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name, 16));
} while (fileStatusRemoteIterator.hasNext());
long[] windowIds = new long[lwindows.size()];
for (int i = 0; i < windowIds.length; i++) {
windowIds[i] = lwindows.get(i);
}
return windowIds;
}
public static void store(OutputStream stream, Object operator)
{
synchronized (kryo) {
Output output = new Output(4096, Integer.MAX_VALUE);
output.setOutputStream(stream);
kryo.writeClassAndObject(output, operator);
output.flush();
}
}
public static Object retrieve(InputStream stream)
{
synchronized (kryo) {
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
Input input = new Input(stream);
return kryo.readClassAndObject(input);
}
}
public Object readResolve() throws ObjectStreamException
{
return new FSStorageAgent(this.path, null);
}
private static final long serialVersionUID = 201404031201L;
private static final Logger logger = LoggerFactory.getLogger(FSStorageAgent.class);
}