blob: 576a3c0edcf1b326e77981f24dc828e5538d2d66 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.state;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.Anonymizer;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.module.SimpleModule;
/**
* A pool of states. States used by {@link DataType}'s can be managed the
* {@link StatePool}. {@link StatePool} also supports persistence. Persistence
* is key to share states across multiple {@link Anonymizer} runs.
*/
@SuppressWarnings("unchecked")
public class StatePool {
private static final long VERSION = 1L;
private boolean isUpdated = false;
private boolean isInitialized = false;
private Configuration conf;
// persistence configuration
public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
public static final String RELOAD_CONFIG =
"rumen.anonymization.states.reload";
public static final String PERSIST_CONFIG =
"rumen.anonymization.states.persist";
// internal state management configs
private static final String COMMIT_STATE_FILENAME = "latest";
private static final String CURRENT_STATE_FILENAME = "temp";
private String timeStamp;
private Path persistDirPath;
private boolean reload;
private boolean persist;
/**
* A wrapper class that binds the state implementation to its implementing
* class name.
*/
public static class StatePair {
private String className;
private State state;
public StatePair(State state) {
this.className = state.getClass().getName();
this.state = state;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public State getState() {
return state;
}
public void setState(State state) {
this.state = state;
}
}
/**
* Identifies to identify and cache {@link State}s.
*/
private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();
public void addState(Class id, State state) {
if (pool.containsKey(id.getName())) {
throw new RuntimeException("State '" + state.getName() + "' added for the"
+ " class " + id.getName() + " already exists!");
}
isUpdated = true;
pool.put(id.getName(), new StatePair(state));
}
public State getState(Class clazz) {
return pool.containsKey(clazz.getName())
? pool.get(clazz.getName()).getState()
: null;
}
// For testing
@JsonIgnore
public boolean isUpdated() {
if (!isUpdated) {
for (StatePair statePair : pool.values()) {
// if one of the states have changed, then the pool is dirty
if (statePair.getState().isUpdated()) {
isUpdated = true;
return true;
}
}
}
return isUpdated;
}
/**
* Initialized the {@link StatePool}. This API also reloads the previously
* persisted state. Note that the {@link StatePool} should be initialized only
* once.
*/
public void initialize(Configuration conf) throws Exception {
if (isInitialized) {
throw new RuntimeException("StatePool is already initialized!");
}
this.conf = conf;
String persistDir = conf.get(DIR_CONFIG);
reload = conf.getBoolean(RELOAD_CONFIG, false);
persist = conf.getBoolean(PERSIST_CONFIG, false);
// reload if configured
if (reload || persist) {
System.out.println("State Manager initializing. State directory : "
+ persistDir);
System.out.println("Reload:" + reload + " Persist:" + persist);
if (persistDir == null) {
throw new RuntimeException("No state persist directory configured!"
+ " Disable persistence.");
} else {
this.persistDirPath = new Path(persistDir);
}
} else {
System.out.println("State Manager disabled.");
}
// reload
reload();
// now set the timestamp
DateFormat formatter =
new SimpleDateFormat("dd-MMM-yyyy-hh'H'-mm'M'-ss'S'");
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(System.currentTimeMillis());
timeStamp = formatter.format(calendar.getTime());
isInitialized = true;
}
private void reload() throws Exception {
if (reload) {
// Reload persisted entries
Path stateFilename = new Path(persistDirPath, COMMIT_STATE_FILENAME);
FileSystem fs = stateFilename.getFileSystem(conf);
if (fs.exists(stateFilename)) {
reloadState(stateFilename, conf);
} else {
throw new RuntimeException("No latest state persist directory found!"
+ " Disable persistence and run.");
}
}
}
private void reloadState(Path stateFile, Configuration conf)
throws Exception {
FileSystem fs = stateFile.getFileSystem(conf);
if (fs.exists(stateFile)) {
System.out.println("Reading state from " + stateFile.toString());
FSDataInputStream in = fs.open(stateFile);
read(in);
in.close();
} else {
System.out.println("No state information found for " + stateFile);
}
}
private void read(DataInput in) throws IOException {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
// define a module
SimpleModule module = new SimpleModule("State Serializer",
new Version(0, 1, 1, "FINAL"));
// add the state deserializer
module.addDeserializer(StatePair.class, new StateDeserializer());
// register the module with the object-mapper
mapper.registerModule(module);
JsonParser parser =
mapper.getJsonFactory().createJsonParser((DataInputStream)in);
StatePool statePool = mapper.readValue(parser, StatePool.class);
this.setStates(statePool.getStates());
parser.close();
}
/**
* Persists the current state to the state directory. The state will be
* persisted to the 'latest' file in the state directory.
*/
public void persist() throws IOException {
if (!persist) {
return;
}
if (isUpdated()) {
System.out.println("State is updated! Committing.");
Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
FileSystem fs = currStateFile.getFileSystem(conf);
System.out.println("Starting the persist phase. Persisting to "
+ currStateFile.toString());
// persist current state
// write the contents of the current state to the current(temp) directory
FSDataOutputStream out = fs.create(currStateFile, true);
write(out);
out.close();
System.out.println("Persist phase over. The best known un-committed state"
+ " is located at " + currStateFile.toString());
// commit (phase-1)
// copy the previous commit file to the relocation file
if (fs.exists(commitStateFile)) {
Path commitRelocationFile = new Path(persistDirPath, timeStamp);
System.out.println("Starting the pre-commit phase. Moving the previous "
+ "best known state to " + commitRelocationFile.toString());
// copy the commit file to the relocation file
FileUtil.copy(fs,commitStateFile, fs, commitRelocationFile, false,
conf);
}
// commit (phase-2)
System.out.println("Starting the commit phase. Committing the states in "
+ currStateFile.toString());
FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);
System.out.println("Commit phase successful! The best known committed "
+ "state is located at " + commitStateFile.toString());
} else {
System.out.println("State not updated! No commit required.");
}
}
private void write(DataOutput out) throws IOException {
// This is just a JSON experiment
System.out.println("Dumping the StatePool's in JSON format.");
ObjectMapper outMapper = new ObjectMapper();
outMapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
// define a module
SimpleModule module = new SimpleModule("State Serializer",
new Version(0, 1, 1, "FINAL"));
// add the state serializer
//module.addSerializer(State.class, new StateSerializer());
// register the module with the object-mapper
outMapper.registerModule(module);
JsonFactory outFactory = outMapper.getJsonFactory();
JsonGenerator jGen =
outFactory.createJsonGenerator((DataOutputStream)out, JsonEncoding.UTF8);
jGen.useDefaultPrettyPrinter();
jGen.writeObject(this);
jGen.close();
}
/**
* Getters and setters for JSON serialization
*/
/**
* To be invoked only by the Jackson JSON serializer.
*/
public long getVersion() {
return VERSION;
}
/**
* To be invoked only by the Jackson JSON deserializer.
*/
public void setVersion(long version) {
if (version != VERSION) {
throw new RuntimeException("Version mismatch! Expected " + VERSION
+ " got " + version);
}
}
/**
* To be invoked only by the Jackson JSON serializer.
*/
public HashMap<String, StatePair> getStates() {
return pool;
}
/**
* To be invoked only by the Jackson JSON deserializer.
*/
public void setStates(HashMap<String, StatePair> states) {
if (pool.size() > 0) {
throw new RuntimeException("Pool not empty!");
}
//TODO Should we do a clone?
this.pool = states;
}
}