blob: d355712630c3ccf0f344f84d56db1641f08d12a5 [file] [log] [blame]
package backtype.storm.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.io.File;
import org.apache.commons.io.FileUtils;
public class VersionedStore {
private static final String FINISHED_VERSION_SUFFIX = ".version";
private String _root;
public VersionedStore(String path) throws IOException {
_root = path;
mkdirs(_root);
}
public String getRoot() {
return _root;
}
public String versionPath(long version) {
return new File(_root, "" + version).getAbsolutePath();
}
public String mostRecentVersionPath() throws IOException {
Long v = mostRecentVersion();
if(v==null) return null;
return versionPath(v);
}
public String mostRecentVersionPath(long maxVersion) throws IOException {
Long v = mostRecentVersion(maxVersion);
if(v==null) return null;
return versionPath(v);
}
public Long mostRecentVersion() throws IOException {
List<Long> all = getAllVersions();
if(all.size()==0) return null;
return all.get(0);
}
public Long mostRecentVersion(long maxVersion) throws IOException {
List<Long> all = getAllVersions();
for(Long v: all) {
if(v <= maxVersion) return v;
}
return null;
}
public String createVersion() throws IOException {
Long mostRecent = mostRecentVersion();
long version = Time.currentTimeMillis();
if(mostRecent!=null && version <= mostRecent) {
version = mostRecent + 1;
}
return createVersion(version);
}
public String createVersion(long version) throws IOException {
String ret = versionPath(version);
if(getAllVersions().contains(version))
throw new RuntimeException("Version already exists or data already exists");
else
return ret;
}
public void failVersion(String path) throws IOException {
deleteVersion(validateAndGetVersion(path));
}
public void deleteVersion(long version) throws IOException {
File versionFile = new File(versionPath(version));
File tokenFile = new File(tokenPath(version));
if(versionFile.exists()) {
FileUtils.forceDelete(versionFile);
}
if(tokenFile.exists()) {
FileUtils.forceDelete(tokenFile);
}
}
public void succeedVersion(String path) throws IOException {
long version = validateAndGetVersion(path);
// should rewrite this to do a file move
createNewFile(tokenPath(version));
}
public void cleanup() throws IOException {
cleanup(-1);
}
public void cleanup(int versionsToKeep) throws IOException {
List<Long> versions = getAllVersions();
if(versionsToKeep >= 0) {
versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
}
HashSet<Long> keepers = new HashSet<Long>(versions);
for(String p: listDir(_root)) {
Long v = parseVersion(p);
if(v!=null && !keepers.contains(v)) {
deleteVersion(v);
}
}
}
/**
* Sorted from most recent to oldest
*/
public List<Long> getAllVersions() throws IOException {
List<Long> ret = new ArrayList<Long>();
for(String s: listDir(_root)) {
if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
ret.add(validateAndGetVersion(s));
}
}
Collections.sort(ret);
Collections.reverse(ret);
return ret;
}
private String tokenPath(long version) {
return new File(_root, "" + version + FINISHED_VERSION_SUFFIX).getAbsolutePath();
}
private long validateAndGetVersion(String path) {
Long v = parseVersion(path);
if(v==null) throw new RuntimeException(path + " is not a valid version");
return v;
}
private Long parseVersion(String path) {
String name = new File(path).getName();
if(name.endsWith(FINISHED_VERSION_SUFFIX)) {
name = name.substring(0, name.length()-FINISHED_VERSION_SUFFIX.length());
}
try {
return Long.parseLong(name);
} catch(NumberFormatException e) {
return null;
}
}
private void createNewFile(String path) throws IOException {
new File(path).createNewFile();
}
private void mkdirs(String path) throws IOException {
new File(path).mkdirs();
}
private List<String> listDir(String dir) throws IOException {
List<String> ret = new ArrayList<String>();
for(File f: new File(dir).listFiles()) {
ret.add(f.getAbsolutePath());
}
return ret;
}
}