Merge branch 'S4-85' into piper
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 7efadcc..1f5d7e4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -188,4 +188,16 @@
}
}
+
+    /** Renders the attributes as "[{key;value},{key;value},]"; every entry, including the last, ends with a comma. */
+    @Override
+    public String toString() {
+        StringBuilder rendered = new StringBuilder("[");
+        for (Map.Entry<String, String> attribute : getAttributesAsMap().entrySet()) {
+            rendered.append('{').append(attribute.getKey()).append(';');
+            rendered.append(attribute.getValue()).append("},");
+        }
+        rendered.append(']');
+        return rendered.toString();
+    }
}
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
index f75a867..8d3e062 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
@@ -18,7 +18,12 @@
}
public Class<?> loadGeneratedClass(String name, byte[] bytes) {
- return defineClass(name, bytes, 0, bytes.length);
+        // reuse the class when findLoadedClass already knows one under that name
+        Class<?> alreadyLoaded = findLoadedClass(name);
+        if (alreadyLoaded != null) {
+            return alreadyLoaded;
+        }
+        return defineClass(name, bytes, 0, bytes.length);
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 46f0a8e..2a9dc5c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -46,6 +46,7 @@
// System.out.println("QueueingListener: About to take message from queue");
return queue.take();
} catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
return null;
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 4472910..d2aa64f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -132,6 +132,7 @@
} catch (InterruptedException ie) {
logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
clusterNode.getPort()));
+ Thread.currentThread().interrupt();
}
return false;
}
@@ -182,7 +183,7 @@
bootstrap.releaseExternalResources();
} catch (InterruptedException ie) {
logger.error("Interrupted while closing");
- ie.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 3e905e6..d7d49bd 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -80,6 +80,7 @@
byte[] msg = handoffQueue.take();
return msg;
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
return null;
}
}
@@ -93,7 +94,7 @@
try {
channels.close().await();
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
bootstrap.releaseExternalResources();
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 316999c..43bb470 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -74,6 +74,7 @@
try {
return handoffQueue.take();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
return null;
}
}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 2791da6..63e32b5 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -5,9 +5,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.PrintWriter;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -41,11 +39,9 @@
public static final int ZK_PORT = 2181;
public static final String ZK_STRING = "localhost:" + ZK_PORT;
- public static final int INITIAL_BOOKIE_PORT = 5000;
public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ "storage");
- public static ServerSocket serverSocket;
static {
logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
}
@@ -233,6 +229,7 @@
try {
Thread.sleep(250);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// ignore
}
}
@@ -281,6 +278,7 @@
try {
Thread.sleep(250);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// ignore
}
}
@@ -295,12 +293,6 @@
}
- public static void stopSocketAdapter() throws IOException {
- if (serverSocket != null) {
- serverSocket.close();
- }
- }
-
/**
* gradle and eclipse have different directories for output files This is justified here
* http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
@@ -330,21 +322,4 @@
}
- public static void injectIntoStringSocketAdapter(String string) throws IOException {
- Socket socket = null;
- PrintWriter writer = null;
- try {
- socket = new Socket("localhost", 12000);
- writer = new PrintWriter(socket.getOutputStream(), true);
- writer.println(string);
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(-1);
- } finally {
- if (socket != null) {
- socket.close();
- }
- }
- }
-
}
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index ad3bbbe..c3a3f52 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -20,6 +20,7 @@
compile project(":s4-base")
compile project(":s4-comm")
compile project(path: ':s4-comm', configuration: 'tests')
+ compile libraries.commons_codec
compile libraries.jcommander
testCompile project(path: ':s4-comm', configuration: 'tests')
testCompile libraries.gradle_base_services
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0beec16..bc20e98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.ft.CheckpointingFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,10 @@
import com.google.inject.Inject;
import com.google.inject.name.Named;
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+/**
+ * Container base class to hold all processing elements.
+ *
+ * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
*/
public abstract class App {
@@ -73,6 +75,10 @@
@Named("cluster.name")
String clusterName;
+ // default is NoOpCheckpointingFramework
+ @Inject
+ CheckpointingFramework checkpointingFramework;
+
// serialization uses the application class loader
private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
@@ -111,7 +117,7 @@
}
/* Should only be used within the core package. */
- public void addStream(Streamable stream) {
+ public void addStream(Streamable<Event> stream) {
streams.add(stream);
}
@@ -262,6 +268,10 @@
return serDeser;
}
+ public CheckpointingFramework getCheckpointingFramework() {
+ return checkpointingFramework;
+ }
+
/**
* Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
* prototypes by key.
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index b740a21..d441d59 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -11,6 +11,8 @@
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
import org.slf4j.Logger;
@@ -59,6 +61,10 @@
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
bind(S4RLoaderFactory.class);
+
+ // For enabling checkpointing, one needs to use a custom module, such as
+ // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
+ bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
}
private void loadProperties(Binder binder) {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 5f1375b..ea7cfd6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -136,6 +136,7 @@
}
} catch (Exception e) {
logger.error("Cannot start S4 node", e);
+ System.exit(1);
}
}
@@ -160,10 +161,10 @@
@Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
String appClass = null;
- @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+ @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
List<String> extraModulesClasses = new ArrayList<String>();
- @Parameter(names = { "-namedStringParameters", "-p" }, description = "Inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-namedStringParameters={name1=value1},{name2=value2} '", hidden = false, converter = InlineConfigParameterConverter.class)
+ @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
List<String> extraNamedParameters = new ArrayList<String>();
@Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
@@ -175,7 +176,7 @@
@Override
public String convert(String arg) {
- Pattern parameterPattern = Pattern.compile("\\{(\\S+=\\S+)\\}");
+ Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
logger.info("processing inline configuration parameter {}", arg);
Matcher parameterMatcher = parameterPattern.matcher(arg);
if (!parameterMatcher.find()) {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..730ff56 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,20 +1,7 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
package org.apache.s4.core;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.Map;
import java.util.Timer;
@@ -25,6 +12,10 @@
import net.jcip.annotations.ThreadSafe;
import org.apache.s4.base.Event;
+import org.apache.s4.core.ft.CheckpointId;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.core.ft.CheckpointingTask;
import org.apache.s4.core.gen.OverloadDispatcher;
import org.apache.s4.core.gen.OverloadDispatcherGenerator;
import org.slf4j.Logger;
@@ -34,6 +25,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
@@ -94,34 +86,41 @@
*/
public abstract class ProcessingElement implements Cloneable {
- private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
- private static final String SINGLETON = "singleton";
+ transient private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
+ transient private static final String SINGLETON = "singleton";
- protected App app;
+ transient protected App app;
/*
* This maps holds all the instances. We make it package private to prevent concrete classes from updating the
* collection.
*/
- Cache<String, ProcessingElement> peInstances;
+ transient Cache<String, ProcessingElement> peInstances;
/* This map is initialized in the prototype and cloned to instances. */
- Map<Class<? extends Event>, Trigger> triggers;
+ transient Map<Class<? extends Event>, Trigger> triggers;
/* PE instance id. */
String id = "";
/* Private fields. */
- private ProcessingElement pePrototype;
- private boolean haveTriggers = false;
- private long timerIntervalInMilliseconds = 0;
- private Timer timer;
- private boolean isPrototype = true;
- private boolean isThreadSafe = false;
- private String name = null;
- private boolean isSingleton = false;
+ transient private ProcessingElement pePrototype;
+ transient private boolean haveTriggers = false;
+ transient private long timerIntervalInMilliseconds = 0;
+ transient private Timer triggerTimer;
+ transient private Timer checkpointingTimer;
+ transient private boolean isPrototype = true;
+ transient private boolean isThreadSafe = false;
+ transient private String name = null;
+ transient private boolean isSingleton = false;
+ transient long eventCount = 0;
- private transient OverloadDispatcher overloadDispatcher;
+ transient private OverloadDispatcher overloadDispatcher;
+ transient private boolean recoveryAttempted = false;
+ transient private boolean dirty = false;
+
+ transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
+ .build();
protected ProcessingElement() {
OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
@@ -211,7 +210,7 @@
return peInstances.size();
}
- Map<String, ProcessingElement> getPEInstances() {
+ public Map<String, ProcessingElement> getPEInstances() {
return peInstances.asMap();
}
@@ -314,8 +313,9 @@
/* Skip trigger checking overhead if there are no triggers. */
haveTriggers = true;
- if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS)
+ if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS) {
interval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
+ }
Trigger config = new Trigger(numEvents, interval);
@@ -374,14 +374,14 @@
Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
- if (timer != null) {
- timer.cancel();
+ if (triggerTimer != null) {
+ triggerTimer.cancel();
}
if (interval == 0)
return this;
- timer = new Timer();
+ triggerTimer = new Timer();
return this;
}
@@ -406,8 +406,11 @@
} else {
object = this;
}
-
synchronized (object) {
+ if (!recoveryAttempted) {
+ recover();
+ recoveryAttempted = true;
+ }
/* Dispatch onEvent() method. */
overloadDispatcher.dispatchEvent(this, event);
@@ -416,9 +419,26 @@
if (haveTriggers && isTrigger(event)) {
overloadDispatcher.dispatchTrigger(this, event);
}
+
+ eventCount++;
+
+ dirty = true;
+
+ if (isCheckpointable()) {
+ checkpoint();
+ }
}
}
+ protected boolean isCheckpointable() {
+ return getApp().checkpointingFramework.isCheckpointable(this);
+ }
+
+ public void checkpoint() {
+ getApp().getCheckpointingFramework().saveState(this);
+ clearDirty();
+ }
+
private boolean isTrigger(Event event) {
return isTrigger(event, event.getClass());
}
@@ -466,8 +486,8 @@
protected void removeAll() {
/* Close resources in prototype. */
- if (timer != null) {
- timer.cancel();
+ if (triggerTimer != null) {
+ triggerTimer.cancel();
logger.info("Timer stopped.");
}
@@ -503,12 +523,22 @@
}
/* Start timer. */
- if (timer != null) {
- timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
- logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+ if (triggerTimer != null) {
+ triggerTimer
+ .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
+ logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
}
+ if (checkpointingConfig.mode == CheckpointingMode.TIME) {
+ checkpointingTimer = new Timer();
+ checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
+ checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
+ checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
+ logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
+ new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
+ }
+
/* Check if this PE is annotated as thread safe. */
if (getClass().isAnnotationPresent(ThreadSafe.class) == true) {
@@ -535,7 +565,7 @@
}
return peInstances.get(id);
} catch (ExecutionException e) {
- logger.error("Problem when trying to create a PE instance.", e);
+ logger.error("Problem when trying to create a PE instance for id {}", id, e);
}
return null;
}
@@ -544,8 +574,16 @@
* Get all the local instances. See notes in {@link #getInstanceForKey(String) getLocalInstanceForKey}
*/
public Collection<ProcessingElement> getInstances() {
-
- return peInstances.asMap().values();
+        if (!isSingleton) {
+            return peInstances.asMap().values();
+        }
+        try {
+            return ImmutableList.of(peInstances.get(SINGLETON));
+        } catch (ExecutionException e) {
+            logger.error("Problem when trying to create a PE instance for id {}", id, e);
+            // an empty collection is safer to hand back than null when the singleton lookup fails
+            return ImmutableList.<ProcessingElement> of();
+        }
}
/**
@@ -672,6 +710,95 @@
app.peByName.put(name, this);
}
+ public CheckpointingConfig getCheckpointingConfig() {
+ return checkpointingConfig;
+ }
+
+ public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig) {
+ this.checkpointingConfig = checkpointingConfig;
+ }
+
+ /**
+ * By default, the state of a PE instance is considered dirty whenever it processed an event. Some events may
+ * actually leave the state of the PE unchanged. PE implementations can therefore override this method to
+ * accommodate specific behaviors, by managing a custom "dirty" flag.
+ *
+ * <b>If this method is overridden, {@link #clearDirty()} method must also be overridden in order to correctly reflect
+ * the "dirty" state of the PE.</b>
+ */
+ public boolean isDirty() {
+ return dirty;
+ }
+
+ /**
+ * Dirty state is cleared after the PE has been serialized. PE implementations that maintain their "dirty" flag must
+ * override this method by clearing their internally managed "dirty" flag.
+ *
+ * <b>If this method is overridden, {@link #isDirty()} must also be overridden in order to correctly reflect the
+ * "dirty" state of the PE.</b>
+ */
+ public void clearDirty() {
+ this.dirty = false;
+ }
+
+ public byte[] serializeState() {
+ return getApp().getSerDeser().serialize(this);
+ }
+
+ public ProcessingElement deserializeState(byte[] loadedState) {
+ return (ProcessingElement) getApp().getSerDeser().deserialize(loadedState);
+ }
+
+ public void restoreState(ProcessingElement oldState) {
+ restoreFieldsForClass(oldState.getClass(), oldState);
+ }
+
+    /** Fetches the checkpointed state of this PE, if any, and copies the saved fields back; failures are logged. */
+    protected void recover() {
+        byte[] serializedState = null;
+        try {
+            serializedState = getApp().getCheckpointingFramework().fetchSerializedState(new CheckpointId(this));
+        } catch (RuntimeException e) {
+            logger.error("Cannot fetch serialized state for [" + getPrototype().getClass().getName() + "/" + getId()
+                    + "]: " + e.getMessage(), e);
+        }
+        if (serializedState == null) {
+            return; // no serialized state available (or fetching failed): keep the current state
+        }
+        try {
+            restoreState(deserializeState(serializedState));
+        } catch (RuntimeException e) {
+            logger.error("Cannot restore state for key [" + new CheckpointId(this) + "]: " + e.getMessage(), e);
+        }
+    }
+
+    /** Copies the non-transient, non-static fields declared by the given class, then by its superclasses, from oldState. */
+    private void restoreFieldsForClass(Class<?> currentInOldStateClassHierarchy, ProcessingElement oldState) {
+        if (!ProcessingElement.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+            return;
+        }
+        // use the class being visited: oldState.getClass() would re-copy the most specific class at every level
+        Field[] fields = currentInOldStateClassHierarchy.getDeclaredFields();
+        for (Field field : fields) {
+            if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+                if (!Modifier.isPublic(field.getModifiers())) {
+                    field.setAccessible(true);
+                }
+                try {
+                    // TODO use reflectasm
+                    field.set(this, field.get(oldState));
+                } catch (IllegalArgumentException e) {
+                    logger.error("Cannot recover old state for this PE [{}]", this, e);
+                    return;
+                } catch (IllegalAccessException e) {
+                    logger.error("Cannot recover old state for this PE [{}]", this, e);
+                    return;
+                }
+            }
+        }
+        restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+    }
+
class Trigger {
final long intervalInMilliseconds;
final int intervalInEvents;
@@ -709,4 +836,21 @@
return active;
}
}
+
+ public long getEventCount() {
+ return eventCount;
+ }
+
+    /** Describes this PE: class name and id, then the singleton flag (if set), thread-safety and timer interval. */
+    @Override
+    public String toString() {
+        StringBuilder description = new StringBuilder(getClass().getName()).append("/").append(getId()).append(" ;");
+        if (isSingleton) {
+            description.append("singleton ;");
+        }
+        description.append(isThreadSafe ? "IS thread-safe ;" : "Not thread-safe ;");
+        description.append("timerInterval=").append(timerIntervalInMilliseconds).append(" ;");
+        return description.toString();
+    }
+
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index a7522bd..85d80f7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -187,9 +187,8 @@
.serialize(event)));
}
} catch (InterruptedException e) {
- e.printStackTrace();
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- System.exit(-1);
+ Thread.currentThread().interrupt();
}
}
@@ -201,9 +200,8 @@
try {
queue.put(event);
} catch (InterruptedException e) {
- e.printStackTrace();
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- System.exit(-1);
+ Thread.currentThread().interrupt();
}
}
@@ -305,6 +303,7 @@
} catch (InterruptedException e) {
logger.info("Closing stream {}.", name);
receiver.removeStream(this);
+ Thread.currentThread().interrupt();
return;
}
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
new file mode 100644
index 0000000..290d46a
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage backend.
+ * </p>
+ * <p>
+ * The storage backend is responsible for converting this identifier to whatever internal representation is most
+ * suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of the identifier and for creating an
+ * identifier from a String representation.
+ * </p>
+ *
+ */
+public class CheckpointId {
+
+ private String prototypeId;
+ private String key;
+
+ private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern.compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+ public CheckpointId() {
+ }
+
+ public CheckpointId(ProcessingElement pe) {
+ this.prototypeId = pe.getPrototype().getClass().getName();
+ this.key = pe.getId();
+ }
+
+ /**
+ *
+ * @param prototypeID
+ * id of the PE as returned by {@link ProcessingElement#getId() getId()} method
+ * @param key
+ * keyed attribute(s)
+ */
+ public CheckpointId(String prototypeID, String key) {
+ super();
+ this.prototypeId = prototypeID;
+ this.key = key;
+ }
+
+    /** Parses "[prototypeId];[key]" as produced by getStringRepresentation(); both fields stay null when unparsable. */
+    public CheckpointId(String keyAsString) {
+        Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+        // group() throws IllegalStateException (not IndexOutOfBoundsException) when nothing matched,
+        // so check the result of find() instead of relying on a catch block
+        if (matcher.find()) {
+            String parsedPrototypeId = matcher.group(1);
+            String parsedKey = matcher.group(2);
+            prototypeId = "".equals(parsedPrototypeId) ? null : parsedPrototypeId;
+            key = "".equals(parsedKey) ? null : parsedKey;
+        }
+    }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getPrototypeId() {
+ return prototypeId;
+ }
+
+ public String toString() {
+ return "[PROTO_ID];[KEY] --> " + getStringRepresentation();
+ }
+
+ public String getStringRepresentation() {
+ return "[" + (prototypeId == null ? "" : prototypeId) + "];[" + (key == null ? "" : key) + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return getStringRepresentation().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ CheckpointId other = (CheckpointId) obj;
+ if (key == null) {
+ if (other.key != null)
+ return false;
+ } else if (!key.equals(other.key))
+ return false;
+ if (prototypeId == null) {
+ if (other.prototypeId != null)
+ return false;
+ } else if (!prototypeId.equals(other.prototypeId))
+ return false;
+ return true;
+ }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
new file mode 100644
index 0000000..eec765d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
@@ -0,0 +1,55 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checkpointing configuration: event count based vs time interval, frequency. Use the {@link Builder} class to build
+ * instances.
+ *
+ */
+public class CheckpointingConfig {
+
+ /**
+ * Identifies the kind of checkpointing: time based, event count, or no checkpointing
+ *
+ */
+ public static enum CheckpointingMode {
+ TIME, EVENT_COUNT, NONE
+ }
+
+ public final CheckpointingMode mode;
+ public final int frequency;
+ public final TimeUnit timeUnit;
+
+ private CheckpointingConfig(CheckpointingMode mode, int frequency, TimeUnit timeUnit) {
+ this.mode = mode;
+ this.frequency = frequency;
+ this.timeUnit = timeUnit;
+ }
+
+ public static class Builder {
+ private CheckpointingMode mode;
+ private int frequency;
+ private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+
+ public Builder(CheckpointingMode mode) {
+ this.mode = mode;
+ }
+
+ public Builder frequency(int frequency) {
+ this.frequency = frequency;
+ return this;
+ }
+
+ public Builder timeUnit(TimeUnit timeUnit) {
+ this.timeUnit = timeUnit;
+ return this;
+ }
+
+ public CheckpointingConfig build() {
+ return new CheckpointingConfig(mode, frequency, timeUnit);
+ }
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
new file mode 100644
index 0000000..48b4616
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
@@ -0,0 +1,52 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ *
+ * This interface defines the functionalities offered by the checkpointing framework.
+ *
+ */
+@ImplementedBy(value = NoOpCheckpointingFramework.class)
+public interface CheckpointingFramework {
+
+ /**
+ * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
+ *
+ * @return a callback for getting notified of the result of the storage operation
+ */
+ StorageCallback saveState(ProcessingElement pe);
+
+ /**
+ * Fetches checkpoint data from storage for a given PE
+ *
+ * @param key
+ * safeKeeperId
+ * @return checkpoint data
+ */
+ byte[] fetchSerializedState(CheckpointId key);
+
+ /**
+ * Evaluates whether specified PE should be checkpointed, based on:
+ * <ul>
+ * <li>whether checkpointing enabled</li>
+ * <li>whether the pe is "dirty"</li>
+ * <li>the checkpointing frequency settings</li>
+ * </ul>
+ *
+ * This is used for count-based checkpointing intervals. Time-based checkpointing relies on the dirty flag when
+ * triggered.
+ *
+ * @param pe
+ * processing element to evaluate
+ * @return true if checkpointable, according to the above requirements
+ */
+ boolean isCheckpointable(ProcessingElement pe);
+
+ public enum StorageResultCode {
+ SUCCESS, FAILURE
+ }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
new file mode 100644
index 0000000..dd6003c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
@@ -0,0 +1,33 @@
+package org.apache.s4.core.ft;
+
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Timer task backing time-based checkpointing. On each run it walks through every instance of the given prototype
+ * and checkpoints the instances that are currently flagged as dirty.
+ */
+public class CheckpointingTask extends TimerTask {
+
+    ProcessingElement prototype;
+
+    public CheckpointingTask(ProcessingElement prototype) {
+        super();
+        this.prototype = prototype;
+    }
+
+    @Override
+    public void run() {
+        for (ProcessingElement instance : prototype.getPEInstances().values()) {
+            // hold the instance monitor so that the dirty check and the checkpoint request happen together
+            synchronized (instance) {
+                if (!instance.isDirty()) {
+                    continue;
+                }
+                instance.checkpoint();
+            }
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..3a95da6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * File system backend storage for persisting checkpoints.
+ * <p>
+ * The file system may be the default local file system when running on a single machine, but should be a distributed
+ * file system such as NFS when running on a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 checkpoint id) in directories according to the following
+ * structure: <code>(storageRootpath)/prototypeId/encodedCheckpointId</code>, where the file name is the URL-safe
+ * base64 encoding of the string representation of the checkpoint id.
+ * </p>
+ */
+public class DefaultFileSystemStateStorage implements StateStorage {
+
+    private static Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
+
+    // Charset used when turning checkpoint ids into file names and back. It is explicit, rather than the platform
+    // default, so that all nodes sharing the storage directory map a given id to the same file name.
+    private static final java.nio.charset.Charset KEY_CHARSET = java.nio.charset.Charset.forName("UTF-8");
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.filesystem.storageRootPath")
+    String storageRootPath;
+
+    public DefaultFileSystemStateStorage() {
+    }
+
+    /** Called by the dependency injection framework, after construction. */
+    @Inject
+    public void init() {
+        checkStorageDir();
+    }
+
+    /** Returns the content of the checkpoint file for the given id, or null if it is missing or unreadable. */
+    @Override
+    public byte[] fetchState(CheckpointId key) {
+        File file = safeKeeperID2File(key, storageRootPath);
+        if (!file.exists()) {
+            return null;
+        }
+        logger.debug("Fetching {} for : {}", file.getAbsolutePath(), key);
+        try {
+            return Files.toByteArray(file);
+        } catch (IOException e) {
+            logger.error("Cannot read content from checkpoint file [" + file.getAbsolutePath() + "]", e);
+            return null;
+        }
+    }
+
+    /** Lists the ids of the stored checkpoints; the result is empty when the storage root cannot be listed. */
+    @Override
+    public Set<CheckpointId> fetchStoredKeys() {
+        Set<CheckpointId> keys = new HashSet<CheckpointId>();
+        // File#listFiles returns null when the path is not a directory, or when an I/O error occurs
+        File[] dirs = new File(storageRootPath).listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        if (dirs == null) {
+            logger.warn("Cannot list checkpoints: [{}] is not a readable directory", storageRootPath);
+            return keys;
+        }
+        for (File dir : dirs) {
+            File[] files = dir.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return file.isFile();
+                }
+            });
+            if (files == null) {
+                // directory removed or unreadable since it was listed
+                continue;
+            }
+            for (File file : files) {
+                keys.add(file2SafeKeeperID(file));
+            }
+        }
+        return keys;
+    }
+
+    // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+    private static File safeKeeperID2File(CheckpointId key, String storageRootPath) {
+        return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+                + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes(KEY_CHARSET)));
+    }
+
+    private static CheckpointId file2SafeKeeperID(File file) {
+        return new CheckpointId(new String(Base64.decodeBase64(file.getName()), KEY_CHARSET));
+    }
+
+    /** Falls back to (user.dir)/tmp/storage, created if needed, when no storage root path was injected. */
+    public void checkStorageDir() {
+        if (storageRootPath != null) {
+            return;
+        }
+        File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                + "storage");
+        storageRootPath = defaultStorageDir.getAbsolutePath();
+        logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+        if (!defaultStorageDir.exists() && !defaultStorageDir.mkdirs()) {
+            logger.error("Storage directory not specified, and cannot create default storage directory : "
+                    + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+        }
+    }
+
+    /** Writes the checkpoint to its file, replacing any previous one; the outcome goes to the callback only. */
+    @Override
+    public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+        File f = safeKeeperID2File(key, storageRootPath);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+        }
+        if (f.exists()) {
+            if (!f.delete()) {
+                callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                        "Cannot delete previously saved checkpoint file [" + f.getAbsolutePath() + "]");
+                return;
+            }
+        } else {
+            // parent directory is named after the prototype id
+            File prototypeDir = f.getParentFile();
+            // mkdirs() also returns false when the directory already exists, e.g. when it has just been created
+            // concurrently, hence the final isDirectory() check
+            if (!prototypeDir.exists() && !prototypeDir.mkdirs() && !prototypeDir.isDirectory()) {
+                callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                        "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+                                + prototypeDir.getAbsolutePath());
+                return;
+            }
+        }
+
+        try {
+            Files.write(state, f);
+        } catch (IOException e) {
+            logger.error("Cannot write checkpoint file [" + f.getAbsolutePath() + "]", e);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE, key.toString() + " : "
+                    + e.getMessage());
+            return;
+        }
+        callback.storageOperationResult(CheckpointingFramework.StorageResultCode.SUCCESS, key.toString());
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
new file mode 100644
index 0000000..6994acd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
@@ -0,0 +1,36 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Callable wrapping a single checkpoint lookup against a {@link StateStorage}, so that the lookup can be handed to
+ * an executor. Failures are logged here and then propagated to the caller.
+ */
+public class FetchTask implements Callable<byte[]> {
+
+    private static Logger logger = LoggerFactory.getLogger(FetchTask.class);
+
+    StateStorage stateStorage;
+    CheckpointId checkpointId;
+
+    public FetchTask(StateStorage stateStorage, CheckpointId checkpointId) {
+        super();
+        this.checkpointId = checkpointId;
+        this.stateStorage = stateStorage;
+    }
+
+    /** Returns whatever the storage returns for the checkpoint id; failures are logged, then rethrown. */
+    @Override
+    public byte[] call() throws Exception {
+        try {
+            return stateStorage.fetchState(checkpointId);
+        } catch (Exception failure) {
+            logger.error("Cannot fetch checkpoint data for {}", checkpointId, failure);
+            throw failure;
+        }
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
new file mode 100644
index 0000000..dd984e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -0,0 +1,15 @@
+package org.apache.s4.core.ft;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Guice module enabling checkpointing: {@link SafeKeeper} is bound as the checkpointing framework, and checkpoints
+ * are persisted through {@link DefaultFileSystemStateStorage}.
+ */
+public class FileSystemBackendCheckpointingModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        bind(CheckpointingFramework.class).to(SafeKeeper.class);
+        bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..28279ba
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+
+/**
+ * {@link StorageCallbackFactory} whose callbacks do nothing but log the result they are notified of.
+ */
+public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+    @Override
+    public StorageCallback createStorageCallback() {
+        return new StorageCallbackLogger();
+    }
+
+    /**
+     * Storage callback that writes results to the "s4-ft" logger: successes at debug level, anything else at warn
+     * level.
+     */
+    static class StorageCallbackLogger implements StorageCallback {
+
+        private static Logger logger = Logger.getLogger("s4-ft");
+
+        @Override
+        public void storageOperationResult(StorageResultCode code, Object message) {
+            if (StorageResultCode.SUCCESS != code) {
+                // any non-success code is reported under the FAILURE label
+                logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
+            }
+        }
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
new file mode 100644
index 0000000..10387ce
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
@@ -0,0 +1,26 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * {@link CheckpointingFramework} that performs no checkpointing at all: no state is saved, no state is fetched and
+ * no PE is ever reported as checkpointable.
+ */
+public final class NoOpCheckpointingFramework implements CheckpointingFramework {
+
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        return false;
+    }
+
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        return null;
+    }
+
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+        return null;
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
new file mode 100644
index 0000000..4c8358f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * This class is responsible for coordinating interactions between the S4 event processor and the checkpoint storage
+ * backend. In particular, it schedules asynchronous save tasks to be executed on the backend.
+ * </p>
+ * <p>
+ * Checkpoint fetching is temporarily disabled after a configurable number of consecutive fetching failures, and is
+ * enabled again once a configurable delay has elapsed.
+ * </p>
+ */
+public final class SafeKeeper implements CheckpointingFramework {
+
+    private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler {
+        String operationType;
+
+        public UncaughtExceptionLogger(String operationType) {
+            this.operationType = operationType;
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("Cannot execute checkpointing " + operationType + " operation", e);
+        }
+    }
+
+    private static Logger logger = LoggerFactory.getLogger(SafeKeeper.class);
+
+    @Inject
+    private StateStorage stateStorage;
+    @Inject(optional = true)
+    private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+    private ThreadPoolExecutor storageThreadPool;
+    private ThreadPoolExecutor serializationThreadPool;
+    private ThreadPoolExecutor fetchingThreadPool;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxThreads")
+    int storageMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageThreadKeepAliveSeconds")
+    int storageThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxOutstandingRequests")
+    int storageMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxThreads")
+    int serializationMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationThreadKeepAliveSeconds")
+    int serializationThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxOutstandingRequests")
+    int serializationMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.maxSerializationLockTime")
+    long maxSerializationLockTime = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxThreads")
+    int fetchingMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingThreadKeepAliveSeconds")
+    int fetchingThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxWaitMs")
+    long fetchingMaxWaitMs = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxConsecutiveFailuresBeforeDisabling")
+    int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingDisabledDurationMs")
+    long fetchingDisabledDurationMs = 600000;
+
+    // time at which fetching was last disabled; volatile since, like the failure counter, it may be accessed from
+    volatile long fetchingDisabledInitTime = -1; // several threads
+    AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
+
+    public SafeKeeper() {
+    }
+
+    /**
+     * Creates the thread pools. Called by the dependency injection framework, after construction.
+     */
+    @Inject
+    private void init() {
+        // NOTE: those thread pools should be fine tuned according to backend and application load/requirements.
+        // For now:
+        // - number of threads and work queue size have overridable defaults
+        // - failures are logged
+        // - when storage queue is full, we throttle backwards to the serialization threadpool
+        // - when serialization queue is full, we abort execution for new entries
+        // - fetching hands requests over through a synchronous queue: a request is rejected when no fetching thread
+        // is available; otherwise the caller waits for the result, with a timeout
+
+        ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build();
+        storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests),
+                storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+        storageThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("Checkpointing-serialization-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build();
+        serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads,
+                serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
+                        serializationMaxOutstandingRequests), serializationThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+        serializationThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
+        fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+        fetchingThreadPool.allowCoreThreadTimeOut(true);
+
+    }
+
+    /**
+     * Schedules the serialization of the state of the PE, then the storage of the serialized state.
+     *
+     * @param pe
+     *            processing element to checkpoint
+     * @return the callback notified of the result of the operation
+     */
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+        Future<byte[]> futureSerializedState;
+        try {
+            futureSerializedState = serializationThreadPool.submit(new SerializeTask(pe));
+        } catch (RejectedExecutionException e) {
+            // the serialization pool uses AbortPolicy: the new request is the one that gets dropped
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Serialization task queue is full. Checkpoint request dropped for PE [" + pe.getId()
+                            + "] Remaining capacity for the serialization task queue is ["
+                            + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + serializationThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + serializationMaxOutstandingRequests + "]");
+            return storageCallback;
+        }
+        submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback,
+                stateStorage), storageCallback);
+        return storageCallback;
+    }
+
+    private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+        try {
+            storageThreadPool.execute(task);
+        } catch (RejectedExecutionException e) {
+            // not expected with CallerRunsPolicy, which runs the task in the calling thread when the queue is full;
+            // kept as a safeguard in case the rejection policy of the storage pool is changed
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+                            + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + storageThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + storageMaxOutstandingRequests + "]");
+        }
+    }
+
+    /**
+     * Fetches the serialized state for the given checkpoint id, waiting at most <code>fetchingMaxWaitMs</code>.
+     * <p>
+     * After <code>fetchingMaxConsecutiveFailuresBeforeDisabling</code> consecutive failures, fetching is disabled for
+     * <code>fetchingDisabledDurationMs</code>: requests received during that period return null without querying the
+     * backend.
+     * </p>
+     *
+     * @param key
+     *            id of the checkpoint to fetch
+     * @return the checkpoint data, or null when there is none, when fetching failed or when fetching is disabled
+     */
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+        // >= rather than ==: concurrent failures may push the counter past the threshold
+        if (fetchingCurrentConsecutiveFailures.get() >= fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            if (System.currentTimeMillis() < (fetchingDisabledInitTime + fetchingDisabledDurationMs)) {
+                // still within the disabled period
+                return null;
+            }
+            // reached time, reinit
+            fetchingCurrentConsecutiveFailures.set(0);
+        }
+        try {
+            Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
+            byte[] result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+            fetchingCurrentConsecutiveFailures.set(0);
+            return result;
+        } catch (RejectedExecutionException e) {
+            // the fetching pool does not queue requests: rejection means that no fetching thread was available
+            logger.error("Cannot fetch checkpoint from backend for key [{}] : no fetching thread available",
+                    key.getStringRepresentation());
+        } catch (TimeoutException te) {
+            logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+        } catch (InterruptedException e) {
+            logger.error(
+                    "Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
+                    e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
+        }
+        if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            fetchingDisabledInitTime = System.currentTimeMillis();
+            logger.warn(
+                    "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",
+                    fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs);
+        }
+        return null;
+    }
+
+    /**
+     * Tells whether the PE is due for a count-based checkpoint: the checkpointing mode is EVENT_COUNT, the frequency
+     * is positive, the PE is dirty and its event count is a multiple of the frequency.
+     *
+     * @param pe
+     *            processing element to evaluate
+     * @return true if the PE should be checkpointed now
+     */
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        CheckpointingConfig config = pe.getCheckpointingConfig();
+        // only count-based checkpointing is evaluated here
+        if (!CheckpointingConfig.CheckpointingMode.EVENT_COUNT.equals(config.mode)) {
+            return false;
+        }
+        return config.frequency > 0 && pe.isDirty() && pe.getEventCount() % config.frequency == 0;
+    }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
new file mode 100644
index 0000000..79931a5
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint request, scheduled by the checkpointing framework. The state to store is given either
+ * directly as a byte array, or as a future that is resolved when the task runs. If that future fails, times out or
+ * the wait for it is interrupted, nothing is stored.
+ */
+public class SaveStateTask implements Runnable {
+
+    private static Logger logger = LoggerFactory.getLogger(SaveStateTask.class);
+
+    CheckpointId safeKeeperId;
+    byte[] serializedState;
+    Future<byte[]> futureSerializedState = null;
+    StorageCallback storageCallback;
+    StateStorage stateStorage;
+
+    public SaveStateTask(CheckpointId safeKeeperId, byte[] state, StorageCallback storageCallback,
+            StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.serializedState = state;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    public SaveStateTask(CheckpointId safeKeeperId, Future<byte[]> futureSerializedState,
+            StorageCallback storageCallback, StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.futureSerializedState = futureSerializedState;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    @Override
+    public void run() {
+        // state given directly at construction time, unless a future was given instead
+        byte[] state = serializedState;
+        try {
+            if (futureSerializedState != null) {
+                // TODO parameterizable timeout
+                state = futureSerializedState.get(1000, TimeUnit.MILLISECONDS);
+            }
+            stateStorage.saveState(safeKeeperId, state, storageCallback);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            logger.warn("Cannot save checkpoint : " + safeKeeperId, e);
+        } catch (TimeoutException e) {
+            logger.warn("Cannot save checkpoint {} : could not serialize before timeout", safeKeeperId);
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
new file mode 100644
index 0000000..6cdebb4
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Callable that serializes the state of a PE. The PE instance is locked for the duration of the operation, so that
+ * the serialized state is consistent. Once serialization has succeeded, the dirty flag of the PE is cleared.
+ */
+public class SerializeTask implements Callable<byte[]> {
+
+    ProcessingElement pe;
+
+    public SerializeTask(ProcessingElement pe) {
+        this.pe = pe;
+    }
+
+    @Override
+    public byte[] call() throws Exception {
+        final byte[] serialized;
+        synchronized (pe) {
+            serialized = pe.serializeState();
+            // not reached when serializeState() throws: the dirty flag is then left untouched
+            pe.clearDirty();
+        }
+        return serialized;
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
new file mode 100644
index 0000000..cbe2bda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+
+/**
+ * <p>
+ * Contract that a backend storage for checkpoints must fulfill.
+ * </p>
+ *
+ * NOTE: a backend implementation usually needs some kind of initialization. The recommended place for it is a custom
+ * method annotated with {@link Inject}, which gets called after the instance is constructed.
+ *
+ */
+public interface StateStorage {
+
+    /**
+     * Stores a checkpoint.
+     *
+     * <p>
+     * NOTE: no failure/success value is returned, because all failure/success notifications go through the
+     * StorageCallback reference.
+     * </p>
+     *
+     * @param key
+     *            id of the checkpoint
+     * @param state
+     *            checkpoint data as a byte array
+     * @param callback
+     *            callback for receiving notifications of storage operations. This callback is configurable
+     */
+    void saveState(CheckpointId key, byte[] state, StorageCallback callback);
+
+    /**
+     * Fetches data for a stored checkpoint.
+     * <p>
+     * Must return null if storage does not contain this key.
+     * </p>
+     *
+     * @param key
+     *            id of the checkpoint
+     *
+     * @return stored checkpoint data, or null if the storage does not contain data for the given key
+     */
+    byte[] fetchState(CheckpointId key);
+
+    /**
+     * Fetches the ids of all stored checkpoints.
+     *
+     * @return ids of all stored checkpoints
+     */
+    Set<CheckpointId> fetchStoredKeys();
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
new file mode 100644
index 0000000..6676dd0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ *
+ * Callback for reporting the result of an asynchronous storage operation
+ *
+ */
+public interface StorageCallback {
+
+ /**
+ * Notifies the result of a storage operation
+ *
+ * @param resultCode
+ * code for the result : {@link CheckpointingFramework.StorageResultCode}
+ * @param message
+ * whatever message object is suitable
+ */
+ public void storageOperationResult(CheckpointingFramework.StorageResultCode resultCode, Object message);
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..c448399
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * A factory for creating storage callbacks. Storage callback implementations
+ * can take specific actions upon success or failure of asynchronous
+ * storage operations.
+ *
+ */
+public interface StorageCallbackFactory {
+
+ /**
+ * Factory method
+ *
+ * @return returns a StorageCallback instance
+ */
+ public StorageCallback createStorageCallback();
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 12f5d2f..70fe4ec 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,16 +6,17 @@
import junit.framework.Assert;
-import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.BareCommModule;
import org.apache.s4.core.triggers.TriggeredApp;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
-import com.google.common.io.Resources;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -45,9 +46,7 @@
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
@@ -61,7 +60,7 @@
CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
- CommTestUtils.injectIntoStringSocketAdapter(time1);
+ app.stream.receiveEvent(new EventMessage("-1", "stream", app.getSerDeser().serialize(new StringEvent(time1))));
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -69,5 +68,4 @@
// return latch on trigger signal
return signalEvent1Triggered;
}
-
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
new file mode 100644
index 0000000..431e31e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+public class CheckpointingModuleWithUnrespondingFetchingStorageBackend extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+ CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+ bind(StateStorage.class).to(StorageWithUnrespondingFetching.class);
+ bind(CheckpointingFramework.class).to(SafeKeeper.class);
+ bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
new file mode 100644
index 0000000..ef3fabd
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -0,0 +1,153 @@
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.BareCommModule;
+import org.apache.s4.core.App;
+import org.apache.s4.core.BareCoreModule;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class CheckpointingTest {
+
+ private static Factory zookeeperServerConnectionFactory = null;
+ public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("user.dir") + File.separator + "tmp");
+ public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ + "storage");
+
+ @Before
+ public void prepare() throws Exception {
+ zookeeperServerConnectionFactory = CoreTestUtils.startZookeeperServer();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ CoreTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ }
+
+ @Test
+ public void testCheckpointStorage() throws Exception {
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
+
+ // 2. generate a simple event that creates and changes the state of
+ // the
+ // PE
+
+ // NOTE: coordinate through zookeeper
+ final CountDownLatch signalValue1Set = new CountDownLatch(1);
+
+ CoreTestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+ final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
+
+ Injector injector = Guice.createInjector(new BareCommModule(),
+ new MockCoreModuleWithFileBaseCheckpointingBackend());
+ TestApp app = injector.getInstance(TestApp.class);
+ app.init();
+ app.start();
+
+ Event event = new Event();
+ event.put("command", String.class, "setValue1");
+ event.put("value", String.class, "message1");
+
+ app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+
+ signalValue1Set.await();
+
+ StatefulTestPE pe = (StatefulTestPE) app.getPE("statefulPE1").getInstanceForKey("X");
+
+ Assert.assertEquals("message1", pe.getValue1());
+ Assert.assertEquals("", pe.getValue2());
+
+ // 3. generate a checkpoint event
+ event = new Event();
+ event.put("command", String.class, "checkpoint");
+ app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+ Assert.assertTrue(signalCheckpointed.await(10, TimeUnit.SECONDS));
+
+ // NOTE: the backend has asynchronous save operations
+ Thread.sleep(1000);
+
+ CheckpointId safeKeeperId = new CheckpointId(pe);
+ File expected = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "storage"
+ + File.separator + safeKeeperId.getPrototypeId() + File.separator
+ + Base64.encodeBase64URLSafeString(safeKeeperId.getStringRepresentation().getBytes()));
+
+ // 4. verify that state was correctly persisted
+ Assert.assertTrue(expected.exists());
+
+ StatefulTestPE refPE = new StatefulTestPE();
+ refPE.onCreate();
+ refPE.setValue1("message1");
+
+ Field idField = ProcessingElement.class.getDeclaredField("id");
+ idField.setAccessible(true);
+ idField.set(refPE, "X");
+
+ byte[] refBytes = app.getSerDeser().serialize(refPE);
+
+ Assert.assertTrue(Arrays.equals(refBytes, Files.toByteArray(expected)));
+
+ }
+
+ private static class TestApp extends App {
+ Stream<Event> testStream;
+ int count;
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+
+ StatefulTestPE pe = createPE(StatefulTestPE.class, "statefulPE1");
+ testStream = createStream("stream1", new KeyFinder<Event>() {
+ @Override
+ public List<String> get(Event event) {
+ return ImmutableList.of("X");
+ }
+ }, pe);
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+ }
+
+ private static class MockCoreModuleWithFileBaseCheckpointingBackend extends BareCoreModule {
+
+ protected void configure() {
+ super.configure();
+ bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+ bind(CheckpointingFramework.class).to(SafeKeeper.class);
+ bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+ }
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java
new file mode 100644
index 0000000..770b219
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountApp.java
@@ -0,0 +1,18 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.wordcount.WordCountApp;
+
+public class FTWordCountApp extends WordCountApp {
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+ getPE("classifierPE").setCheckpointingConfig(
+ new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT).frequency(1).build());
+ getPE("counterPE").setCheckpointingConfig(
+ new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT).frequency(1).build());
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
new file mode 100644
index 0000000..8ab093f
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -0,0 +1,131 @@
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.DefaultCoreModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.WordCountTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class FTWordCountTest extends ZkBasedTest {
+
+ private Process forkedS4App;
+
+ @After
+ public void clean() throws IOException, InterruptedException {
+ CoreTestUtils.killS4App(forkedS4App);
+ }
+
+ @Test
+ public void testCheckpointAndRecovery() throws Exception {
+
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+ TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
+
+ restartNode();
+
+ CountDownLatch signalTextProcessed = new CountDownLatch(1);
+ CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+ // add authorizations for processing
+ for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ CountDownLatch signalSentence1Processed = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/classifierIteration_" + WordCountTest.SENTENCE_1_TOTAL_WORDS,
+ signalSentence1Processed, zk);
+
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
+ signalSentence1Processed.await(10, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ // crash the app
+ forkedS4App.destroy();
+
+ restartNode();
+ // add authorizations for continuing processing. Without these, the
+ // WordClassifier keeps waiting
+ for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS
+ + WordCountTest.SENTENCE_2_TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ CountDownLatch sentence2Processed = new CountDownLatch(1);
+ CoreTestUtils
+ .watchAndSignalCreation("/classifierIteration_"
+ + (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
+ sentence2Processed, zk);
+
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
+
+ sentence2Processed.await(10, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ // crash the app
+ forkedS4App.destroy();
+ restartNode();
+
+ // add authorizations for continuing processing. Without these, the
+ // WordClassifier keeps waiting
+ for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS + 1; i <= WordCountTest.TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
+ signalTextProcessed.await(10, TimeUnit.SECONDS);
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ if (!results.exists()) {
+ // in case the results file isn't ready yet
+ Thread.sleep(1000);
+ results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ }
+ String s = CoreTestUtils.readFile(results);
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+ }
+
+ private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) {
+ Event event;
+ event = new Event();
+ event.put("sentence", String.class, sentence);
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+ }
+
+ private void restartNode() throws IOException, InterruptedException {
+ CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream");
+
+ // recovering and making sure checkpointing still works
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
+ FTWordCountApp.class.getName(), "-p",
+ "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
+ "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
+ Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
new file mode 100644
index 0000000..3787ef7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
@@ -0,0 +1,50 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.google.inject.name.Names;
+
+/**
+ * Creates the /checkpointed znode if a successful checkpointing callback is received. Does it only once.
+ *
+ */
+public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule extends
+ FileSystemBackendCheckpointingModule {
+
+ @Override
+ protected void configure() {
+ super.configure();
+ bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+ bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+ CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+ }
+
+ public static class DummyZKStorageCallbackFactory implements StorageCallbackFactory {
+
+ @Override
+ public StorageCallback createStorageCallback() {
+ return new StorageCallback() {
+
+ @Override
+ public void storageOperationResult(StorageResultCode resultCode, Object message) {
+ if (resultCode == StorageResultCode.SUCCESS) {
+ try {
+ ZooKeeper zkClient = CoreTestUtils.createZkClient();
+ zkClient.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ };
+ };
+
+ }
+
+ }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
new file mode 100644
index 0000000..326e1a1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -0,0 +1,158 @@
+package org.apache.s4.core.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.DefaultCoreModule;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class RecoveryTest extends ZkBasedTest {
+
+ private Process forkedS4App = null;
+
+ @After
+ public void cleanup() throws Exception {
+ CoreTestUtils.killS4App(forkedS4App);
+ }
+
+ @Test
+ public void testCheckpointRestorationThroughApplicationEvent() throws Exception {
+
+ testCheckpointingConfiguration(S4AppWithManualCheckpointing.class,
+ FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, true,
+ "value1=message1 ; value2=message2");
+
+ }
+
+ @Test
+ public void testEventCountBasedCheckpointingAndRecovery() throws Exception {
+
+ testCheckpointingConfiguration(S4AppWithCountBasedCheckpointing.class,
+ FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, false,
+ "value1=message1 ; value2=message2");
+
+ }
+
+ @Test
+ public void testTimeBasedCheckpointingAndRecovery() throws Exception {
+ testCheckpointingConfiguration(S4AppWithTimeBasedCheckpointing.class,
+ FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.class, false,
+ "value1=message1 ; value2=message2");
+ }
+
+ @Test
+ public void testTimingOutRecovery() throws Exception {
+ testCheckpointingConfiguration(S4AppWithCountBasedCheckpointing.class,
+ CheckpointingModuleWithUnrespondingFetchingStorageBackend.class, false, "value1= ; value2=message2");
+ }
+
+ private void insertCheckpointInstruction(Injector injector, TCPEmitter emitter) {
+ Event event;
+ event = new Event();
+ event.put("command", String.class, "checkpoint");
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+ }
+
+ private void testCheckpointingConfiguration(Class<?> appClass, Class<?> backendModuleClass,
+ boolean manualCheckpointing, String expectedFinalResult) throws IOException, InterruptedException,
+ KeeperException {
+ // checkpointing is either triggered manually by sending a "checkpoint" event (manualCheckpointing == true),
+ // or performed automatically according to the checkpointing configuration specified in the app class
+
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
+
+ // use a latch for waiting for app to be ready
+ CountDownLatch signalConsumerReady = getConsumerReadySignal("inputStream");
+
+ // 1. instantiate remote S4 app
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass", appClass.getName(),
+ "-extraModulesClasses", backendModuleClass.getName() });
+
+ Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+
+ CountDownLatch signalValue1Set = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+ final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
+
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+ TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+ Event event = new Event();
+ event.put("command", String.class, "setValue1");
+ event.put("value", String.class, "message1");
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+
+ if (manualCheckpointing) {
+ insertCheckpointInstruction(injector, emitter);
+ }
+
+ Assert.assertTrue(signalCheckpointed.await(10, TimeUnit.SECONDS));
+
+ forkedS4App.destroy();
+
+ zk.delete("/data", -1);
+
+ signalConsumerReady = getConsumerReadySignal("inputStream");
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
+ S4AppWithManualCheckpointing.class.getName(), "-extraModulesClasses", backendModuleClass.getName() });
+
+ Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+ // trigger recovery by sending application event to set value 2
+ CountDownLatch signalValue2Set = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+
+ event = new Event();
+ event.put("command", String.class, "setValue2");
+ event.put("value", String.class, "message2");
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+
+ Assert.assertTrue(signalValue2Set.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(expectedFinalResult, new String(zk.getData("/data", false, null)));
+ }
+
+ public static CountDownLatch getConsumerReadySignal(String streamName) {
+ final CountDownLatch signalAppReady = new CountDownLatch(1);
+
+ ZkClient zkClient = new ZkClient("localhost:" + CoreTestUtils.ZK_PORT);
+ // TODO check a proper app state variable. This is hacky
+ zkClient.subscribeChildChanges("/s4/streams/" + streamName + "/consumers", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 1) {
+ signalAppReady.countDown();
+ }
+
+ }
+ });
+ return signalAppReady;
+ }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java
new file mode 100644
index 0000000..0ad4c0d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithCountBasedCheckpointing.java
@@ -0,0 +1,24 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+public class S4AppWithCountBasedCheckpointing extends App {
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ StatefulTestPE pe = createPE(StatefulTestPE.class);
+ pe.setSingleton(true);
+ pe.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT).frequency(1).build());
+ createInputStream("inputStream", pe);
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java
new file mode 100644
index 0000000..2971c57
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithManualCheckpointing.java
@@ -0,0 +1,29 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.App;
+
+/**
+ *
+ *
+ */
+public class S4AppWithManualCheckpointing extends App {
+
+ @Override
+ protected void onStart() {
+
+ }
+
+ @Override
+ protected void onInit() {
+ StatefulTestPE pe = createPE(StatefulTestPE.class);
+ pe.setSingleton(true);
+ createInputStream("inputStream", pe);
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java
new file mode 100644
index 0000000..910f882
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/S4AppWithTimeBasedCheckpointing.java
@@ -0,0 +1,28 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+public class S4AppWithTimeBasedCheckpointing extends App {
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ StatefulTestPE pe = createPE(StatefulTestPE.class);
+ pe.setSingleton(true);
+ // checkpoints (if applicable) every 1 ms!
+ pe.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(1)
+ .timeUnit(TimeUnit.MILLISECONDS).build());
+ createInputStream("inputStream", pe);
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java
new file mode 100644
index 0000000..70e321d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StatefulTestPE.java
@@ -0,0 +1,97 @@
+package org.apache.s4.core.ft;
+
+import java.io.IOException;
+
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class StatefulTestPE extends ProcessingElement {
+
+ String id;
+ String value1 = "";
+ String value2 = "";
+ transient ZooKeeper zk = null;
+
+ public void onEvent(org.apache.s4.base.Event event) {
+ try {
+
+ if ("setValue1".equals(event.get("command"))) {
+ setValue1(event.get("value"));
+ zk.create("/value1Set", event.get("value").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } else if ("setValue2".equals(event.get("command"))) {
+ setValue2(event.get("value"));
+ zk.create("/value2Set", event.get("value").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } else if ("checkpoint".equals(event.get("command"))) {
+ checkpoint();
+ } else {
+ throw new RuntimeException("unidentified event: " + event);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public String getValue1() {
+ return value1;
+ }
+
+ public void setValue1(String value1) {
+ this.value1 = value1;
+ persistValues();
+ }
+
+ public String getValue2() {
+ return value2;
+ }
+
+ public void setValue2(String value2) {
+ this.value2 = value2;
+ persistValues();
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ // NOTE: we use the /data znode in ZooKeeper as a simple way to keep track of changes
+ private void persistValues() {
+
+ try {
+ try {
+ zk.delete("/data", -1);
+ } catch (NoNodeException ignored) {
+ }
+ zk.create("/data", ("value1=" + value1 + " ; value2=" + value2).getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void onCreate() {
+ try {
+ zk = CoreTestUtils.createZkClient();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java
new file mode 100644
index 0000000..2906fe8
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/StorageWithUnrespondingFetching.java
@@ -0,0 +1,45 @@
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class StorageWithUnrespondingFetching implements StateStorage {
+
+ @Inject
+ DefaultFileSystemStateStorage storage = new DefaultFileSystemStateStorage();
+
+ @Inject
+ @Named("s4.checkpointing.filesystem.storageRootPath")
+ String storageRootPath;
+
+ @Inject
+ private void initStorageRootPath() {
+ // manual init because we are not directly injecting the default file system storage, but creating a new
+ // instance in this class
+ storage.storageRootPath = this.storageRootPath;
+ }
+
+ @Override
+ public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+ storage.saveState(key, state, callback);
+ }
+
+ @Override
+ public byte[] fetchState(CheckpointId key) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ return storage.fetchState(key);
+ }
+
+ @Override
+ public Set<CheckpointId> fetchStoredKeys() {
+ return storage.fetchStoredKeys();
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
index e72073c..6f1cc9f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggerablePE.java
@@ -31,8 +31,7 @@
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
@@ -55,8 +54,7 @@
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
index 849d7b2..e2a1b38 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredApp.java
@@ -1,21 +1,18 @@
package org.apache.s4.core.triggers;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
import org.apache.s4.core.TriggerTest;
-import org.apache.s4.fixtures.SocketAdapter;
import org.apache.s4.wordcount.SentenceKeyFinder;
-import org.apache.s4.wordcount.StringEvent;
import com.google.inject.Inject;
public class TriggeredApp extends App {
- SocketAdapter<StringEvent> socketAdapter;
+ public Stream<Event> stream;
@Inject
public TriggeredApp() {
@@ -32,7 +29,7 @@
protected void onInit() {
TriggerablePE prototype = createPE(TriggerablePE.class);
- Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
+ stream = createStream("stream", new SentenceKeyFinder(), prototype);
switch (TriggerTest.triggerType) {
case COUNT_BASED:
prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
@@ -43,11 +40,6 @@
break;
}
- try {
- socketAdapter = new SocketAdapter<StringEvent>(stream, new SocketAdapter.SentenceEventFactory());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
}
@Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 108a144..620494f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -15,23 +15,30 @@
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.junit.After;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -45,12 +52,11 @@
* - ... or from a web server
*
*/
-public class TestAutomaticDeployment {
+public class TestAutomaticDeployment extends ZkBasedTest {
private Factory zookeeperServerConnectionFactory;
private Process forkedNode;
private ZkClient zkClient;
- private static final String CLUSTER_NAME = "clusterZ";
private HttpServer httpServer;
private static File tmpAppsDir;
@@ -95,7 +101,7 @@
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/app/s4App", record, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -106,7 +112,16 @@
CommTestUtils
.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
- CoreTestUtils.injectIntoStringSocketAdapter(time1);
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+ TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+ Event event = new Event();
+ event.put("line", String.class, time1);
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -148,17 +163,13 @@
// current package .
// 1. start s4 nodes. Check that no app is deployed.
- TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
- taskSetup.clean(CLUSTER_NAME);
- taskSetup.setup(CLUSTER_NAME, 1, 1300);
-
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
+ zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -169,7 +180,7 @@
}
});
- forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
+ forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
// TODO synchro with ready state from zk
Thread.sleep(10000);
@@ -177,22 +188,19 @@
}
- @Before
- public void prepare() throws Exception {
- CommTestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
- final ZooKeeper zk = CommTestUtils.createZkClient();
- try {
- zk.delete("/simpleAppCreated", -1);
- } catch (Exception ignored) {
- }
-
- zk.close();
- }
+ // @Before
+ // public void clean() throws Exception {
+ // final ZooKeeper zk = CommTestUtils.createZkClient();
+ // try {
+ // zk.delete("/simpleAppCreated", -1);
+ // } catch (Exception ignored) {
+ // }
+ //
+ // zk.close();
+ // }
@After
public void cleanup() throws Exception {
- CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
CommTestUtils.killS4App(forkedNode);
if (httpServer != null) {
httpServer.stop(0);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 832ce1b..00a7611 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -7,7 +7,6 @@
import junit.framework.Assert;
-import org.apache.s4.core.App;
import org.apache.s4.core.Main;
import org.gradle.tooling.BuildLauncher;
import org.gradle.tooling.GradleConnector;
@@ -25,10 +24,6 @@
*/
public class CoreTestUtils extends CommTestUtils {
- public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
- return forkProcess(App.class.getName(), -1, moduleClass.getName(), appClass.getName());
- }
-
public static Process forkS4Node() throws IOException, InterruptedException {
return forkS4Node(new String[] {});
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
deleted file mode 100644
index c1815bf..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import org.apache.s4.core.Stream;
-import org.apache.s4.wordcount.KeyValueEvent;
-import org.apache.s4.wordcount.StringEvent;
-
-public class SocketAdapter<T extends StringEvent> {
-
- static ServerSocket serverSocket;
-
- /**
- * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
- * connection
- *
- * @param stream
- * @throws IOException
- */
- public SocketAdapter(final Stream<T> stream, final StringEventFactory<T> stringEventFactory) throws IOException {
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- serverSocket = null;
- Socket connectedSocket;
- BufferedReader in = null;
- try {
- serverSocket = new ServerSocket(12000);
- while (true) {
- connectedSocket = serverSocket.accept();
- in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
-
- String line = in.readLine();
- System.out.println("read: " + line);
- stream.put(stringEventFactory.create(line));
- connectedSocket.close();
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- // System.exit(-1);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
- });
- t.start();
-
- }
-
- public void close() {
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- interface StringEventFactory<T> {
- T create(String string);
- }
-
- public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
-
- @Override
- public StringEvent create(String string) {
- return new StringEvent(string);
- }
-
- }
-
- public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
-
- @Override
- public KeyValueEvent create(String string) {
- return new KeyValueEvent(string);
- }
-
- }
-
-}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
index 2401f70..b234ff3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/SentenceKeyFinder.java
@@ -1,19 +1,19 @@
package org.apache.s4.wordcount;
-
-import java.util.ArrayList;
import java.util.List;
+import org.apache.s4.base.Event;
import org.apache.s4.base.KeyFinder;
-public class SentenceKeyFinder implements KeyFinder<StringEvent> {
+import com.google.common.collect.ImmutableList;
+
+public class SentenceKeyFinder implements KeyFinder<Event> {
private static final String SENTENCE_KEY = "sentence";
- @SuppressWarnings("serial")
@Override
- public List<String> get(StringEvent event) {
- return new ArrayList<String>(){{add(SENTENCE_KEY);}};
+ public List<String> get(Event event) {
+ return ImmutableList.of(SENTENCE_KEY);
}
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
index 278b752..cda0ab4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountApp.java
@@ -1,17 +1,14 @@
package org.apache.s4.wordcount;
-import java.io.IOException;
-
+import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
-import org.apache.s4.fixtures.SocketAdapter;
import com.google.inject.Inject;
public class WordCountApp extends App {
protected boolean checkpointing = false;
- SocketAdapter<StringEvent> socketAdapter;
@Inject
public WordCountApp() {
@@ -27,42 +24,21 @@
@Override
protected void onInit() {
- WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class);
+ WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class, "classifierPE");
Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
wordClassifierPrototype);
- WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class);
+ WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class, "counterPE");
// wordCounterPrototype.setTrigger(WordSeenEvent.class, 1, 0, null);
wordCounterPrototype.setWordClassifierStream(wordCountStream);
Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
wordCounterPrototype);
WordSplitterPE wordSplitterPrototype = createPE(WordSplitterPE.class);
wordSplitterPrototype.setWordSeenStream(wordSeenStream);
- Stream<StringEvent> sentenceStream = createStream("sentences stream", new SentenceKeyFinder(),
- wordSplitterPrototype);
-
- try {
- socketAdapter = new SocketAdapter<StringEvent>(sentenceStream, new SocketAdapter.SentenceEventFactory());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- if (checkpointing) {
-
- // TODO move to subclass
-
- // checkpoint word classifier because it maintains a sync counter
- // for the test
- // LoggerFactory.getLogger(getClass()).info("setting checkpointing for word classifier and word counter");
- // wordClassifierPrototype.setCheckpointingFrequency(1);
- // // checkpoint word counter because it maintains word counts
- // wordCounterPrototype.setCheckpointingFrequency(1);
- }
-
+ Stream<Event> sentenceStream = createInputStream("inputStream", new SentenceKeyFinder(), wordSplitterPrototype);
}
@Override
protected void onClose() {
- socketAdapter.close();
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 2365a48..5987e4a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -3,22 +3,30 @@
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.Main;
import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class WordCountTest {
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class WordCountTest extends ZkBasedTest {
public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
@@ -28,12 +36,25 @@
public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
public static final String FLAG = ";";
public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
- private static Factory zookeeperServerConnectionFactory;
+ private TCPEmitter emitter;
+ private Injector injector;
+
+ // private static Factory zookeeperServerConnectionFactory;
+
+ // @Before
+ // public void prepare() throws IOException, InterruptedException, KeeperException {
+ // CommTestUtils.cleanupTmpDirs();
+ // zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+ //
+ // }
@Before
- public void prepare() throws IOException, InterruptedException, KeeperException {
- CommTestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+ public void prepareEmitter() throws IOException {
+ injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+ .openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
+ .openStream()));
+
+ emitter = injector.getInstance(TCPEmitter.class);
}
@@ -54,12 +75,8 @@
@Test
public void testSimple() throws Exception {
final ZooKeeper zk = CommTestUtils.createZkClient();
- TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
- String clusterName = "clusterA";
- taskSetup.clean("s4");
- taskSetup.setup(clusterName, 1, 10000);
- Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + WordCountApp.class.getName(),
+ Main.main(new String[] { "-cluster=cluster1", "-appClass=" + WordCountApp.class.getName(),
"-extraModulesClasses=" + WordCountModule.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
@@ -69,20 +86,21 @@
for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
- signalTextProcessed.await();
+ injectSentence(SENTENCE_1);
+ injectSentence(SENTENCE_2);
+ injectSentence(SENTENCE_3);
+ Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
}
- @After
- public void cleanup() throws IOException, InterruptedException {
- CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-
+ public void injectSentence(String sentence) throws IOException {
+ Event event = new Event();
+ event.put("sentence", String.class, sentence);
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
}
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
index 745f521..b2a43a9 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitterPE.java
@@ -1,26 +1,25 @@
package org.apache.s4.wordcount;
+import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
-
public class WordSplitterPE extends ProcessingElement {
-
+
private Stream<WordSeenEvent> wordSeenStream;
public WordSplitterPE(App app) {
super(app);
}
- public void onEvent(StringEvent event) {
- StringEvent sentence = event;
- String[] split = sentence.getString().split(" ");
+ public void onEvent(Event event) {
+ String[] split = event.get("sentence").split(" ");
for (String word : split) {
wordSeenStream.put(new WordSeenEvent(word));
}
}
-
+
public void setWordSeenStream(Stream<WordSeenEvent> stream) {
this.wordSeenStream = stream;
}
@@ -28,13 +27,13 @@
@Override
protected void onCreate() {
// TODO Auto-generated method stub
-
+
}
@Override
protected void onRemove() {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index 668499a..d473065 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -96,11 +96,11 @@
String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+ "zookeeper" + File.separator + "log").getAbsolutePath();
- @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: {"
- + TEST_MODE_CLUSTER_CONF_1 + "} and {" + TEST_MODE_CLUSTER_CONF_2 + "}")
+ @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: "
+ + TEST_MODE_CLUSTER_CONF_1 + " and " + TEST_MODE_CLUSTER_CONF_2 + ")")
boolean testMode = false;
- @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of curly-braces enclosed cluster definitions with format: {c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks>} (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
+ @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of cluster definitions with format: c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks> (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
List<ClusterConfig> clusterConfigs;
}
@@ -109,7 +109,7 @@
@Override
public ClusterConfig convert(String arg) {
- Pattern clusterConfigPattern = Pattern.compile("\\{(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)\\}");
+ Pattern clusterConfigPattern = Pattern.compile("(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)");
logger.info("processing cluster configuration {}", arg);
Matcher configMatcher = clusterConfigPattern.matcher(arg);
if (!configMatcher.find()) {
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
deleted file mode 100644
index 5ee7197..0000000
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.s4.deploy;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.Stream;
-
-public class SocketAdapter {
-
- static ServerSocket serverSocket;
-
- /**
- * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
- * connection
- *
- * @param stream
- * @throws IOException
- */
- public SocketAdapter(final Stream<Event> stream) throws IOException {
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- serverSocket = null;
- Socket connectedSocket;
- BufferedReader in = null;
- try {
- serverSocket = new ServerSocket(12000);
- while (true) {
- connectedSocket = serverSocket.accept();
- in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
-
- String line = in.readLine();
- System.out.println("read: " + line);
- Event event = new Event();
- event.put("line", String.class, line);
- stream.put(event);
- connectedSocket.close();
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(-1);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
- });
- t.start();
-
- }
-
- public void close() {
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
-}
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
index bbfaf17..389562b 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -1,8 +1,5 @@
package org.apache.s4.deploy;
-import java.io.IOException;
-import java.util.ArrayList;
-
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.base.Event;
import org.apache.s4.base.KeyFinder;
@@ -10,10 +7,11 @@
import org.apache.s4.core.Stream;
import org.apache.zookeeper.CreateMode;
+import com.google.common.collect.ImmutableList;
+
public class TestApp extends App {
private ZkClient zkClient;
- private SocketAdapter socketAdapter;
@Override
protected void onClose() {
@@ -24,21 +22,12 @@
@Override
protected void onInit() {
try {
- try {
- SimplePE prototype = createPE(SimplePE.class);
- Stream<Event> stream = createStream("stream", new KeyFinder<Event>() {
- public java.util.List<String> get(Event event) {
- return new ArrayList<String>() {
- {
- add("line");
- }
- };
- }
- }, prototype);
- socketAdapter = new SocketAdapter(stream);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ SimplePE prototype = createPE(SimplePE.class);
+ Stream<Event> stream = createInputStream("inputStream", new KeyFinder<Event>() {
+ public java.util.List<String> get(Event event) {
+ return ImmutableList.of("line");
+ }
+ }, prototype);
zkClient = new ZkClient("localhost:" + 2181);
if (!zkClient.exists("/s4-test")) {
zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index f6b211c..2218ca5 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -23,6 +23,10 @@
static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
Map<String, Integer> countedTopics = Maps.newHashMap();
+ public TopNTopicPE() {
+ // required for checkpointing. Requirement to be lifted in 0.6
+ }
+
public TopNTopicPE(App app) {
super(app);
logger.info("key: [{}]", getId());
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
index 28d87f2..90c3729 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -9,13 +9,17 @@
// keyed by topic name
public class TopicCountAndReportPE extends ProcessingElement {
- Stream<TopicEvent> downStream;
- int threshold = 10;
+ transient Stream<TopicEvent> downStream;
+ transient int threshold = 10;
int count;
boolean firstEvent = true;
static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
+ public TopicCountAndReportPE() {
+ // required for checkpointing in S4 0.5. Requirement to be removed in 0.6
+ }
+
public TopicCountAndReportPE(App app) {
super(app);
}
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index 90c3199..fc4e3ae 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -1,24 +1,20 @@
package org.apache.s4.example.twitter;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+
+import com.google.common.collect.ImmutableList;
public class TwitterCounterApp extends App {
- private ZkClient zkClient;
-
- private Thread t;
-
@Override
protected void onClose() {
- // TODO Auto-generated method stub
-
}
@Override
@@ -27,31 +23,29 @@
TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
+ // we checkpoint this PE every 20s
+ topNTopicPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(20)
+ .timeUnit(TimeUnit.SECONDS).build());
@SuppressWarnings("unchecked")
Stream<TopicEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder<TopicEvent>() {
@Override
public List<String> get(final TopicEvent arg0) {
- return new ArrayList<String>() {
- {
- add("aggregationKey");
- }
- };
+ return ImmutableList.of("aggregationKey");
}
}, topNTopicPE);
TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
topicCountAndReportPE.setDownstream(aggregatedTopicStream);
topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
+ // we checkpoint instances every 2 events
+ topicCountAndReportPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT)
+ .frequency(2).build());
Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
@Override
public List<String> get(final TopicEvent arg0) {
- return new ArrayList<String>() {
- {
- add(arg0.getTopic());
- }
- };
+ return ImmutableList.of(arg0.getTopic());
}
}, topicCountAndReportPE);