Merge branch 'S4-85' into piper
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 7efadcc..1f5d7e4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -188,4 +188,19 @@
         }
 
     }
+
+    /**
+     * Returns a debugging representation of the attributes of this event, in the form
+     * {@code [{key1;value1},{key2;value2},]} (each entry is followed by a comma).
+     */
+    @Override
+    public String toString() {
+        Map<String, String> attributesAsMap = getAttributesAsMap();
+        StringBuilder sb = new StringBuilder("[");
+        for (Map.Entry<String, String> entry : attributesAsMap.entrySet()) {
+            sb.append('{').append(entry.getKey()).append(';').append(entry.getValue()).append("},");
+        }
+        sb.append(']');
+        return sb.toString();
+    }
 }
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
index f75a867..8d3e062 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
@@ -18,7 +18,12 @@
     }
 
     public Class<?> loadGeneratedClass(String name, byte[] bytes) {
-        return defineClass(name, bytes, 0, bytes.length);
+        Class<?> clazz = findLoadedClass(name); // reuse the class if this loader has already defined it
+        if (clazz == null) { // NOTE(review): check-then-define is not atomic; confirm callers never race on one name
+            return defineClass(name, bytes, 0, bytes.length); // first request for this name: define it now
+        } else {
+            return clazz;
+        }
     }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 46f0a8e..2a9dc5c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -46,6 +46,7 @@
             // System.out.println("QueueingListener: About to take message from queue");
             return queue.take();
         } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller; null means "no message"
             return null;
         }
     }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 4472910..d2aa64f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -132,6 +132,7 @@
         } catch (InterruptedException ie) {
             logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
                     clusterNode.getPort()));
+            Thread.currentThread().interrupt(); // restore the interrupt flag; the attempt is reported as failed below
         }
         return false;
     }
@@ -182,7 +183,7 @@
             bootstrap.releaseExternalResources();
         } catch (InterruptedException ie) {
             logger.error("Interrupted while closing");
-            ie.printStackTrace();
+            Thread.currentThread().interrupt(); // restore the interrupt flag. NOTE(review): releaseExternalResources() in the try block is skipped on interrupt — confirm acceptable
         }
     }
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 3e905e6..d7d49bd 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -80,6 +80,7 @@
             byte[] msg = handoffQueue.take();
             return msg;
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller; null means "no message"
             return null;
         }
     }
@@ -93,7 +94,7 @@
         try {
             channels.close().await();
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            Thread.currentThread().interrupt(); // restore the interrupt flag; external resources are still released below
         }
         bootstrap.releaseExternalResources();
     }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 316999c..43bb470 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -74,6 +74,7 @@
         try {
             return handoffQueue.take();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller; null means "no message"
             return null;
         }
     }
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 2791da6..63e32b5 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -5,9 +5,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -41,11 +39,9 @@
 
     public static final int ZK_PORT = 2181;
     public static final String ZK_STRING = "localhost:" + ZK_PORT;
-    public static final int INITIAL_BOOKIE_PORT = 5000;
     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
             + "storage");
-    public static ServerSocket serverSocket;
     static {
         logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
     }
@@ -233,6 +229,7 @@
             try {
                 Thread.sleep(250);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // NOTE(review): if this sleep runs inside a polling loop, every later sleep() throws immediately (busy wait) — consider leaving the loop
                 // ignore
             }
         }
@@ -281,6 +278,7 @@
             try {
                 Thread.sleep(250);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // NOTE(review): if this sleep runs inside a polling loop, every later sleep() throws immediately (busy wait) — consider leaving the loop
                 // ignore
             }
         }
@@ -295,12 +293,6 @@
 
     }
 
-    public static void stopSocketAdapter() throws IOException {
-        if (serverSocket != null) {
-            serverSocket.close();
-        }
-    }
-
     /**
      * gradle and eclipse have different directories for output files This is justified here
      * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
@@ -330,21 +322,4 @@
 
     }
 
-    public static void injectIntoStringSocketAdapter(String string) throws IOException {
-        Socket socket = null;
-        PrintWriter writer = null;
-        try {
-            socket = new Socket("localhost", 12000);
-            writer = new PrintWriter(socket.getOutputStream(), true);
-            writer.println(string);
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(-1);
-        } finally {
-            if (socket != null) {
-                socket.close();
-            }
-        }
-    }
-
 }
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index ad3bbbe..c3a3f52 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -20,6 +20,7 @@
     compile project(":s4-base")

     compile project(":s4-comm")

     compile project(path: ':s4-comm', configuration: 'tests')

+    compile libraries.commons_codec
     compile libraries.jcommander

     testCompile project(path: ':s4-comm', configuration: 'tests')

     testCompile libraries.gradle_base_services

diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0beec16..bc20e98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.ft.CheckpointingFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +34,10 @@
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+/**
+ * Container base class to hold all processing elements.
+ *
+ * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {
 
@@ -73,6 +75,10 @@
     @Named("cluster.name")
     String clusterName;
 
+    // default is NoOpCheckpointingFramework
+    @Inject
+    CheckpointingFramework checkpointingFramework; // package-private: ProcessingElement also reads it directly
+
     // serialization uses the application class loader
     private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
 
@@ -111,7 +117,7 @@
     }
 
     /* Should only be used within the core package. */
-    public void addStream(Streamable stream) {
+    public void addStream(Streamable<Event> stream) {
         streams.add(stream);
     }
 
@@ -262,6 +268,10 @@
         return serDeser;
     }
 
+    public CheckpointingFramework getCheckpointingFramework() { // the injected framework (NoOp unless a module rebinds it)
+        return checkpointingFramework;
+    }
+
     /**
      * Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
      * prototypes by key.
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index b740a21..d441d59 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -11,6 +11,8 @@
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
@@ -59,6 +61,10 @@
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
         bind(S4RLoaderFactory.class);
+
+        // For enabling checkpointing, one needs to use a custom module (presumably overriding this binding), such as
+        // org.apache.s4.core.ft.FileSystemBasedCheckpointingModule (NOTE(review): confirm the exact class name)
+        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
     }
 
     private void loadProperties(Binder binder) {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 5f1375b..ea7cfd6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -136,6 +136,7 @@
             }
         } catch (Exception e) {
             logger.error("Cannot start S4 node", e);
+            System.exit(1); // startup failed: terminate with a non-zero status instead of leaving the JVM running
         }
     }
 
@@ -160,10 +161,10 @@
         @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
         String appClass = null;
 
-        @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+        @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false) // list values are split on commas by JCommander
         List<String> extraModulesClasses = new ArrayList<String>();
 
-        @Parameter(names = { "-namedStringParameters", "-p" }, description = "Inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-namedStringParameters={name1=value1},{name2=value2} '", hidden = false, converter = InlineConfigParameterConverter.class)
+        @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class) // NOTE(review): the '-p=...' form requires '=' to be declared as a JCommander separator — confirm
         List<String> extraNamedParameters = new ArrayList<String>();
 
         @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
@@ -175,7 +176,7 @@
 
         @Override
         public String convert(String arg) {
-            Pattern parameterPattern = Pattern.compile("\\{(\\S+=\\S+)\\}");
+            Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
             logger.info("processing inline configuration parameter {}", arg);
             Matcher parameterMatcher = parameterPattern.matcher(arg);
             if (!parameterMatcher.find()) {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..730ff56 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,20 +1,7 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
 package org.apache.s4.core;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Timer;
@@ -25,6 +12,10 @@
 import net.jcip.annotations.ThreadSafe;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.core.ft.CheckpointId;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.core.ft.CheckpointingTask;
 import org.apache.s4.core.gen.OverloadDispatcher;
 import org.apache.s4.core.gen.OverloadDispatcherGenerator;
 import org.slf4j.Logger;
@@ -34,6 +25,7 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
@@ -94,34 +86,41 @@
  */
 public abstract class ProcessingElement implements Cloneable {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
-    private static final String SINGLETON = "singleton";
+    transient private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
+    transient private static final String SINGLETON = "singleton";
 
-    protected App app;
+    transient protected App app;
 
     /*
      * This maps holds all the instances. We make it package private to prevent concrete classes from updating the
      * collection.
      */
-    Cache<String, ProcessingElement> peInstances;
+    transient Cache<String, ProcessingElement> peInstances;
 
     /* This map is initialized in the prototype and cloned to instances. */
-    Map<Class<? extends Event>, Trigger> triggers;
+    transient Map<Class<? extends Event>, Trigger> triggers;
 
     /* PE instance id. */
     String id = "";
 
     /* Private fields. */
-    private ProcessingElement pePrototype;
-    private boolean haveTriggers = false;
-    private long timerIntervalInMilliseconds = 0;
-    private Timer timer;
-    private boolean isPrototype = true;
-    private boolean isThreadSafe = false;
-    private String name = null;
-    private boolean isSingleton = false;
+    transient private ProcessingElement pePrototype; // transient fields are skipped by restoreFieldsForClass on recovery
+    transient private boolean haveTriggers = false;
+    transient private long timerIntervalInMilliseconds = 0;
+    transient private Timer triggerTimer;
+    transient private Timer checkpointingTimer; // only created for CheckpointingMode.TIME
+    transient private boolean isPrototype = true;
+    transient private boolean isThreadSafe = false;
+    transient private String name = null;
+    transient private boolean isSingleton = false;
+    transient long eventCount = 0; // incremented, under the PE lock, for every event this instance processes
 
-    private transient OverloadDispatcher overloadDispatcher;
+    transient private OverloadDispatcher overloadDispatcher;
+    transient private boolean recoveryAttempted = false; // set once recover() has been tried for this instance
+    transient private boolean dirty = false; // set after each processed event, reset by clearDirty()
+
+    transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE) // default: no checkpointing
+            .build();
 
     protected ProcessingElement() {
         OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
@@ -211,7 +210,7 @@
         return peInstances.size();
     }
 
-    Map<String, ProcessingElement> getPEInstances() {
+    public Map<String, ProcessingElement> getPEInstances() { // NOTE(review): returns the live cache view; being public, any caller can now modify the instances
         return peInstances.asMap();
     }
 
@@ -314,8 +313,9 @@
         /* Skip trigger checking overhead if there are no triggers. */
         haveTriggers = true;
 
-        if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS)
-            interval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
+        if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS) {
+            interval = TimeUnit.MILLISECONDS.convert(interval, timeUnit); // interval is given in timeUnit; Trigger stores ms
+        }
 
         Trigger config = new Trigger(numEvents, interval);
 
@@ -374,14 +374,15 @@
 
         Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
 
-        if (timer != null) {
-            timer.cancel();
+        if (triggerTimer != null) {
+            triggerTimer.cancel();
+            triggerTimer = null; // a cancelled Timer cannot be scheduled again, so do not keep a reference to it
         }
 
         if (interval == 0)
             return this;
 
-        timer = new Timer();
+        triggerTimer = new Timer();
         return this;
     }
 
@@ -406,8 +406,11 @@
         } else {
             object = this;
         }
-
         synchronized (object) {
+            if (!recoveryAttempted) { // first event handled by this instance: try to restore its checkpointed state
+                recover();
+                recoveryAttempted = true; // only one attempt, whether or not a state was found and restored
+            }
 
             /* Dispatch onEvent() method. */
             overloadDispatcher.dispatchEvent(this, event);
@@ -416,9 +419,34 @@
             if (haveTriggers && isTrigger(event)) {
                 overloadDispatcher.dispatchTrigger(this, event);
             }
+
+            eventCount++;
+
+            // by default, any processed event is assumed to modify the state (see isDirty())
+            dirty = true;
+
+            if (isCheckpointable()) {
+                checkpoint();
+            }
         }
     }
 
+    /**
+     * Asks the checkpointing framework of the application whether this PE instance should be checkpointed now.
+     */
+    protected boolean isCheckpointable() {
+        return getApp().getCheckpointingFramework().isCheckpointable(this);
+    }
+
+    /**
+     * Saves the state of this PE instance through the checkpointing framework of the application, then clears the
+     * dirty flag.
+     */
+    public void checkpoint() {
+        getApp().getCheckpointingFramework().saveState(this);
+        clearDirty();
+    }
+
     private boolean isTrigger(Event event) {
         return isTrigger(event, event.getClass());
     }
@@ -466,8 +486,13 @@
     protected void removeAll() {
 
         /* Close resources in prototype. */
-        if (timer != null) {
-            timer.cancel();
+        if (triggerTimer != null) {
+            triggerTimer.cancel();
             logger.info("Timer stopped.");
         }
+        // the time-based checkpointing timer must be stopped too, otherwise its background thread keeps running
+        if (checkpointingTimer != null) {
+            checkpointingTimer.cancel();
+            logger.info("Checkpointing timer stopped.");
+        }
 
@@ -503,12 +523,22 @@
         }
 
         /* Start timer. */
-        if (timer != null) {
-            timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
-            logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+        if (triggerTimer != null) {
+            triggerTimer
+                    .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
+            logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                     this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
         }
 
+        if (checkpointingConfig.mode == CheckpointingMode.TIME) { // time-based checkpointing: run a CheckpointingTask periodically
+            checkpointingTimer = new Timer();
+            long checkpointingIntervalInMilliseconds = checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency);
+            checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this), checkpointingIntervalInMilliseconds,
+                    checkpointingIntervalInMilliseconds);
+            logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}] ms.",
+                    new String[] { this.getClass().getName(), id, String.valueOf(checkpointingIntervalInMilliseconds) });
+        }
+
         /* Check if this PE is annotated as thread safe. */
         if (getClass().isAnnotationPresent(ThreadSafe.class) == true) {
 
@@ -535,7 +565,7 @@
             }
             return peInstances.get(id);
         } catch (ExecutionException e) {
-            logger.error("Problem when trying to create a PE instance.", e);
+            logger.error("Problem when trying to create a PE instance for id {}", id, e);
         }
         return null;
     }
@@ -544,8 +574,16 @@
      * Get all the local instances. See notes in {@link #getInstanceForKey(String) getLocalInstanceForKey}
      */
     public Collection<ProcessingElement> getInstances() {
-
-        return peInstances.asMap().values();
+        try {
+            if (isSingleton) {
+                return ImmutableList.of(peInstances.get(SINGLETON));
+            } else {
+                return peInstances.asMap().values();
+            }
+        } catch (ExecutionException e) {
+            logger.error("Problem when trying to create the singleton PE instance of [{}]", getClass().getName(), e);
+            return ImmutableList.<ProcessingElement> of(); // empty rather than null, so that callers can iterate safely
+        }
     }
 
     /**
@@ -672,6 +710,121 @@
         app.peByName.put(name, this);
     }
 
+    /**
+     * Returns the checkpointing configuration of this PE ({@link CheckpointingMode#NONE} unless set otherwise).
+     */
+    public CheckpointingConfig getCheckpointingConfig() {
+        return checkpointingConfig;
+    }
+
+    /**
+     * Sets the checkpointing configuration of this PE.
+     */
+    public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig) {
+        this.checkpointingConfig = checkpointingConfig;
+    }
+
+    /**
+     * By default, the state of a PE instance is considered dirty whenever it has processed an event. Some events may
+     * actually leave the state of the PE unchanged. PE implementations can therefore override this method to
+     * accommodate specific behaviors, by managing a custom "dirty" flag.
+     *
+     * <b>If this method is overridden, {@link #clearDirty()} must also be overridden in order to correctly reflect
+     * the "dirty" state of the PE.</b>
+     */
+    public boolean isDirty() {
+        return dirty;
+    }
+
+    /**
+     * Dirty state is cleared after the PE has been checkpointed. PE implementations that maintain their own "dirty"
+     * flag must override this method by clearing their internally managed "dirty" flag.
+     *
+     * <b>If this method is overridden, {@link #isDirty()} must also be overridden in order to correctly reflect the
+     * "dirty" state of the PE.</b>
+     */
+    public void clearDirty() {
+        this.dirty = false;
+    }
+
+    /**
+     * Serializes this PE instance with the serializer/deserializer of the application.
+     */
+    public byte[] serializeState() {
+        return getApp().getSerDeser().serialize(this);
+    }
+
+    /**
+     * Deserializes a PE instance from bytes such as those produced by {@link #serializeState()}.
+     */
+    public ProcessingElement deserializeState(byte[] loadedState) {
+        return (ProcessingElement) getApp().getSerDeser().deserialize(loadedState);
+    }
+
+    /**
+     * Copies the non-static, non-transient fields of {@code oldState} into this instance, for every class of its
+     * hierarchy from its concrete class up to and including {@link ProcessingElement}.
+     */
+    public void restoreState(ProcessingElement oldState) {
+        restoreFieldsForClass(oldState.getClass(), oldState);
+    }
+
+    /**
+     * Fetches the last checkpointed state of this PE instance, if any, and copies it into this instance. Failures are
+     * logged rather than thrown.
+     */
+    protected void recover() {
+        byte[] serializedState = null;
+        try {
+            serializedState = getApp().getCheckpointingFramework().fetchSerializedState(new CheckpointId(this));
+        } catch (RuntimeException e) {
+            logger.error("Cannot fetch serialized state for [{}/{}]: {}", new String[] {
+                    getPrototype().getClass().getName(), getId(), e.getMessage() });
+        }
+        if (serializedState == null) {
+            return;
+        }
+        try {
+            ProcessingElement peInOldState = deserializeState(serializedState);
+            restoreState(peInOldState);
+        } catch (RuntimeException e) {
+            logger.error("Cannot restore state for key [" + new CheckpointId(this) + "]: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Copies the non-static, non-transient fields declared by {@code currentInOldStateClassHierarchy} from
+     * {@code oldState} into this instance, then recurses into the superclass until the class is no longer a
+     * {@link ProcessingElement}. Gives up (after logging) at the first field that cannot be copied.
+     */
+    private void restoreFieldsForClass(Class<?> currentInOldStateClassHierarchy, ProcessingElement oldState) {
+        if (!ProcessingElement.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+            return;
+        } else {
+            // only the fields declared at this level of the hierarchy; superclass fields are handled by the recursion
+            Field[] fields = currentInOldStateClassHierarchy.getDeclaredFields();
+            for (Field field : fields) {
+                if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+                    if (!Modifier.isPublic(field.getModifiers())) {
+                        field.setAccessible(true);
+                    }
+                    try {
+                        // TODO use reflectasm
+                        field.set(this, field.get(oldState));
+                    } catch (IllegalArgumentException e) {
+                        logger.error("Cannot recover old state for this PE [{}]", this, e);
+                        return;
+                    } catch (IllegalAccessException e) {
+                        logger.error("Cannot recover old state for this PE [{}]", this, e);
+                        return;
+                    }
+
+                }
+            }
+            restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+        }
+    }
+
     class Trigger {
         final long intervalInMilliseconds;
         final int intervalInEvents;
@@ -709,4 +836,21 @@
             return active;
         }
     }
+
+    public long getEventCount() { // number of events processed so far by this instance
+        return eventCount;
+    }
+
+    @Override
+    public String toString() { // debugging summary: class/id, singleton and thread-safety flags, trigger timer interval
+        StringBuilder sb = new StringBuilder(getClass().getName() + "/" + getId() + " ;");
+        if (isSingleton) {
+            sb.append("singleton ;");
+        }
+        sb.append(isThreadSafe ? "IS thread-safe ;" : "Not thread-safe ;");
+        sb.append("timerInterval=" + timerIntervalInMilliseconds + " ;");
+        return sb.toString();
+
+    }
+
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index a7522bd..85d80f7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -187,9 +187,8 @@
                         .serialize(event)));
             }
         } catch (InterruptedException e) {
-            e.printStackTrace();
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            System.exit(-1);
+            Thread.currentThread().interrupt(); // the event was not enqueued; restore the interrupt flag instead of exiting the JVM
         }
     }
 
@@ -201,9 +200,8 @@
         try {
             queue.put(event);
         } catch (InterruptedException e) {
-            e.printStackTrace();
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            System.exit(-1);
+            Thread.currentThread().interrupt(); // the event was not enqueued; restore the interrupt flag instead of exiting the JVM
         }
     }
 
@@ -305,6 +303,7 @@
             } catch (InterruptedException e) {
                 logger.info("Closing stream {}.", name);
                 receiver.removeStream(this);
+                Thread.currentThread().interrupt(); // restore the interrupt flag before leaving the processing loop
                 return;
             }
         }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
new file mode 100644
index 0000000..290d46a
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage backend.
+ * </p>
+ * <p>
+ * The storage backend is responsible for converting this identifier to whatever internal representation is most
+ * suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of the identifier and for creating an
+ * identifier from a String representation.
+ * </p>
+ *
+ */
+public class CheckpointId {
+
+    private String prototypeId;
+    private String key;
+
+    private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern.compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+    public CheckpointId() {
+    }
+
+    /**
+     * Creates the identifier of the given PE instance: the class name of its prototype and its key (instance id).
+     */
+    public CheckpointId(ProcessingElement pe) {
+        this.prototypeId = pe.getPrototype().getClass().getName();
+        this.key = pe.getId();
+    }
+
+    /**
+     * Creates an identifier from its two components.
+     * @param prototypeID
+     *            class name of the PE prototype, as used by {@link #CheckpointId(ProcessingElement)}
+     * @param key
+     *            key of the PE instance, i.e. the value returned by {@link ProcessingElement#getId() getId()}
+     */
+    public CheckpointId(String prototypeID, String key) {
+        super();
+        this.prototypeId = prototypeID;
+        this.key = key;
+    }
+
+    /**
+     * Parses an identifier from the format produced by {@link #getStringRepresentation()}, i.e.
+     * {@code [prototypeId];[key]}. Empty components, or a string that does not match that format, yield null fields.
+     */
+    public CheckpointId(String keyAsString) {
+        Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+        // groups may only be read after a successful match (Matcher.group() throws IllegalStateException otherwise)
+        if (matcher.find()) {
+            prototypeId = "".equals(matcher.group(1)) ? null : matcher.group(1);
+            key = "".equals(matcher.group(2)) ? null : matcher.group(2);
+        }
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getPrototypeId() {
+        return prototypeId;
+    }
+
+    @Override
+    public String toString() {
+        return "[PROTO_ID];[KEY] --> " + getStringRepresentation();
+    }
+
+    public String getStringRepresentation() {
+        return "[" + (prototypeId == null ? "" : prototypeId) + "];[" + (key == null ? "" : key) + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return getStringRepresentation().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CheckpointId other = (CheckpointId) obj;
+        if (key == null) {
+            if (other.key != null)
+                return false;
+        } else if (!key.equals(other.key))
+            return false;
+        if (prototypeId == null) {
+            if (other.prototypeId != null)
+                return false;
+        } else if (!prototypeId.equals(other.prototypeId))
+            return false;
+        return true;
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
new file mode 100644
index 0000000..eec765d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
@@ -0,0 +1,90 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Immutable checkpointing configuration: event count based vs time interval, and frequency. Use the {@link Builder}
+ * class to build instances.
+ *
+ */
+public class CheckpointingConfig {
+
+    /**
+     * Identifies the kind of checkpointing: time based, event count, or no checkpointing
+     *
+     */
+    public static enum CheckpointingMode {
+        TIME, EVENT_COUNT, NONE
+    }
+
+    /** Kind of checkpointing; never null when built through {@link Builder}. */
+    public final CheckpointingMode mode;
+
+    /**
+     * Checkpointing frequency. In {@link CheckpointingMode#EVENT_COUNT} mode, a dirty PE is checkpointed when its
+     * event count is a multiple of this value, and values &lt;= 0 disable checkpointing; in
+     * {@link CheckpointingMode#TIME} mode it is presumably a period expressed in {@link #timeUnit}.
+     */
+    public final int frequency;
+
+    /** Time unit associated with {@link #frequency}; defaults to milliseconds, never null. */
+    public final TimeUnit timeUnit;
+
+    private CheckpointingConfig(CheckpointingMode mode, int frequency, TimeUnit timeUnit) {
+        this.mode = mode;
+        this.frequency = frequency;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Builds {@link CheckpointingConfig} instances. Unless set, the frequency is 0 and the time unit is
+     * {@link TimeUnit#MILLISECONDS}.
+     */
+    public static class Builder {
+        private final CheckpointingMode mode;
+        private int frequency;
+        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+
+        /**
+         * @param mode
+         *            kind of checkpointing; must not be null
+         * @throws NullPointerException
+         *             if {@code mode} is null
+         */
+        public Builder(CheckpointingMode mode) {
+            if (mode == null) {
+                throw new NullPointerException("mode must not be null");
+            }
+            this.mode = mode;
+        }
+
+        /**
+         * Sets the checkpointing frequency.
+         */
+        public Builder frequency(int frequency) {
+            this.frequency = frequency;
+            return this;
+        }
+
+        /**
+         * Sets the time unit of the frequency.
+         *
+         * @throws NullPointerException
+         *             if {@code timeUnit} is null
+         */
+        public Builder timeUnit(TimeUnit timeUnit) {
+            if (timeUnit == null) {
+                throw new NullPointerException("timeUnit must not be null");
+            }
+            this.timeUnit = timeUnit;
+            return this;
+        }
+
+        /** Creates an immutable configuration from the current builder settings. */
+        public CheckpointingConfig build() {
+            return new CheckpointingConfig(mode, frequency, timeUnit);
+        }
+
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
new file mode 100644
index 0000000..48b4616
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ * Functionalities offered by the checkpointing framework: saving PE state, fetching previously saved state, and
+ * deciding when a PE should be checkpointed.
+ * <p>
+ * Unless another binding is configured, the dependency injection framework uses {@link NoOpCheckpointingFramework},
+ * which performs no checkpointing.
+ */
+@ImplementedBy(value = NoOpCheckpointingFramework.class)
+public interface CheckpointingFramework {
+
+    /**
+     * Serializes and stores the state of a PE to the storage backend. Serialization and storage operations are
+     * asynchronous.
+     *
+     * @param pe
+     *            the processing element whose state is saved
+     * @return a callback for getting notified of the result of the storage operation; may be {@code null} for
+     *         implementations that do not store anything
+     */
+    StorageCallback saveState(ProcessingElement pe);
+
+    /**
+     * Fetches checkpoint data from storage for a given PE.
+     *
+     * @param key
+     *            identifier of the checkpoint to fetch
+     * @return the serialized checkpoint data, or {@code null} if no data could be fetched
+     */
+    byte[] fetchSerializedState(CheckpointId key);
+
+    /**
+     * Evaluates whether the specified PE should be checkpointed, based on:
+     * <ul>
+     * <li>whether checkpointing is enabled</li>
+     * <li>whether the PE is "dirty"</li>
+     * <li>the checkpointing frequency settings</li>
+     * </ul>
+     *
+     * This is used for count-based checkpointing intervals. Time-based checkpointing relies on the dirty flag when
+     * triggered.
+     *
+     * @param pe
+     *            processing element to evaluate
+     * @return true if checkpointable, according to the above requirements
+     */
+    boolean isCheckpointable(ProcessingElement pe);
+
+    /**
+     * Outcome of a storage operation, as reported to a {@link StorageCallback}.
+     */
+    public enum StorageResultCode {
+        SUCCESS, FAILURE
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
new file mode 100644
index 0000000..dd6003c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
@@ -0,0 +1,33 @@
+package org.apache.s4.core.ft;
+
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * When checkpointing at regular time intervals, this class is used to actually perform the checkpoints. It iterates
+ * among all instances of the specified prototype, and checkpoints every eligible instance.
+ *
+ */
+public class CheckpointingTask extends TimerTask {
+
+    ProcessingElement prototype;
+
+    public CheckpointingTask(ProcessingElement prototype) {
+        super();
+        this.prototype = prototype;
+    }
+
+    @Override
+    public void run() {
+        Map<String, ProcessingElement> peInstances = prototype.getPEInstances();
+        for (Map.Entry<String, ProcessingElement> entry : peInstances.entrySet()) {
+            synchronized (entry.getValue()) {
+                if (entry.getValue().isDirty()) {
+                    entry.getValue().checkpoint();
+                }
+            }
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..3a95da6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Implementation of a file system backend storage to persist checkpoints.
+ * </p>
+ * <p>
+ * The file system may be the default local file system when running on a single machine, but should be a distributed
+ * file system such as NFS when running on a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 safeKeeperId) in directories according to the following
+ * structure: <code>(storageRootpath)/prototypeId/safeKeeperId</code>
+ * </p>
+ *
+ */
+public class DefaultFileSystemStateStorage implements StateStorage {
+
+    private static Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
+    @Inject(optional = true)
+    @Named("s4.checkpointing.filesystem.storageRootPath")
+    String storageRootPath;
+
+    public DefaultFileSystemStateStorage() {
+    }
+
+    /**
+     * <p>
+     * Called by the dependency injection framework, after construction.
+     * <p/>
+     */
+    @Inject
+    public void init() {
+        checkStorageDir();
+    }
+
+    @Override
+    public byte[] fetchState(CheckpointId key) {
+        File file = safeKeeperID2File(key, storageRootPath);
+        if (file != null && file.exists()) {
+            logger.debug("Fetching " + file.getAbsolutePath() + "for : " + key);
+
+            try {
+                return Files.toByteArray(file);
+            } catch (IOException e) {
+                logger.error("Cannot read content from checkpoint file [" + file.getAbsolutePath() + "]", e);
+                return null;
+            }
+        } else {
+            return null;
+        }
+
+    }
+
+    @Override
+    public Set<CheckpointId> fetchStoredKeys() {
+        Set<CheckpointId> keys = new HashSet<CheckpointId>();
+        File rootDir = new File(storageRootPath);
+        File[] dirs = rootDir.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        for (File dir : dirs) {
+            File[] files = dir.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return (file.isFile());
+                }
+            });
+            for (File file : files) {
+                keys.add(file2SafeKeeperID(file));
+            }
+        }
+        return keys;
+    }
+
+    // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+    private static File safeKeeperID2File(CheckpointId key, String storageRootPath) {
+
+        return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+                + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
+    }
+
+    private static CheckpointId file2SafeKeeperID(File file) {
+        CheckpointId id = null;
+        id = new CheckpointId(new String(Base64.decodeBase64(file.getName())));
+        return id;
+    }
+
+    public void checkStorageDir() {
+        if (storageRootPath == null) {
+
+            File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                    + "storage");
+            storageRootPath = defaultStorageDir.getAbsolutePath();
+            logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+            if (!defaultStorageDir.exists()) {
+                if (!(defaultStorageDir.mkdirs())) {
+                    logger.error("Storage directory not specified, and cannot create default storage directory : "
+                            + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+                }
+            }
+        }
+    }
+
+    @Override
+    public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+        File f = safeKeeperID2File(key, storageRootPath);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+        }
+        if (!f.exists()) {
+            if (!f.getParentFile().exists()) {
+                // parent file has prototype id
+                if (!f.getParentFile().mkdirs()) {
+                    callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                            "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+                                    + f.getParentFile().getAbsolutePath());
+                    return;
+                }
+            }
+        } else {
+            if (!f.delete()) {
+                callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                        "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath() + "]");
+                return;
+            }
+        }
+
+        try {
+            Files.write(state, f);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.SUCCESS, key.toString());
+        } catch (FileNotFoundException e) {
+            logger.error("Cannot write checkpoint file [" + f.getAbsolutePath() + "]", e);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE, key.toString() + " : "
+                    + e.getMessage());
+        } catch (IOException e) {
+            logger.error("Cannot write checkpoint file [" + f.getAbsolutePath() + "]", e);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE, key.toString() + " : "
+                    + e.getMessage());
+        }
+
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
new file mode 100644
index 0000000..6994acd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
@@ -0,0 +1,36 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint fetching operation.
+ *
+ */
+public class FetchTask implements Callable<byte[]> {
+
+    private static Logger logger = LoggerFactory.getLogger(FetchTask.class);
+
+    StateStorage stateStorage;
+    CheckpointId checkpointId;
+
+    public FetchTask(StateStorage stateStorage, CheckpointId checkpointId) {
+        super();
+        this.stateStorage = stateStorage;
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public byte[] call() throws Exception {
+        try {
+            byte[] result = stateStorage.fetchState(checkpointId);
+            return result;
+        } catch (Exception e) {
+            logger.error("Cannot fetch checkpoint data for {}", checkpointId, e);
+            throw e;
+        }
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
new file mode 100644
index 0000000..dd984e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.core.ft;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Guice module enabling checkpointing with a file system backend: {@link CheckpointingFramework} is bound to
+ * {@link SafeKeeper}, and the {@link StateStorage} it uses is bound to {@link DefaultFileSystemStateStorage}.
+ */
+public class FileSystemBackendCheckpointingModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        // NOTE(review): no scope is specified, so every injection point gets a new SafeKeeper with its own thread
+        // pools - confirm this is intended.
+        bind(CheckpointingFramework.class).to(SafeKeeper.class);
+        bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..28279ba
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory for creating storage callbacks that simply log callback results.
+ */
+public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+    @Override
+    public StorageCallback createStorageCallback() {
+        return new StorageCallbackLogger();
+    }
+
+    /**
+     * A basic storage callback that logs results from storage operations to the "s4-ft" logger: successes at debug
+     * level, any other result code at warn level.
+     */
+    static class StorageCallbackLogger implements StorageCallback {
+
+        // SLF4J, like the rest of the checkpointing package
+        private static final Logger logger = LoggerFactory.getLogger("s4-ft");
+
+        @Override
+        public void storageOperationResult(StorageResultCode code, Object message) {
+            if (StorageResultCode.SUCCESS == code) {
+                logger.debug("Callback from storage: {} : {}", StorageResultCode.SUCCESS.name(), message);
+            } else {
+                logger.warn("Callback from storage: {} : {}", StorageResultCode.FAILURE.name(), message);
+            }
+        }
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
new file mode 100644
index 0000000..10387ce
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
@@ -0,0 +1,29 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Implementation of {@link CheckpointingFramework} that does NO checkpointing: nothing is saved, nothing is fetched,
+ * and no PE is ever considered checkpointable.
+ */
+public final class NoOpCheckpointingFramework implements CheckpointingFramework {
+
+    /** Never checkpointable. */
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        return false;
+    }
+
+    /** Stores nothing, hence returns no callback ({@code null}). */
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        return null;
+    }
+
+    /** No checkpoint is ever available: always returns {@code null}. */
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+        return null;
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
new file mode 100644
index 0000000..4c8358f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * 
+ * <p>
+ * This class is responsible for coordinating interactions between the S4 event processor and the checkpoint storage
+ * backend. In particular, it schedules asynchronous save tasks to be executed on the backend.
+ * </p>
+ * 
+ * 
+ * 
+ */
+public final class SafeKeeper implements CheckpointingFramework {
+
+    private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler {
+        String operationType;
+
+        public UncaughtExceptionLogger(String operationType) {
+            this.operationType = operationType;
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("Cannot execute checkpointing " + operationType + " operation", e);
+        }
+    }
+
+    private static Logger logger = LoggerFactory.getLogger(SafeKeeper.class);
+
+    @Inject
+    private StateStorage stateStorage;
+    @Inject(optional = true)
+    private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+    private ThreadPoolExecutor storageThreadPool;
+    private ThreadPoolExecutor serializationThreadPool;
+    private ThreadPoolExecutor fetchingThreadPool;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxThreads")
+    int storageMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageThreadKeepAliveSeconds")
+    int storageThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxOutstandingRequests")
+    int storageMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxThreads")
+    int serializationMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationThreadKeepAliveSeconds")
+    int serializationThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxOutstandingRequests")
+    int serializationMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.maxSerializationLockTime")
+    long maxSerializationLockTime = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxThreads")
+    int fetchingMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingThreadKeepAliveSeconds")
+    int fetchingThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxWaitMs")
+    long fetchingMaxWaitMs = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxConsecutiveFailuresBeforeDisabling")
+    int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingDisabledDurationMs")
+    long fetchingDisabledDurationMs = 600000;
+
+    long fetchingDisabledInitTime = -1;
+    AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
+
+    public SafeKeeper() {
+    }
+
+    @Inject
+    private void init() {
+
+        // NOTE: those thread pools should be fine tuned according to backend and application load/requirements.
+        // For now:
+        // - number of threads and work queue size have overridable defaults
+        // - failures are logged
+        // - when storage queue is full, we throttle backwards to the serialization threadpool
+        // - when serialization queue is full, we abort execution for new entries
+        // - fetching uses a synchronous queue and therefore is a blocking operation, with a timeout
+
+        ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build();
+        storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests),
+                storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+        storageThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("Checkpointing-serialization-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build();
+        serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads,
+                serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
+                        serializationMaxOutstandingRequests), serializationThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+        serializationThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
+        fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+        fetchingThreadPool.allowCoreThreadTimeOut(true);
+
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.ft.CheckpointingFramework#saveState(org.apache.s4.core.ProcessingElement)
+     */
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+        Future<byte[]> futureSerializedState = null;
+        try {
+            futureSerializedState = serializeState(pe);
+        } catch (RejectedExecutionException e) {
+            // if (monitor != null) {
+            // monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1,
+            // S4_CORE_METRICS.toString());
+            // }
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["
+                            + pe.getId() + "]" + "	Remaining capacity for the serialization task queue is ["
+                            + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + serializationThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + serializationThreadPool + "]");
+            return storageCallback;
+        }
+        submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback,
+                stateStorage), storageCallback);
+        return storageCallback;
+    }
+
+    private Future<byte[]> serializeState(ProcessingElement pe) {
+        Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe));
+        // if (monitor != null) {
+        // monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1,
+        // S4_CORE_METRICS.toString());
+        // }
+        return future;
+    }
+
+    private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+        try {
+            storageThreadPool.execute(task);
+            // if (monitor != null) {
+            // monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
+            // }
+        } catch (RejectedExecutionException e) {
+            // if (monitor != null) {
+            // monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
+            // }
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+                            + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + storageThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + storageMaxOutstandingRequests + "]");
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.ft.CheckpointingFramework#fetchSerializedState(org.apache.s4.core.ft.SafeKeeperId)
+     */
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+
+        byte[] result = null;
+
+        if (fetchingCurrentConsecutiveFailures.get() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            if ((fetchingDisabledInitTime + fetchingDisabledDurationMs) < System.currentTimeMillis()) {
+                return null;
+            } else {
+                // reached time, reinit
+                fetchingCurrentConsecutiveFailures.set(0);
+            }
+        }
+        Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
+        try {
+            result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+            fetchingCurrentConsecutiveFailures.set(0);
+            return result;
+        } catch (TimeoutException te) {
+            logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+        } catch (InterruptedException e) {
+            logger.error(
+                    "Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
+                    e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
+        }
+        if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            logger.trace(
+                    "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",
+                    fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs);
+            fetchingDisabledInitTime = System.currentTimeMillis();
+        }
+
+        return result;
+    }
+
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.NONE)) {
+            return false;
+        }
+        if (pe.getCheckpointingConfig().frequency > 0 && pe.isDirty()) {
+            if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.EVENT_COUNT)) {
+                if (pe.getEventCount() % pe.getCheckpointingConfig().frequency == 0) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
new file mode 100644
index 0000000..79931a5
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
+ * <p>
+ * The state to save is provided either directly as a byte array, or as a {@link Future} yielding the serialized
+ * state. In the latter case, the task waits at most one second for the serialization; when it fails or does not
+ * complete in time, no checkpoint is saved.
+ * </p>
+ */
+public class SaveStateTask implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SaveStateTask.class);
+
+    // TODO make this timeout configurable
+    private static final long SERIALIZATION_TIMEOUT_MS = 1000;
+
+    CheckpointId safeKeeperId;
+    byte[] serializedState;
+    Future<byte[]> futureSerializedState = null;
+    StorageCallback storageCallback;
+    StateStorage stateStorage;
+
+    /**
+     * Creates a task that stores an already serialized state.
+     *
+     * @param safeKeeperId
+     *            identifier of the checkpoint
+     * @param state
+     *            serialized state to store
+     * @param storageCallback
+     *            callback for receiving notifications of the storage operation
+     * @param stateStorage
+     *            backend storage for checkpoints
+     */
+    public SaveStateTask(CheckpointId safeKeeperId, byte[] state, StorageCallback storageCallback,
+            StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.serializedState = state;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    /**
+     * Creates a task that stores a state whose serialization is performed asynchronously.
+     *
+     * @param safeKeeperId
+     *            identifier of the checkpoint
+     * @param futureSerializedState
+     *            pending result of the serialization
+     * @param storageCallback
+     *            callback for receiving notifications of the storage operation
+     * @param stateStorage
+     *            backend storage for checkpoints
+     */
+    public SaveStateTask(CheckpointId safeKeeperId, Future<byte[]> futureSerializedState,
+            StorageCallback storageCallback, StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.futureSerializedState = futureSerializedState;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    /**
+     * Stores the serialized state in the {@link StateStorage} backend, which reports the outcome of the operation
+     * through the storage callback. Nothing is stored when the state could not be obtained.
+     */
+    @Override
+    public void run() {
+        // the state is either already available, or the result of an asynchronous serialization
+        byte[] state = (futureSerializedState != null) ? awaitSerializedState() : serializedState;
+        if (state != null) {
+            stateStorage.saveState(safeKeeperId, state, storageCallback);
+        }
+    }
+
+    /**
+     * Waits for the asynchronous serialization to complete.
+     *
+     * @return the serialized state, or null if serialization failed or timed out, or if the thread was interrupted
+     */
+    private byte[] awaitSerializedState() {
+        try {
+            return futureSerializedState.get(SERIALIZATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Cannot save checkpoint {} : interrupted while waiting for serialization", safeKeeperId);
+        } catch (ExecutionException e) {
+            logger.warn("Cannot save checkpoint : " + safeKeeperId, e.getCause());
+        } catch (TimeoutException e) {
+            // skip a serialization that has not started yet: e.g. a SerializeTask would otherwise clear the PE
+            // dirty flag for a state that is never stored, so the PE would not be checkpointed again until it changes
+            futureSerializedState.cancel(false);
+            logger.warn("Cannot save checkpoint {} : could not serialize before timeout", safeKeeperId);
+        }
+        return null;
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
new file mode 100644
index 0000000..6cdebb4
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Encapsulates a PE serialization operation.
+ * <p>
+ * The PE instance is locked for the duration of the serialization, so that the serialized state is consistent. When
+ * serialization succeeds, the PE is marked as "not dirty"; when it throws, the dirty flag is left untouched.
+ * </p>
+ */
+public class SerializeTask implements Callable<byte[]> {
+
+    ProcessingElement pe;
+
+    /**
+     * @param pe
+     *            the PE instance to serialize
+     */
+    public SerializeTask(ProcessingElement pe) {
+        this.pe = pe;
+    }
+
+    /**
+     * Serializes the PE state while holding the PE lock, then clears its dirty flag.
+     *
+     * @return the serialized state
+     */
+    @Override
+    public byte[] call() throws Exception {
+        final byte[] snapshot;
+        synchronized (pe) {
+            snapshot = pe.serializeState();
+            pe.clearDirty();
+        }
+        return snapshot;
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
new file mode 100644
index 0000000..cbe2bda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+
+/**
+ * Backend storage for checkpoints: every checkpoint storage implementation provides these operations.
+ * <p>
+ * Implementations usually need some initialization. The recommended place for it is a custom method annotated with
+ * {@link Inject}, which is called after the instance is constructed.
+ * </p>
+ */
+public interface StateStorage {
+
+    /**
+     * Stores a checkpoint.
+     * <p>
+     * No result is returned: success and failure are only reported through the given {@link StorageCallback}.
+     * </p>
+     *
+     * @param key
+     *            safeKeeperId of the checkpoint
+     * @param state
+     *            checkpoint data, as a byte array
+     * @param callback
+     *            configurable callback notified of the outcome of the storage operation
+     */
+    void saveState(CheckpointId key, byte[] state, StorageCallback callback);
+
+    /**
+     * Fetches the data of a stored checkpoint.
+     *
+     * @param key
+     *            safeKeeperId of the checkpoint
+     * @return the stored checkpoint data, or {@code null} when the storage holds nothing for this key (implementations
+     *         must return {@code null} in that case)
+     */
+    byte[] fetchState(CheckpointId key);
+
+    /**
+     * Fetches the safeKeeper ids of all stored checkpoints.
+     *
+     * @return all stored safeKeeper ids
+     */
+    Set<CheckpointId> fetchStoredKeys();
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
new file mode 100644
index 0000000..6676dd0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * Callback for reporting the result of an asynchronous storage operation.
+ */
+public interface StorageCallback {
+
+    /**
+     * Notifies the result of a storage operation.
+     *
+     * @param resultCode
+     *            code for the result: {@link CheckpointingFramework.StorageResultCode}
+     * @param message
+     *            whatever message object is suitable
+     */
+    void storageOperationResult(CheckpointingFramework.StorageResultCode resultCode, Object message);
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..c448399
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * Factory of {@link StorageCallback} instances. Implementations decide which actions the created callbacks take upon
+ * success or failure of asynchronous storage operations.
+ */
+public interface StorageCallbackFactory {
+
+    /**
+     * Creates a storage callback.
+     *
+     * @return a StorageCallback instance
+     */
+    StorageCallback createStorageCallback();
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 12f5d2f..70fe4ec 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,16 +6,17 @@
 
 import junit.framework.Assert;
 
-import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.BareCommModule;
 import org.apache.s4.core.triggers.TriggeredApp;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
 
-import com.google.common.io.Resources;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
@@ -45,9 +46,7 @@
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();
@@ -61,7 +60,7 @@
         CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
 
-        CommTestUtils.injectIntoStringSocketAdapter(time1);
+        app.stream.receiveEvent(new EventMessage("-1", "stream", app.getSerDeser().serialize(new StringEvent(time1))));
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -69,5 +68,4 @@
         // return latch on trigger signal
         return signalEvent1Triggered;
     }
-
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
new file mode 100644
index 0000000..431e31e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
@@ -0,0 +1,25 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Test module wiring the {@link SafeKeeper} checkpointing framework to a {@link StorageWithUnrespondingFetching}
+ * backend, and binding the filesystem storage root path to the default test storage directory.
+ */
+public class CheckpointingModuleWithUnrespondingFetchingStorageBackend extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(CheckpointingFramework.class).to(SafeKeeper.class);
+        bind(StateStorage.class).to(StorageWithUnrespondingFetching.class);
+        bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+
+        String storageRootPath = CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath();
+        bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+                storageRootPath);
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
new file mode 100644
index 0000000..ef3fabd
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -0,0 +1,153 @@
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.BareCommModule;
+import org.apache.s4.core.App;
+import org.apache.s4.core.BareCoreModule;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Checks that a "checkpoint" command event makes the file system based backend persist the serialized state of the
+ * target PE.
+ */
+public class CheckpointingTest {
+
+    private static Factory zookeeperServerConnectionFactory = null;
+    public static final File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("user.dir") + File.separator
+            + "tmp");
+    public static final File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+            + "storage");
+    /** Maximum time, in seconds, to wait for each asynchronous step of the test. */
+    private static final long TIMEOUT_S = 10;
+
+    @Before
+    public void prepare() throws Exception {
+        zookeeperServerConnectionFactory = CoreTestUtils.startZookeeperServer();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        CoreTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+    }
+
+    @Test
+    public void testCheckpointStorage() throws Exception {
+        final ZooKeeper zk = CoreTestUtils.createZkClient();
+        try {
+            // 1. watch the znodes signaling that value1 was set and that a checkpoint was stored
+            final CountDownLatch signalValue1Set = new CountDownLatch(1);
+            CoreTestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+            final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+            CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
+
+            Injector injector = Guice.createInjector(new BareCommModule(),
+                    new MockCoreModuleWithFileBaseCheckpointingBackend());
+            TestApp app = injector.getInstance(TestApp.class);
+            app.init();
+            app.start();
+
+            // 2. send an event that creates the PE and changes its state
+            Event event = new Event();
+            event.put("command", String.class, "setValue1");
+            event.put("value", String.class, "message1");
+            app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+
+            // bounded wait: an unbounded await would hang the build if the event is never processed
+            Assert.assertTrue(signalValue1Set.await(TIMEOUT_S, TimeUnit.SECONDS));
+
+            StatefulTestPE pe = (StatefulTestPE) app.getPE("statefulPE1").getInstanceForKey("X");
+            Assert.assertEquals("message1", pe.getValue1());
+            Assert.assertEquals("", pe.getValue2());
+
+            // 3. send a checkpoint event
+            event = new Event();
+            event.put("command", String.class, "checkpoint");
+            app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+            Assert.assertTrue(signalCheckpointed.await(TIMEOUT_S, TimeUnit.SECONDS));
+
+            // NOTE: the backend has asynchronous save operations
+            Thread.sleep(1000);
+
+            // 4. verify that the state was correctly persisted
+            CheckpointId safeKeeperId = new CheckpointId(pe);
+            File expected = new File(DEFAULT_STORAGE_DIR, safeKeeperId.getPrototypeId() + File.separator
+                    + Base64.encodeBase64URLSafeString(safeKeeperId.getStringRepresentation().getBytes()));
+            Assert.assertTrue(expected.exists());
+
+            StatefulTestPE refPE = new StatefulTestPE();
+            refPE.onCreate();
+            refPE.setValue1("message1");
+
+            // set the id through reflection so that refPE serializes like the PE instance for key "X"
+            Field idField = ProcessingElement.class.getDeclaredField("id");
+            idField.setAccessible(true);
+            idField.set(refPE, "X");
+
+            byte[] refBytes = app.getSerDeser().serialize(refPE);
+            Assert.assertTrue(Arrays.equals(refBytes, Files.toByteArray(expected)));
+        } finally {
+            // release the ZooKeeper session opened by this test
+            zk.close();
+        }
+    }
+
+    private static class TestApp extends App {
+        Stream<Event> testStream;
+
+        @Override
+        protected void onStart() {
+        }
+
+        @Override
+        protected void onInit() {
+            StatefulTestPE pe = createPE(StatefulTestPE.class, "statefulPE1");
+            testStream = createStream("stream1", new KeyFinder<Event>() {
+                @Override
+                public List<String> get(Event event) {
+                    // all events target the same PE instance
+                    return ImmutableList.of("X");
+                }
+            }, pe);
+        }
+
+        @Override
+        protected void onClose() {
+        }
+    }
+
+    private static class MockCoreModuleWithFileBaseCheckpointingBackend extends BareCoreModule {
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+            bind(CheckpointingFramework.class).to(SafeKeeper.class);
+            bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java
new file mode 100644
index 0000000..770b219
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.wordcount.WordCountApp;
+
+/**
+ * Word count application whose classifier and counter PEs use event-count checkpointing with a frequency of 1.
+ */
+public class FTWordCountApp extends WordCountApp {
+
+    @Override
+    protected void onInit() {
+        super.onInit();
+        for (String peName : new String[] { "classifierPE", "counterPE" }) {
+            // build a separate configuration for each PE
+            getPE(peName).setCheckpointingConfig(
+                    new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT).frequency(1).build());
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
new file mode 100644
index 0000000..8ab093f
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -0,0 +1,138 @@
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.DefaultCoreModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.WordCountTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Runs the word count application in a forked S4 node with checkpointing enabled, crashes and restarts the node after
+ * each of the first two sentences, and checks the final word counts.
+ */
+public class FTWordCountTest extends ZkBasedTest {
+
+    /** Maximum time, in seconds, to wait for each processing step. */
+    private static final long TIMEOUT_S = 10;
+
+    private Process forkedS4App;
+
+    @After
+    public void clean() throws IOException, InterruptedException {
+        CoreTestUtils.killS4App(forkedS4App);
+    }
+
+    @Test
+    public void testCheckpointAndRecovery() throws Exception {
+
+        Injector injector = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+        TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+        final ZooKeeper zk = CoreTestUtils.createZkClient();
+        try {
+            restartNode();
+
+            CountDownLatch signalTextProcessed = new CountDownLatch(1);
+            CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+            // sentence 1
+            authorizeProcessing(zk, 1, WordCountTest.SENTENCE_1_TOTAL_WORDS);
+            CountDownLatch signalSentence1Processed = new CountDownLatch(1);
+            CoreTestUtils.watchAndSignalCreation("/classifierIteration_" + WordCountTest.SENTENCE_1_TOTAL_WORDS,
+                    signalSentence1Processed, zk);
+            injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
+            Assert.assertTrue(signalSentence1Processed.await(TIMEOUT_S, TimeUnit.SECONDS));
+            // NOTE: give asynchronous checkpoint saves time to complete before crashing the node
+            Thread.sleep(1000);
+
+            crashNode();
+            restartNode();
+
+            // sentence 2, processed by the restarted node
+            int sentence2LastWord = WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS;
+            authorizeProcessing(zk, WordCountTest.SENTENCE_1_TOTAL_WORDS + 1, sentence2LastWord);
+            CountDownLatch sentence2Processed = new CountDownLatch(1);
+            CoreTestUtils.watchAndSignalCreation("/classifierIteration_" + sentence2LastWord, sentence2Processed, zk);
+            injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
+            Assert.assertTrue(sentence2Processed.await(TIMEOUT_S, TimeUnit.SECONDS));
+            Thread.sleep(1000);
+
+            crashNode();
+            restartNode();
+
+            // sentence 3, processed by the node restarted a second time
+            authorizeProcessing(zk, sentence2LastWord + 1, WordCountTest.TOTAL_WORDS);
+            injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
+            Assert.assertTrue(signalTextProcessed.await(TIMEOUT_S, TimeUnit.SECONDS));
+
+            File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+            if (!results.exists()) {
+                // in case the results file isn't ready yet
+                Thread.sleep(1000);
+            }
+            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", CoreTestUtils.readFile(results));
+        } finally {
+            zk.close();
+        }
+    }
+
+    /**
+     * Creates the /continue_i znodes, for i in [from, to], that authorize the word classifier to go on processing.
+     * Without them, the word classifier keeps waiting.
+     */
+    private static void authorizeProcessing(ZooKeeper zk, int from, int to) throws KeeperException,
+            InterruptedException {
+        for (int i = from; i <= to; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+
+    private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) {
+        Event event = new Event();
+        event.put("sentence", String.class, sentence);
+        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+                .serialize(event)));
+    }
+
+    /** Kills the forked node and waits for its termination, so that a new node is not started while it still runs. */
+    private void crashNode() throws InterruptedException {
+        forkedS4App.destroy();
+        forkedS4App.waitFor();
+    }
+
+    private void restartNode() throws IOException, InterruptedException {
+        CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream");
+
+        // recovering and making sure checkpointing still works
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
+                FTWordCountApp.class.getName(), "-p",
+                "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
+                "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
+        Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
new file mode 100644
index 0000000..3787ef7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
@@ -0,0 +1,71 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.name.Names;
+
+/**
+ * Extends {@link FileSystemBackendCheckpointingModule} with storage callbacks that create the /checkpointed znode when
+ * a checkpoint is successfully stored. The znode is created only once: later successes find it already in place.
+ */
+public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule extends
+        FileSystemBackendCheckpointingModule {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class);
+
+    @Override
+    protected void configure() {
+        super.configure();
+        bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+        bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+                CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+    }
+
+    public static class DummyZKStorageCallbackFactory implements StorageCallbackFactory {
+
+        @Override
+        public StorageCallback createStorageCallback() {
+            return new StorageCallback() {
+
+                @Override
+                public void storageOperationResult(StorageResultCode resultCode, Object message) {
+                    if (resultCode == StorageResultCode.SUCCESS) {
+                        signalCheckpointed();
+                    }
+                }
+            };
+        }
+
+        /** Creates the /checkpointed znode through a short-lived ZooKeeper client, which is always closed. */
+        private static void signalCheckpointed() {
+            ZooKeeper zkClient = null;
+            try {
+                zkClient = CoreTestUtils.createZkClient();
+                zkClient.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (KeeperException.NodeExistsException e) {
+                // created by an earlier successful checkpoint: nothing to do
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                logger.error("Cannot create the /checkpointed znode", e);
+            } finally {
+                if (zkClient != null) {
+                    try {
+                        zkClient.close();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
new file mode 100644
index 0000000..326e1a1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -0,0 +1,197 @@
+package org.apache.s4.core.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.DefaultCoreModule;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Kills and restarts a forked S4 node running {@link StatefulTestPE}, and checks which PE values survive the restart
+ * through checkpointing: manual, event-count-based and time-based checkpointing, and a backend whose checkpoint
+ * fetching does not respond in time.
+ */
+public class RecoveryTest extends ZkBasedTest {
+
+    private Process forkedS4App = null;
+    private ZooKeeper zk = null;
+
+    @After
+    public void cleanup() throws Exception {
+        try {
+            CoreTestUtils.killS4App(forkedS4App);
+        } finally {
+            // close the client opened by the test, otherwise each test method leaks a ZooKeeper session
+            if (zk != null) {
+                zk.close();
+                zk = null;
+            }
+        }
+    }
+
+    @Test
+    public void testCheckpointRestorationThroughApplicationEvent() throws Exception {
+
+        testCheckpointingConfiguration(S4AppWithManualCheckpointing.class,
+                FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, true,
+                "value1=message1 ; value2=message2");
+
+    }
+
+    @Test
+    public void testEventCountBasedCheckpointingAndRecovery() throws Exception {
+
+        testCheckpointingConfiguration(S4AppWithCountBasedCheckpointing.class,
+                FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, false,
+                "value1=message1 ; value2=message2");
+
+    }
+
+    @Test
+    public void testTimeBasedCheckpointingAndRecovery() throws Exception {
+        testCheckpointingConfiguration(S4AppWithTimeBasedCheckpointing.class,
+                FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, false,
+                "value1=message1 ; value2=message2");
+    }
+
+    @Test
+    public void testTimingOutRecovery() throws Exception {
+        testCheckpointingConfiguration(S4AppWithCountBasedCheckpointing.class,
+                CheckpointingModuleWithUnrespondingFetchingStorageBackend.class, false, "value1= ; value2=message2");
+    }
+
+    /** Asks the PE to checkpoint its state now, through an explicit "checkpoint" command event. */
+    private void insertCheckpointInstruction(Injector injector, TCPEmitter emitter) {
+        Event event = new Event();
+        event.put("command", String.class, "checkpoint");
+        sendEvent(injector, emitter, event);
+    }
+
+    /** Serializes {@code event} and sends it on "inputStream" through {@code emitter}, to destination 0. */
+    private static void sendEvent(Injector injector, TCPEmitter emitter, Event event) {
+        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+                .serialize(event)));
+    }
+
+    /**
+     * Runs one checkpoint / recovery scenario: sets value1 on a forked node, waits for a checkpoint, kills the node
+     * and deletes the /data znode, restarts the node, sets value2, then compares the /data znode with the expectation.
+     * 
+     * @param appClass
+     *            application deployed on the first node instance
+     * @param backendModuleClass
+     *            extra Guice module configuring the checkpointing backend, used for both node instances
+     * @param manualCheckpointing
+     *            if true, a "checkpoint" command is sent after value1; otherwise the app must checkpoint on its own
+     * @param expectedFinalResult
+     *            expected content of the /data znode at the end of the scenario
+     */
+    private void testCheckpointingConfiguration(Class<?> appClass, Class<?> backendModuleClass,
+            boolean manualCheckpointing, String expectedFinalResult) throws IOException, InterruptedException,
+            KeeperException {
+        zk = CoreTestUtils.createZkClient();
+
+        // use a latch for waiting for app to be ready
+        CountDownLatch signalConsumerReady = getConsumerReadySignal("inputStream");
+
+        // 1. instantiate remote S4 app
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass", appClass.getName(),
+                "-extraModulesClasses", backendModuleClass.getName() });
+
+        Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+
+        CountDownLatch signalValue1Set = new CountDownLatch(1);
+        CoreTestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+        final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+        CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
+
+        Injector injector = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+        TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+        // 2. set value1, then wait until a checkpoint has been stored
+        Event event = new Event();
+        event.put("command", String.class, "setValue1");
+        event.put("value", String.class, "message1");
+        sendEvent(injector, emitter, event);
+        Assert.assertTrue(signalValue1Set.await(10, TimeUnit.SECONDS));
+
+        if (manualCheckpointing) {
+            insertCheckpointInstruction(injector, emitter);
+        }
+
+        Assert.assertTrue(signalCheckpointed.await(10, TimeUnit.SECONDS));
+
+        // 3. kill the node and delete the persisted values: only the checkpoint can bring value1 back
+        forkedS4App.destroy();
+
+        zk.delete("/data", -1);
+
+        // 4. restart the node
+        // NOTE(review): restarted with S4AppWithManualCheckpointing whatever appClass was; confirm this is intended
+        signalConsumerReady = getConsumerReadySignal("inputStream");
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
+                S4AppWithManualCheckpointing.class.getName(), "-extraModulesClasses", backendModuleClass.getName() });
+
+        Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+
+        // 5. trigger recovery by sending an application event that sets value2
+        CountDownLatch signalValue2Set = new CountDownLatch(1);
+        CoreTestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+
+        event = new Event();
+        event.put("command", String.class, "setValue2");
+        event.put("value", String.class, "message2");
+        sendEvent(injector, emitter, event);
+
+        Assert.assertTrue(signalValue2Set.await(10, TimeUnit.SECONDS));
+
+        Assert.assertEquals(expectedFinalResult, new String(zk.getData("/data", false, null)));
+    }
+
+    /**
+     * Returns a latch released when a child change leaves "/s4/streams/{streamName}/consumers" with exactly one
+     * child, presumably once the stream's consumer has registered.
+     * 
+     * NOTE(review): the ZkClient created here is never closed and its subscription is never removed.
+     */
+    public static CountDownLatch getConsumerReadySignal(String streamName) {
+        final CountDownLatch signalAppReady = new CountDownLatch(1);
+
+        ZkClient zkClient = new ZkClient("localhost:" + CoreTestUtils.ZK_PORT);
+        // TODO check a proper app state variable. This is hacky
+        zkClient.subscribeChildChanges("/s4/streams/" + streamName + "/consumers", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 1) {
+                    signalAppReady.countDown();
+                }
+
+            }
+        });
+        return signalAppReady;
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java
new file mode 100644
index 0000000..0ad4c0d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java
@@ -0,0 +1,31 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+/**
+ * Test application with a single {@link StatefulTestPE} instance fed by "inputStream", configured for
+ * event-count-based checkpointing with a frequency of 1.
+ */
+public class S4AppWithCountBasedCheckpointing extends App {
+
+    @Override
+    protected void onInit() {
+        StatefulTestPE statefulPE = createPE(StatefulTestPE.class);
+        statefulPE.setSingleton(true);
+        CheckpointingConfig everyEvent = new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT).frequency(1)
+                .build();
+        statefulPE.setCheckpointingConfig(everyEvent);
+        createInputStream("inputStream", statefulPE);
+    }
+
+    @Override
+    protected void onStart() {
+        // nothing to do
+    }
+
+    @Override
+    protected void onClose() {
+        // nothing to do
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java
new file mode 100644
index 0000000..2971c57
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java
@@ -0,0 +1,27 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.App;
+
+/**
+ * Test application with a single {@link StatefulTestPE} instance fed by "inputStream". No checkpointing
+ * configuration is set here; the PE checkpoints when it receives a "checkpoint" command event.
+ */
+public class S4AppWithManualCheckpointing extends App {
+
+    @Override
+    protected void onInit() {
+        StatefulTestPE statefulPE = createPE(StatefulTestPE.class);
+        statefulPE.setSingleton(true);
+        createInputStream("inputStream", statefulPE);
+    }
+
+    @Override
+    protected void onStart() {
+        // nothing to do
+    }
+
+    @Override
+    protected void onClose() {
+        // nothing to do
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java
new file mode 100644
index 0000000..910f882
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+/**
+ * Test application with a single {@link StatefulTestPE} instance fed by "inputStream", configured for time-based
+ * checkpointing with a period of 1 ms.
+ */
+public class S4AppWithTimeBasedCheckpointing extends App {
+
+    @Override
+    protected void onInit() {
+        StatefulTestPE statefulPE = createPE(StatefulTestPE.class);
+        statefulPE.setSingleton(true);
+        // checkpoints (if applicable) every 1 ms!
+        CheckpointingConfig everyMillisecond = new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(1)
+                .timeUnit(TimeUnit.MILLISECONDS).build();
+        statefulPE.setCheckpointingConfig(everyMillisecond);
+        createInputStream("inputStream", statefulPE);
+    }
+
+    @Override
+    protected void onStart() {
+        // nothing to do
+    }
+
+    @Override
+    protected void onClose() {
+        // nothing to do
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java
new file mode 100644
index 0000000..70e321d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java
@@ -0,0 +1,115 @@
+package org.apache.s4.core.ft;
+
+import java.io.IOException;
+
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Test PE holding two string values. Each change is mirrored to the /data znode as "value1=... ; value2=...", and a
+ * /value1Set or /value2Set znode is created, so that a test running in another process can observe the PE. A
+ * "checkpoint" command event triggers an explicit checkpoint.
+ */
+public class StatefulTestPE extends ProcessingElement {
+
+    String id;
+    String value1 = "";
+    String value2 = "";
+    // client used to report to the test; transient, presumably so that it is excluded from checkpointed state
+    transient ZooKeeper zk = null;
+
+    /**
+     * Handles the "setValue1", "setValue2" and "checkpoint" commands.
+     * 
+     * @throws RuntimeException
+     *             for an unknown command, or wrapping any failure while handling the event
+     */
+    public void onEvent(org.apache.s4.base.Event event) {
+        try {
+            String command = event.get("command");
+            if ("setValue1".equals(command)) {
+                setValue1(event.get("value"));
+                zk.create("/value1Set", event.get("value").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } else if ("setValue2".equals(command)) {
+                setValue2(event.get("value"));
+                zk.create("/value2Set", event.get("value").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } else if ("checkpoint".equals(command)) {
+                checkpoint();
+            } else {
+                throw new RuntimeException("unidentified event: " + event);
+            }
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the calling thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getValue1() {
+        return value1;
+    }
+
+    /** Sets value1 and mirrors both values to the /data znode. */
+    public void setValue1(String value1) {
+        this.value1 = value1;
+        persistValues();
+    }
+
+    public String getValue2() {
+        return value2;
+    }
+
+    /** Sets value2 and mirrors both values to the /data znode. */
+    public void setValue2(String value2) {
+        this.value2 = value2;
+        persistValues();
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Replaces the /data znode with the current values. A znode (rather than a file) is used as a simple way for the
+     * test process to track changes. Failures are only printed: this is best-effort reporting.
+     */
+    private void persistValues() {
+        try {
+            try {
+                zk.delete("/data", -1);
+            } catch (NoNodeException ignored) {
+                // nothing to replace yet
+            }
+            zk.create("/data", ("value1=" + value1 + " ; value2=" + value2).getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Opens the ZooKeeper client used to report values to the test. */
+    @Override
+    protected void onCreate() {
+        try {
+            zk = CoreTestUtils.createZkClient();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // NOTE(review): the ZooKeeper client opened in onCreate() is not closed here; confirm it is not shared with
+        // other instances before closing it
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java
new file mode 100644
index 0000000..2906fe8
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * File-system based {@link StateStorage} whose {@code fetchState} blocks for {@code FETCH_DELAY_MS} ms before
+ * answering, to simulate a storage backend that does not respond in time when fetching checkpoints. Saving
+ * checkpoints and listing stored keys are delegated directly.
+ */
+public class StorageWithUnrespondingFetching implements StateStorage {
+
+    /** How long {@code fetchState} sleeps before delegating to the file system storage. */
+    private static final long FETCH_DELAY_MS = 10000;
+
+    // NOTE: because of the @Inject annotation, Guice replaces the instance created by this initializer with one it
+    // builds itself; the initializer only matters when this class is instantiated without Guice
+    @Inject
+    DefaultFileSystemStateStorage storage = new DefaultFileSystemStateStorage();
+
+    @Inject
+    @Named("s4.checkpointing.filesystem.storageRootPath")
+    String storageRootPath;
+
+    /**
+     * Propagates the configured storage root path to the delegate. Guice runs injected methods after injecting
+     * fields, so this overrides whatever root path the delegate had.
+     */
+    @Inject
+    private void initStorageRootPath() {
+        storage.storageRootPath = this.storageRootPath;
+    }
+
+    @Override
+    public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+        storage.saveState(key, state, callback);
+    }
+
+    /**
+     * Sleeps for {@code FETCH_DELAY_MS} ms, then delegates. If interrupted while sleeping, restores the interrupt
+     * flag and fetches immediately.
+     */
+    @Override
+    public byte[] fetchState(CheckpointId key) {
+        try {
+            Thread.sleep(FETCH_DELAY_MS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+        return storage.fetchState(key);
+    }
+
+    @Override
+    public Set<CheckpointId> fetchStoredKeys() {
+        return storage.fetchStoredKeys();
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
index e72073c..6f1cc9f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
@@ -31,8 +31,7 @@
             // TODO Auto-generated catch block
             e.printStackTrace();
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -55,8 +54,7 @@
             // TODO Auto-generated catch block
             e.printStackTrace();
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            Thread.currentThread().interrupt();
         }
     }
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
index 849d7b2..e2a1b38 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
@@ -1,21 +1,18 @@
 package org.apache.s4.core.triggers;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
 import org.apache.s4.core.TriggerTest;
-import org.apache.s4.fixtures.SocketAdapter;
 import org.apache.s4.wordcount.SentenceKeyFinder;
-import org.apache.s4.wordcount.StringEvent;
 
 import com.google.inject.Inject;
 
 public class TriggeredApp extends App {
 
-    SocketAdapter<StringEvent> socketAdapter;
+    public Stream<Event> stream;
 
     @Inject
     public TriggeredApp() {
@@ -32,7 +29,7 @@
     protected void onInit() {
 
         TriggerablePE prototype = createPE(TriggerablePE.class);
-        Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
+        stream = createStream("stream", new SentenceKeyFinder(), prototype);
         switch (TriggerTest.triggerType) {
             case COUNT_BASED:
                 prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
@@ -43,11 +40,6 @@
                 break;
         }
 
-        try {
-            socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 108a144..620494f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -15,23 +15,30 @@
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -45,12 +52,11 @@
  * - ... or from a web server
  * 
  */
-public class TestAutomaticDeployment {
+public class TestAutomaticDeployment extends ZkBasedTest {
 
     private Factory zookeeperServerConnectionFactory;
     private Process forkedNode;
     private ZkClient zkClient;
-    private static final String CLUSTER_NAME = "clusterZ";
     private HttpServer httpServer;
     private static File tmpAppsDir;
 
@@ -95,7 +101,7 @@
 
         ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/app/s4App", record, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
 
         Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -106,7 +112,16 @@
         CommTestUtils
                 .watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
 
-        CoreTestUtils.injectIntoStringSocketAdapter(time1);
+        Injector injector = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+        TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+        Event event = new Event();
+        event.put("line", String.class, time1);
+        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+                .serialize(event)));
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -148,17 +163,13 @@
         // current package .
 
         // 1. start s4 nodes. Check that no app is deployed.
-        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
-        taskSetup.clean(CLUSTER_NAME);
-        taskSetup.setup(CLUSTER_NAME, 1, 1300);
-
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -169,7 +180,7 @@
             }
         });
 
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
 
         // TODO synchro with ready state from zk
         Thread.sleep(10000);
@@ -177,22 +188,19 @@
 
     }
 
-    @Before
-    public void prepare() throws Exception {
-        CommTestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
-        final ZooKeeper zk = CommTestUtils.createZkClient();
-        try {
-            zk.delete("/simpleAppCreated", -1);
-        } catch (Exception ignored) {
-        }
-
-        zk.close();
-    }
+    // @Before
+    // public void clean() throws Exception {
+    // final ZooKeeper zk = CommTestUtils.createZkClient();
+    // try {
+    // zk.delete("/simpleAppCreated", -1);
+    // } catch (Exception ignored) {
+    // }
+    //
+    // zk.close();
+    // }
 
     @After
     public void cleanup() throws Exception {
-        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
         CommTestUtils.killS4App(forkedNode);
         if (httpServer != null) {
             httpServer.stop(0);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 832ce1b..00a7611 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -7,7 +7,6 @@
 
 import junit.framework.Assert;
 
-import org.apache.s4.core.App;
 import org.apache.s4.core.Main;
 import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
@@ -25,10 +24,6 @@
  */
 public class CoreTestUtils extends CommTestUtils {
 
-    public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
-        return forkProcess(App.class.getName(), -1, moduleClass.getName(), appClass.getName());
-    }
-
     public static Process forkS4Node() throws IOException, InterruptedException {
         return forkS4Node(new String[] {});
     }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
deleted file mode 100644
index c1815bf..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import org.apache.s4.core.Stream;
-import org.apache.s4.wordcount.KeyValueEvent;
-import org.apache.s4.wordcount.StringEvent;
-
-public class SocketAdapter<T extends StringEvent> {
-
-    static ServerSocket serverSocket;
-
-    /**
-     * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
-     * connection
-     * 
-     * @param stream
-     * @throws IOException
-     */
-    public SocketAdapter(final Stream<T> stream, final StringEventFactory<T> stringEventFactory) throws IOException {
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                serverSocket = null;
-                Socket connectedSocket;
-                BufferedReader in = null;
-                try {
-                    serverSocket = new ServerSocket(12000);
-                    while (true) {
-                        connectedSocket = serverSocket.accept();
-                        in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
-
-                        String line = in.readLine();
-                        System.out.println("read: " + line);
-                        stream.put(stringEventFactory.create(line));
-                        connectedSocket.close();
-                    }
-
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    // System.exit(-1);
-                } finally {
-                    if (in != null) {
-                        try {
-                            in.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    if (serverSocket != null) {
-                        try {
-                            serverSocket.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }
-        });
-        t.start();
-
-    }
-
-    public void close() {
-        if (serverSocket != null) {
-            try {
-                serverSocket.close();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    interface StringEventFactory<T> {
-        T create(String string);
-    }
-
-    public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
-
-        @Override
-        public StringEvent create(String string) {
-            return new StringEvent(string);
-        }
-
-    }
-
-    public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
-
-        @Override
-        public KeyValueEvent create(String string) {
-            return new KeyValueEvent(string);
-        }
-
-    }
-
-}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
index 2401f70..b234ff3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
@@ -1,19 +1,19 @@
 package org.apache.s4.wordcount;
 
-
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
 
-public class SentenceKeyFinder implements KeyFinder<StringEvent> {
+import com.google.common.collect.ImmutableList;
+
+public class SentenceKeyFinder implements KeyFinder<Event> {
 
     private static final String SENTENCE_KEY = "sentence";
 
-    @SuppressWarnings("serial")
     @Override
-    public List<String> get(StringEvent event) {
-        return new ArrayList<String>(){{add(SENTENCE_KEY);}};
+    public List<String> get(Event event) {
+        return ImmutableList.of(SENTENCE_KEY);
     }
 
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
index 278b752..cda0ab4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
@@ -1,17 +1,14 @@
 package org.apache.s4.wordcount;
 
-import java.io.IOException;
-
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
-import org.apache.s4.fixtures.SocketAdapter;
 
 import com.google.inject.Inject;
 
 public class WordCountApp extends App {
 
     protected boolean checkpointing = false;
-    SocketAdapter<StringEvent> socketAdapter;
 
     @Inject
     public WordCountApp() {
@@ -27,42 +24,21 @@
     @Override
     protected void onInit() {
 
-        WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class);
+        WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class, "classifierPE");
         Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
                 wordClassifierPrototype);
-        WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class);
+        WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class, "counterPE");
         // wordCounterPrototype.setTrigger(WordSeenEvent.class, 1, 0, null);
         wordCounterPrototype.setWordClassifierStream(wordCountStream);
         Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
                 wordCounterPrototype);
         WordSplitterPE wordSplitterPrototype = createPE(WordSplitterPE.class);
         wordSplitterPrototype.setWordSeenStream(wordSeenStream);
-        Stream<StringEvent> sentenceStream = createStream("sentences stream", new SentenceKeyFinder(),
-                wordSplitterPrototype);
-
-        try {
-            socketAdapter = new SocketAdapter<StringEvent>(sentenceStream, new SocketAdapter.SentenceEventFactory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        if (checkpointing) {
-
-            // TODO move to subclass
-
-            // checkpoint word classifier because it maintains a sync counter
-            // for the test
-            // LoggerFactory.getLogger(getClass()).info("setting checkpointing for word classifier and word counter");
-            // wordClassifierPrototype.setCheckpointingFrequency(1);
-            // // checkpoint word counter because it maintains word counts
-            // wordCounterPrototype.setCheckpointingFrequency(1);
-        }
-
+        Stream<Event> sentenceStream = createInputStream("inputStream", new SentenceKeyFinder(), wordSplitterPrototype);
     }
 
     @Override
     protected void onClose() {
-        socketAdapter.close();
 
     }
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 2365a48..5987e4a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -3,22 +3,30 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.Main;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class WordCountTest {
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class WordCountTest extends ZkBasedTest {
 
     public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
     public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
@@ -28,12 +36,16 @@
     public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
     public static final String FLAG = ";";
     public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
-    private static Factory zookeeperServerConnectionFactory;
+    private TCPEmitter emitter;
+    private Injector injector;
 
     @Before
-    public void prepare() throws IOException, InterruptedException, KeeperException {
-        CommTestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+    public void prepareEmitter() throws IOException {
+        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                .openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
+                .openStream()));
+
+        emitter = injector.getInstance(TCPEmitter.class);
 
     }
 
@@ -54,12 +66,8 @@
     @Test
     public void testSimple() throws Exception {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
-        String clusterName = "clusterA";
-        taskSetup.clean("s4");
-        taskSetup.setup(clusterName, 1, 10000);
 
-        Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + WordCountApp.class.getName(),
+        Main.main(new String[] { "-cluster=cluster1", "-appClass=" + WordCountApp.class.getName(),
                  "-extraModulesClasses=" + WordCountModule.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
@@ -69,20 +77,21 @@
         for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
             zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
-        signalTextProcessed.await();
+        injectSentence(SENTENCE_1);
+        injectSentence(SENTENCE_2);
+        injectSentence(SENTENCE_3);
+        Assert.assertTrue("text processing timed out", signalTextProcessed.await(10, TimeUnit.SECONDS));
         File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
         String s = CommTestUtils.readFile(results);
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
 
     }
 
-    @After
-    public void cleanup() throws IOException, InterruptedException {
-        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-
+    public void injectSentence(String sentence) throws IOException {
+        Event event = new Event();
+        event.put("sentence", String.class, sentence);
+        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+                .serialize(event)));
     }
 
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
index 745f521..b2a43a9 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
@@ -1,26 +1,25 @@
 package org.apache.s4.wordcount;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 
-
 public class WordSplitterPE extends ProcessingElement {
-    
+
     private Stream<WordSeenEvent> wordSeenStream;
 
     public WordSplitterPE(App app) {
         super(app);
     }
 
-    public void onEvent(StringEvent event) {
-        StringEvent sentence = event;
-        String[] split = sentence.getString().split(" ");
+    public void onEvent(Event event) {
+        String[] split = event.get("sentence").split(" ");
         for (String word : split) {
             wordSeenStream.put(new WordSeenEvent(word));
         }
     }
-    
+
     public void setWordSeenStream(Stream<WordSeenEvent> stream) {
         this.wordSeenStream = stream;
     }
@@ -28,13 +27,13 @@
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
-        
+
     }
 
     @Override
     protected void onRemove() {
         // TODO Auto-generated method stub
-        
+
     }
 
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index 668499a..d473065 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -96,11 +96,11 @@
         String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
                 + "zookeeper" + File.separator + "log").getAbsolutePath();
 
-        @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: {"
-                + TEST_MODE_CLUSTER_CONF_1 + "} and {" + TEST_MODE_CLUSTER_CONF_2 + "}")
+        @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: "
+                + TEST_MODE_CLUSTER_CONF_1 + " and " + TEST_MODE_CLUSTER_CONF_2 + ")")
         boolean testMode = false;
 
-        @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of curly-braces enclosed cluster definitions with format: {c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks>} (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
+        @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of cluster definitions with format: c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks> (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
         List<ClusterConfig> clusterConfigs;
 
     }
@@ -109,7 +109,7 @@
 
         @Override
         public ClusterConfig convert(String arg) {
-            Pattern clusterConfigPattern = Pattern.compile("\\{(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)\\}");
+            Pattern clusterConfigPattern = Pattern.compile("(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)");
             logger.info("processing cluster configuration {}", arg);
             Matcher configMatcher = clusterConfigPattern.matcher(arg);
             if (!configMatcher.find()) {
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
deleted file mode 100644
index 5ee7197..0000000
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.s4.deploy;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.Stream;
-
-public class SocketAdapter {
-
-    static ServerSocket serverSocket;
-
-    /**
-     * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
-     * connection
-     * 
-     * @param stream
-     * @throws IOException
-     */
-    public SocketAdapter(final Stream<Event> stream) throws IOException {
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                serverSocket = null;
-                Socket connectedSocket;
-                BufferedReader in = null;
-                try {
-                    serverSocket = new ServerSocket(12000);
-                    while (true) {
-                        connectedSocket = serverSocket.accept();
-                        in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
-
-                        String line = in.readLine();
-                        System.out.println("read: " + line);
-                        Event event = new Event();
-                        event.put("line", String.class, line);
-                        stream.put(event);
-                        connectedSocket.close();
-                    }
-
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    System.exit(-1);
-                } finally {
-                    if (in != null) {
-                        try {
-                            in.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    if (serverSocket != null) {
-                        try {
-                            serverSocket.close();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }
-        });
-        t.start();
-
-    }
-
-    public void close() {
-        if (serverSocket != null) {
-            try {
-                serverSocket.close();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-}
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
index bbfaf17..389562b 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -1,8 +1,5 @@
 package org.apache.s4.deploy;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
@@ -10,10 +7,11 @@
 import org.apache.s4.core.Stream;
 import org.apache.zookeeper.CreateMode;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestApp extends App {
 
     private ZkClient zkClient;
-    private SocketAdapter socketAdapter;
 
     @Override
     protected void onClose() {
@@ -24,21 +22,12 @@
     @Override
     protected void onInit() {
         try {
-            try {
-                SimplePE prototype = createPE(SimplePE.class);
-                Stream<Event> stream = createStream("stream", new KeyFinder<Event>() {
-                    public java.util.List<String> get(Event event) {
-                        return new ArrayList<String>() {
-                            {
-                                add("line");
-                            }
-                        };
-                    }
-                }, prototype);
-                socketAdapter = new SocketAdapter(stream);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
+            SimplePE prototype = createPE(SimplePE.class);
+            createInputStream("inputStream", new KeyFinder<Event>() {
+                public java.util.List<String> get(Event event) {
+                    return ImmutableList.of("line");
+                }
+            }, prototype);
             zkClient = new ZkClient("localhost:" + 2181);
             if (!zkClient.exists("/s4-test")) {
                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index f6b211c..2218ca5 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -23,6 +23,10 @@
     static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
     Map<String, Integer> countedTopics = Maps.newHashMap();
 
+    public TopNTopicPE() {
+        // required for checkpointing in S4 0.5. Requirement to be removed in 0.6
+    }
+
     public TopNTopicPE(App app) {
         super(app);
         logger.info("key: [{}]", getId());
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
index 28d87f2..90c3729 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -9,13 +9,17 @@
 // keyed by topic name
 public class TopicCountAndReportPE extends ProcessingElement {
 
-    Stream<TopicEvent> downStream;
-    int threshold = 10;
+    transient Stream<TopicEvent> downStream; // wiring, not state: not meant to be serialized with checkpointed state
+    transient int threshold = 10; // configuration, not state: not meant to be serialized with checkpointed state
     int count;
     boolean firstEvent = true;
 
     static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
 
+    public TopicCountAndReportPE() {
+        // required for checkpointing in S4 0.5. Requirement to be removed in 0.6
+    }
+
     public TopicCountAndReportPE(App app) {
         super(app);
     }
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index 90c3199..fc4e3ae 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -1,24 +1,20 @@
 package org.apache.s4.example.twitter;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+import com.google.common.collect.ImmutableList;
 
 public class TwitterCounterApp extends App {
 
-    private ZkClient zkClient;
-
-    private Thread t;
-
     @Override
     protected void onClose() {
-        // TODO Auto-generated method stub
-
     }
 
     @Override
@@ -27,31 +23,29 @@
 
             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
+            // we checkpoint this PE every 20s
+            topNTopicPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(20)
+                    .timeUnit(TimeUnit.SECONDS).build());
             @SuppressWarnings("unchecked")
             Stream<TopicEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder<TopicEvent>() {
 
                 @Override
                 public List<String> get(final TopicEvent arg0) {
-                    return new ArrayList<String>() {
-                        {
-                            add("aggregationKey");
-                        }
-                    };
+                    return ImmutableList.of("aggregationKey");
                 }
             }, topNTopicPE);
 
             TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
+            // we checkpoint instances every 2 events
+            topicCountAndReportPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT)
+                    .frequency(2).build());
             Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
 
                 @Override
                 public List<String> get(final TopicEvent arg0) {
-                    return new ArrayList<String>() {
-                        {
-                            add(arg0.getTopic());
-                        }
-                    };
+                    return ImmutableList.of(arg0.getTopic());
                 }
             }, topicCountAndReportPE);