Merge branch 'S4-71' into piper
diff --git a/settings.gradle b/settings.gradle
index ec6cb7d..588d254 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,8 +23,8 @@
//include ':test-apps:simple-adapter-1'
include ':test-apps:simple-deployable-app-1'
include ':test-apps:simple-deployable-app-2'
-include ':test-apps:s4-showtime'
-include ':test-apps:s4-counter'
+include ':test-apps:producer-app'
+include ':test-apps:consumer-app'
rootProject.name = 's4'
rootProject.children.each {project ->
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/JarResources.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/JarResources.java
index 2e96c3c..3f45575 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/JarResources.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/JarResources.java
@@ -5,8 +5,8 @@
import java.util.zip.*;
/**
- * JarResources: JarResources maps all resources included in a Zip or Jar file.
- * Additionaly, it provides a method to extract one as a blob.
+ * JarResources: JarResources maps all resources included in a Zip or Jar file. Additionally, it provides a method to
+ * extract one as a blob.
*
* <p>
* CREDITS
@@ -30,8 +30,7 @@
private String jarFileName;
/**
- * creates a JarResources. It extracts all resources from a Jar into an
- * internal hashtable, keyed by resource names.
+ * creates a JarResources. It extracts all resources from a Jar into an internal hashtable, keyed by resource names.
*
* @param jarFileName
* a jar or zip file
@@ -79,8 +78,7 @@
}
if (debugOn) {
- System.out.println("ze.getName()=" + ze.getName() + ","
- + "getSize()=" + ze.getSize());
+ System.out.println("ze.getName()=" + ze.getName() + "," + "getSize()=" + ze.getSize());
}
int size = (int) ze.getSize();
@@ -104,10 +102,11 @@
htJarContents.put(ze.getName(), b);
if (debugOn) {
- System.out.println(ze.getName() + " rb=" + rb + ",size="
- + size + ",csize=" + ze.getCompressedSize());
+ System.out.println(ze.getName() + " rb=" + rb + ",size=" + size + ",csize="
+ + ze.getCompressedSize());
}
}
+ zis.close();
} catch (NullPointerException e) {
System.out.println("done.");
} catch (FileNotFoundException e) {
@@ -148,12 +147,11 @@
}
/**
- * Is a test driver. Given a jar file and a resource name, it trys to
- * extract the resource and then tells us whether it could or not.
+ * Is a test driver. Given a jar file and a resource name, it tries to extract the resource and then tells us whether
+ * it could or not.
*
- * <strong>Example</strong> Let's say you have a JAR file which jarred up a
- * bunch of gif image files. Now, by using JarResources, you could extract,
- * create, and display those images on-the-fly.
+ * <strong>Example</strong> Let's say you have a JAR file which jarred up a bunch of gif image files. Now, by using
+ * JarResources, you could extract, create, and display those images on-the-fly.
*
* <pre>
* ...
@@ -167,8 +165,7 @@
*/
public static void main(String[] args) throws IOException {
if (args.length != 2) {
- System.err
- .println("usage: java JarResources <jar file name> <resource name>");
+ System.err.println("usage: java JarResources <jar file name> <resource name>");
System.exit(1);
}
@@ -177,8 +174,7 @@
if (buff == null) {
System.out.println("Could not find " + args[1] + ".");
} else {
- System.out.println("Found " + args[1] + " (length=" + buff.length
- + ").");
+ System.out.println("Found " + args[1] + " (length=" + buff.length + ").");
}
}
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/TestClassLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/TestClassLoader.java
index 4fe08e1..2dfeeb4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/TestClassLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/TestClassLoader.java
@@ -1,14 +1,15 @@
package org.apache.s4.base.util;
import java.io.FileInputStream;
+import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestClassLoader extends MultiClassLoader {
- private static final Logger logger = LoggerFactory
- .getLogger(TestClassLoader.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestClassLoader.class);
+
@Override
/** Simple method to read a class file from a known location. */
public byte[] loadClassBytes(String className) {
@@ -17,22 +18,29 @@
String filename = "/tmp/" + className + ".impl";
logger.debug("Reading: " + filename);
+ FileInputStream fi = null;
try {
- FileInputStream fi = new FileInputStream(filename);
+ fi = new FileInputStream(filename);
bytes = new byte[fi.available()];
fi.read(bytes);
return bytes;
} catch (Exception e) {
/*
- * If we caught an exception, either the class wasn't found or it
- * was unreadable by our process.
+ * If we caught an exception, either the class wasn't found or it was unreadable by our process.
*/
logger.error("Unable to load class: {}.", filename);
e.printStackTrace();
return null;
+ } finally {
+ if (fi != null) {
+ try {
+ fi.close();
+ } catch (IOException e) {
+ logger.warn("Exception while closing input stream", e);
+ }
+ }
}
}
-
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index df38071..863bc29 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -71,10 +71,13 @@
/* Use Kryo to serialize events. */
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ // a node holds a single partition assignment
+ // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Clusters.class).to(ClustersFromZK.class);
bind(Cluster.class).to(ClusterFromZK.class);
+ bind(Clusters.class).to(ClustersFromZK.class);
+
try {
Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
.getString("comm.emitter.class"));
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5caafc9..5c83b09 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -157,6 +157,8 @@
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+
+ refreshCluster();
}
private class Message implements ChannelFutureListener {
@@ -455,6 +457,10 @@
@Override
public void onChange() {
+ refreshCluster();
+ }
+
+ private void refreshCluster() {
for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
if (partition == null) {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 2aa14f0..3e905e6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -84,6 +84,7 @@
}
}
+ @Override
public int getPartitionId() {
return node.getPartition();
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index aadcd4d..010274a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -36,7 +36,7 @@
zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
- zkclient.createPersistent("/s4/clusters/" + cluster + "/apps", true);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
for (int i = 0; i < tasks; i++) {
String taskId = "Task-" + i;
ZNRecord record = new ZNRecord(taskId);
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index d172d73..3a198bb 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -21,8 +21,10 @@
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
+@Singleton
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
private static final Logger logger = LoggerFactory.getLogger(AssignmentFromZK.class);
/**
@@ -89,6 +91,10 @@
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
ZkSerializer serializer = new ZNRecordSerializer();
zkClient.setZkSerializer(serializer);
+ }
+
+ @Inject
+ void init() throws Exception {
zkClient.subscribeStateChanges(this);
if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
throw new Exception("cannot connect to zookeeper");
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index a65fca4..be020cb 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -20,22 +20,23 @@
private static Logger logger = LoggerFactory.getLogger(TCPCommTest.class);
DeliveryTestUtil util;
public final static String CLUSTER_NAME = "cluster1";
- Injector injector;
public TCPCommTest() throws IOException {
super();
- injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), CLUSTER_NAME), new TCPCommTestModule());
}
public TCPCommTest(int numTasks) throws IOException {
super(numTasks);
- injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), CLUSTER_NAME), new TCPCommTestModule());
}
- public Injector getInjector() {
- return injector;
+ public Injector newInjector() {
+ try {
+ return Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+ .openStream(), CLUSTER_NAME), new TCPCommTestModule());
+ } catch (IOException e) {
+ Assert.fail();
+ return null;
+ }
}
class TCPCommTestModule extends AbstractModule {
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
index 71a3489..673c7a3 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
@@ -8,7 +8,6 @@
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.fixtures.CommTestUtils;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Splitter;
@@ -17,7 +16,6 @@
public class AssignmentsFromZKTest extends ZKBaseTest {
@Test
- @Ignore
public void testAssignmentFor1Cluster() throws Exception {
TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
final String topologyNames = "cluster1";
@@ -51,6 +49,7 @@
for (String topologyName : names) {
assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+ assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index 38d7621..fb81228 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -69,6 +69,7 @@
try {
for (String clusterName : clusterNames) {
assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+ assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 90ebc3f..c0ed6b4 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -15,23 +15,19 @@
public abstract class UDPCommTest extends ProtocolTestUtil {
DeliveryTestUtil util;
- private Injector injector;
public UDPCommTest() throws IOException {
super();
- injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
- .openStream(), "cluster1"), new UDPCommTestModule());
}
public UDPCommTest(int numTasks) throws IOException {
super(numTasks);
- injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
- .openStream(), "cluster1"), new UDPCommTestModule());
}
@Override
- protected Injector getInjector() throws IOException {
- return injector;
+ protected Injector newInjector() throws IOException {
+ return Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(),
+ "cluster1"), new UDPCommTestModule());
}
class UDPCommTestModule extends AbstractModule {
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
index 421aacd..6bc074e 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@ -27,12 +27,12 @@
expectedMessages = new int[super.numTasks];
partitions = new PartitionInfo[super.numTasks];
for (int i = 0; i < this.numTasks; i++) {
- partitions[i] = getInjector().getInstance(PartitionInfo.class);
+ partitions[i] = newInjector().getInstance(PartitionInfo.class);
partitions[i].setProtocolTestUtil(this);
}
}
- protected abstract Injector getInjector() throws IOException;
+ protected abstract Injector newInjector() throws IOException;
protected void decreaseExpectedMessages(int partition, long amount) {
synchronized (expectedMessages) {
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index f7cf147..2791da6 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -2,9 +2,6 @@
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -12,6 +9,7 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.io.Files;
+
/**
* Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
* conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
@@ -50,14 +50,17 @@
logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
}
- protected static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
+ protected static Process forkProcess(String mainClass, int debugPort, String... args) throws IOException,
+ InterruptedException {
List<String> cmdList = new ArrayList<String>();
cmdList.add("java");
cmdList.add("-cp");
cmdList.add(System.getProperty("java.class.path"));
- // cmdList.add("-Xdebug");
- // cmdList.add("-Xnoagent");
- // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+ if (debugPort != -1) {
+ cmdList.add("-Xdebug");
+ cmdList.add("-Xnoagent");
+ cmdList.add("-Xrunjdwp:transport=dt_socket,address=" + debugPort + ",server=y,suspend=n");
+ }
cmdList.add(mainClass);
for (String arg : args) {
@@ -106,56 +109,11 @@
}
public static void writeStringToFile(String s, File f) throws IOException {
- if (f.exists()) {
- if (!f.delete()) {
- throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
- }
- }
-
- FileWriter fw = null;
- try {
- if (!f.createNewFile()) {
- throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
- }
- fw = new FileWriter(f);
-
- fw.write(s);
- } catch (IOException e) {
- throw (e);
- } finally {
- if (fw != null) {
- try {
- fw.close();
- } catch (IOException e) {
- throw (e);
- }
- }
- }
+ Files.write(s, f, Charset.defaultCharset());
}
public static String readFile(File f) throws IOException {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(f));
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- if (line != null) {
- sb.append("\n");
- }
- }
- return sb.toString();
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (IOException e) {
- throw (e);
- }
- }
- }
+ return Files.toString(f, Charset.defaultCharset());
}
@@ -198,59 +156,12 @@
}
public static String readFileAsString(File f) throws IOException {
- FileReader fr = new FileReader(f);
- StringBuilder sb = new StringBuilder("");
- BufferedReader br = new BufferedReader(fr);
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- if (line != null) {
- sb.append("\n");
- }
- }
- return sb.toString();
+ return Files.toString(f, Charset.defaultCharset());
}
- // TODO factor this code (see BasicFSStateStorage) - or use commons io or
- // guava
public static byte[] readFileAsByteArray(File file) throws Exception {
- FileInputStream in = null;
- try {
- in = new FileInputStream(file);
-
- long length = file.length();
-
- /*
- * Arrays can only be created using int types, so ensure that the file size is not too big before we
- * downcast to create the array.
- */
- if (length > Integer.MAX_VALUE) {
- throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
- }
-
- byte[] buffer = new byte[(int) length];
- int offSet = 0;
- int numRead = 0;
-
- while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
- offSet += numRead;
- }
-
- if (offSet < buffer.length) {
- throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
- + buffer.length + " bytes read");
- }
-
- in.close();
- return buffer;
-
- } finally {
- if (in != null) {
- in.close();
- }
- }
+ return Files.toByteArray(file);
}
public static ZooKeeper createZkClient() throws IOException {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index fe4ceed..eae1dda 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -49,9 +49,6 @@
/* All the internal streams in this app. */
final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
- /* All the the event sources exported by this app. */
- final private List<EventSource> eventSources = new ArrayList<EventSource>();
-
/* Pes indexed by name. */
Map<String, ProcessingElement> peByName = Maps.newHashMap();
@@ -125,11 +122,6 @@
streams.add(stream);
}
- /* Should only be used within the core package. */
- void addEventSource(EventSource es) {
- eventSources.add(es);
- }
-
/* Returns list of PE prototypes. Should only be used within the core package. */
List<ProcessingElement> getPePrototypes() {
return pePrototypes;
@@ -147,12 +139,6 @@
return streams;
}
- /* Returns list of the event sources to be exported. Should only be used within the core package. */
- // TODO visibility
- public List<EventSource> getEventSources() {
- return eventSources;
- }
-
protected abstract void onStart();
/**
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
deleted file mode 100644
index 1f89d86..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.s4.core;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.s4.base.Event;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * A producer app uses one or more EventSource classes to provide events to streamables. At runtime, consumer apps
- * subscribe to an event source by providing a streamable object. Each EventSource instance may correspond to a
- * different type of event stream. Each EventSource may have an unlimited number of subscribers.
- *
- */
-public class EventSource implements Streamable {
-
- /* No need to synchronize this object because we expect a single thread. */
- private Set<Streamable> streamables = new HashSet<Streamable>();
- private static final Logger logger = LoggerFactory.getLogger(EventSource.class);
- final private String name;
-
- public EventSource(App app, String name) {
- this.name = name;
- app.addEventSource(this);
- }
-
- /**
- * Subscribe a streamable to this event source.
- *
- * @param aStream
- */
- public void subscribeStream(Streamable aStream) {
- logger.info("Subscribing stream: {} to event source: {}.", aStream.getName(), getName());
- streamables.add(aStream);
- }
-
- /**
- * Unsubscribe a streamable from this event source.
- *
- * @param stream
- */
- public void unsubscribeStream(Streamable stream) {
- logger.info("Unsubsubscribing stream: {} to event source: {}.", stream.getName(), getName());
- streamables.remove(stream);
- }
-
- /**
- * Send an event to all the subscribed streamables.
- *
- * @param event
- */
- @Override
- public void put(Event event) {
- for (Streamable<Event> stream : streamables) {
- stream.put(event);
- }
- }
-
- /**
- *
- * @return the number of streamables subscribed to this event source.
- */
- public int getNumSubscribers() {
- return streamables.size();
- }
-
- /**
- * @return the name of this event source.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Close all the streamables subscribed to this event source.
- */
- @Override
- public void close() {
- for (Streamable stream : streamables) {
- logger.info("Closing stream: {} in event source: {}.", stream.getName(), getName());
- stream.close();
- }
- }
-
- /**
- *
- * @return the set of streamables subscribed to this event source.
- */
- public Set<Streamable> getStreamables() {
- return streamables;
- }
-
- @Override
- public void start() {
- // TODO Auto-generated method stub
-
- }
-}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 1104716..10da9a2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -29,7 +29,7 @@
/**
*
* @param emitter
- * the emitter implements the low level commiunication layer.
+ * the emitter implements the low level communication layer.
* @param serDeser
* a serialization mechanism.
* @param hasher
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 3a87648..44e7b56 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,13 +2,13 @@
import java.io.File;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.jar.Attributes.Name;
import java.util.jar.JarFile;
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.base.util.S4RLoader;
+import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.DeploymentManager;
import org.slf4j.Logger;
@@ -16,8 +16,6 @@
import ch.qos.logback.classic.Level;
-import com.google.common.collect.Maps;
-import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.name.Named;
@@ -30,14 +28,8 @@
private static final Logger logger = LoggerFactory.getLogger(Server.class);
- private final String commModuleName;
private final String logLevel;
public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
- // local applications directory
- private final String appsDir;
- Map<String, App> apps = Maps.newHashMap();
- Map<String, Streamable> streams = Maps.newHashMap();
- Map<String, EventSource> eventSources = Maps.newHashMap();
CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
private Injector injector;
@@ -45,7 +37,8 @@
@Inject
private DeploymentManager deploymentManager;
- private String clusterName;
+ @Inject
+ private AssignmentFromZK assignment;
private ZkClient zkClient;
@@ -53,15 +46,11 @@
*
*/
@Inject
- public Server(String commModuleName, @Named("s4.logger_level") String logLevel, @Named("appsDir") String appsDir,
+ public Server(String commModuleName, @Named("s4.logger_level") String logLevel,
@Named("cluster.name") String clusterName, @Named("cluster.zk_address") String zookeeperAddress,
@Named("cluster.zk_session_timeout") int sessionTimeout,
@Named("cluster.zk_connection_timeout") int connectionTimeout) {
- // TODO do we need to separate the comm module?
- this.commModuleName = commModuleName;
this.logLevel = logLevel;
- this.appsDir = appsDir;
- this.clusterName = clusterName;
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -75,49 +64,19 @@
.getLogger(Logger.ROOT_LOGGER_NAME);
root.setLevel(Level.toLevel(logLevel));
- AbstractModule module = null;
-
- /* Initialize communication layer module. */
- // TODO do we need a separate comm layer?
- // try {
- // module = (AbstractModule) Class.forName(commModuleName).newInstance();
- // } catch (Exception e) {
- // logger.error("Unable to instantiate communication layer module.", e);
- // }
- //
- // /* After some indirection we get the injector. */
- // injector = Guice.createInjector(module);
-
- if (!new File(appsDir).exists()) {
- if (!new File(appsDir).mkdirs()) {
- logger.error("Cannot create apps directory [{}]", appsDir);
- }
- }
-
- // disabled app loading from local files
-
if (deploymentManager != null) {
deploymentManager.start();
}
- // wait for at least 1 app to be loaded (otherwise the server would not have anything to do and just die)
+ // wait for an app to be loaded (otherwise the server would not have anything to do and just die)
signalOneAppLoaded.await();
}
- public String getS4RDir() {
- return appsDir;
- }
-
- public App loadApp(File s4r) {
- logger.info("Local app deployment: using s4r file name [{}] as application name",
- s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
- return loadApp(s4r, s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
- }
-
public App loadApp(File s4r, String appName) {
// TODO handle application upgrade
+ logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
S4RLoader cl = new S4RLoader(s4r.getAbsolutePath());
try {
@@ -144,7 +103,6 @@
return null;
}
- App previous = apps.put(appName, app);
logger.info("Loaded application from file {}", s4r.getAbsolutePath());
signalOneAppLoaded.countDown();
return app;
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index a316005..979cc5d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -5,13 +5,8 @@
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -21,8 +16,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
@@ -31,7 +24,7 @@
/**
*
* <p>
- * Monitors applications on a given s4 cluster and starts them.
+ * Monitors application availability on a given s4 cluster. Starts the application when available.
* </p>
*
* <p>
@@ -62,11 +55,10 @@
private final String clusterName;
- final Set<String> apps = Sets.newHashSet();
private final ZkClient zkClient;
- private final String appsPath;
+ private final String appPath;
private final Server server;
- CountDownLatch signalInitialAppsLoaded = new CountDownLatch(1);
+ boolean deployed = false;
@Inject
public DistributedDeploymentManager(@Named("cluster.name") String clusterName,
@@ -79,58 +71,58 @@
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
- IZkChildListener appListener = new AppsChangeListener();
- appsPath = "/s4/clusters/" + clusterName + "/apps";
- if (!zkClient.exists(appsPath)) {
- zkClient.create(appsPath, null, CreateMode.PERSISTENT);
+ String appDir = "/s4/clusters/" + clusterName + "/app";
+ if (!zkClient.exists(appDir)) {
+ zkClient.create(appDir, null, CreateMode.PERSISTENT);
}
- zkClient.subscribeChildChanges(appsPath, appListener);
+ appPath = appDir + "/s4App";
+ zkClient.subscribeDataChanges(appPath, new AppChangeListener());
}
- public void deployApplication(String newApp) throws DeploymentFailedException {
- ZNRecord appData = zkClient.readData(appsPath + "/" + newApp);
+ public void deployApplication() throws DeploymentFailedException {
+ ZNRecord appData = zkClient.readData(appPath);
String uriString = appData.getSimpleField(S4R_URI);
+ String appName = appData.getSimpleField("name"); // application name is read from the same znode record as the S4R URI
try {
URI uri = new URI(uriString);
// fetch application
- final File s4rFile = new File(server.getS4RDir() + File.separator + clusterName + File.separator + newApp
- + ".s4r");
- if (s4rFile.exists()) {
- s4rFile.delete();
- }
+ File localS4RFileCopy;
try {
- Files.createParentDirs(s4rFile);
- s4rFile.createNewFile();
+ localS4RFileCopy = File.createTempFile("tmp", ".s4r"); // the suffix is appended verbatim, so the dot must be part of it
} catch (IOException e1) {
- throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
- + uri.toString() + "] ", e1);
+ logger.error(
+ "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+ appName, e1.getClass().getName() + "->" + e1.getMessage());
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
}
+ localS4RFileCopy.deleteOnExit(); // the local copy is removed when this JVM terminates
try {
- if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(s4rFile)) == 0) {
+ if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
- + s4rFile.getAbsolutePath() + "] (nothing was copied)");
+ + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
}
} catch (IOException e) {
- throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+ uri.toString() + "] ", e);
}
// install locally
- App loaded = server.loadApp(s4rFile);
+ App loaded = server.loadApp(localS4RFileCopy, appName);
if (loaded != null) {
- logger.info("Successfully installed application {}", newApp);
- server.startApp(loaded, newApp, clusterName);
+ logger.info("Successfully installed application {}", appName);
+ server.startApp(loaded, appName, clusterName);
} else {
- throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+ uri.toString() + "] : cannot start application");
}
// TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
} catch (URISyntaxException e) {
- logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] { newApp,
- uriString, e.getMessage() });
- throw new DeploymentFailedException("Cannot deploy application [" + newApp + "]", e);
+ logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+ appName, uriString, e.getMessage() });
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
}
+ deployed = true; // only reached when no DeploymentFailedException was thrown above
}
// NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
@@ -146,36 +138,19 @@
throw new DeploymentFailedException("Unsupported protocol " + scheme);
}
- // synchronizes with startup apps loading
- private void deployNewApps(Set<String> newApps) {
- try {
- signalInitialAppsLoaded.await();
- } catch (InterruptedException e1) {
- logger.error("Interrupted while waiting for initialization of initial application. Cancelling deployment of new applications.");
- Thread.currentThread().interrupt();
- return;
- }
- deployApps(newApps);
- }
-
- private void deployApps(Set<String> newApps) {
- for (String newApp : newApps) {
- try {
- deployApplication(newApp);
- apps.add(newApp);
- } catch (DeploymentFailedException e) {
- logger.error("Application deployment failed for {}", newApp);
- }
- }
- }
-
- private final class AppsChangeListener implements IZkChildListener {
+ private final class AppChangeListener implements IZkDataListener { // handles data events for the single application znode
@Override
- public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
- SetView<String> newApps = Sets.difference(Sets.newHashSet(currentChildren), apps);
- logger.info("Detected new application(s) to deploy {}" + Arrays.toString(newApps.toArray(new String[] {})));
+ public void handleDataDeleted(String dataPath) throws Exception {
+ logger.error("Application undeployment is not supported yet"); // deletion is only reported, nothing is undeployed
+ }
- deployNewApps(newApps);
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ if (!deployed) { // deploy only while no application has been deployed by this manager yet
+ deployApplication();
+ } else {
+ logger.error("There is already an application deployed on this S4 node");
+ }
}
@@ -183,9 +158,12 @@
@Override
public void start() {
- List<String> initialApps = zkClient.getChildren(appsPath);
- deployApps(new HashSet<String>(initialApps));
- signalInitialAppsLoaded.countDown();
+ if (zkClient.exists(appPath)) { // an application znode is already present at startup: deploy it right away
+ try {
+ deployApplication();
+ } catch (DeploymentFailedException e) {
+ logger.error("Cannot deploy application", e); // the failure is logged and start() returns normally
+ }
+ }
}
-
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 8601d91..495c1ad 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -28,7 +28,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.io.ByteStreams;
@@ -65,29 +64,6 @@
+ "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
+ tmpAppsDir.getAbsolutePath() });
- CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
- + "/test-apps/simple-deployable-app-2/build.gradle"), "installS4R", new String[] { "appsDir="
- + tmpAppsDir.getAbsolutePath() });
- }
-
- // ignore this test since now we only deploy from artifacts published through zookeeper
- @Test
- @Ignore
- public void testInitialDeploymentFromFileSystem() throws Exception {
-
- // File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
- // + System.currentTimeMillis() + ".s4r");
- //
- // Assert.assertTrue(ByteStreams.copy(
- // Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
- // + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
- //
- // initializeS4Node();
- //
- // final String uri = s4rToDeploy.toURI().toString();
- //
- // assertDeployment(uri, true);
-
}
@Test
@@ -105,12 +81,11 @@
final String uri = s4rToDeploy.toURI().toString();
- assertDeployment(uri, false);
+ assertDeployment(uri);
}
- private void assertDeployment(final String uri, boolean initial) throws KeeperException, InterruptedException,
- IOException {
+ private void assertDeployment(final String uri) throws KeeperException, InterruptedException, IOException {
CountDownLatch signalAppInitialized = new CountDownLatch(1);
CountDownLatch signalAppStarted = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
@@ -118,11 +93,9 @@
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
CommTestUtils.createZkClient());
- if (!initial) {
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
- }
+ ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/app/s4App", record, CreateMode.PERSISTENT);
Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -139,40 +112,6 @@
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
}
- private void assertMultipleAppsDeployment(String uri1, String uri2) throws KeeperException, InterruptedException,
- IOException {
- CountDownLatch signalApp1Initialized = new CountDownLatch(1);
- CountDownLatch signalApp1Started = new CountDownLatch(1);
-
- CountDownLatch signalApp2Initialized = new CountDownLatch(1);
- CountDownLatch signalApp2Started = new CountDownLatch(1);
-
- CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalApp1Initialized,
- CommTestUtils.createZkClient());
- CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp1Started,
- CommTestUtils.createZkClient());
-
- CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp2Initialized,
- CommTestUtils.createZkClient());
- CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_2, signalApp2Started,
- CommTestUtils.createZkClient());
-
- ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
- record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp1", record1, CreateMode.PERSISTENT);
-
- ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
- record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp2", record2, CreateMode.PERSISTENT);
-
- Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
- Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
-
- Assert.assertTrue(signalApp2Initialized.await(20, TimeUnit.SECONDS));
- Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
-
- }
-
@Test
public void testZkTriggeredDeploymentFromHttp() throws Exception {
initializeS4Node();
@@ -195,62 +134,7 @@
httpServer.setExecutor(Executors.newCachedThreadPool());
httpServer.start();
- assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), false);
-
- }
-
- /**
- * * * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even
- * when referenced through reflection, and even when referencing classes present in the classpath of the S4 nod * *
- * Works in the following manne * * - we have app1 and app2, very simple a * * - app1 and app2 have 3 classes with
- * same name: A, AppConstants and Tes * * - app1 in addition has a PE and a socket adapter so that it can react to
- * injected e * * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to
- * the application index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the
- * S4 node classpath, and therefore loaded by the standard classloader, not from an s4 app classl *
- *
- * - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
- * p
- *
- * - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
- * the project, and using the build.gradle file available for these appl * ns
- *
- * - app1 and app2 s4r archives are copied to a web server and published to * per
- *
- * - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
- * TestApp and A were independently loaded for independent ap * ions
- *
- */
-
- @Test
- public void testZkTriggeredDeploymentFromHttpForMultipleApps() throws Exception {
- initializeS4Node();
- Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
- Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_2));
-
- File tmpDir = Files.createTempDir();
-
- File s4rToDeployForApp1 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app1");
- File s4rToDeployForApp2 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app2");
-
- Assert.assertTrue(ByteStreams.copy(
- Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
- + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")),
- Files.newOutputStreamSupplier(s4rToDeployForApp1)) > 0);
- Assert.assertTrue(ByteStreams.copy(
- Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
- + "/simple-deployable-app-2-0.0.0-SNAPSHOT.s4r")),
- Files.newOutputStreamSupplier(s4rToDeployForApp2)) > 0);
-
- // we start a
- InetSocketAddress addr = new InetSocketAddress(8080);
- httpServer = HttpServer.create(addr, 0);
-
- httpServer.createContext("/s4", new MyHandler(tmpDir));
- httpServer.setExecutor(Executors.newCachedThreadPool());
- httpServer.start();
-
- assertMultipleAppsDeployment("http://localhost:8080/s4/" + s4rToDeployForApp1.getName(),
- "http://localhost:8080/s4/" + s4rToDeployForApp2.getName());
+ assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName());
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 5062660..52792b1 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -18,6 +19,7 @@
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
@@ -33,9 +35,11 @@
public class TestProducerConsumer {
private Factory zookeeperServerConnectionFactory;
- private Process forkedNode;
+ private Process forkedProducerNode;
+ private Process forkedConsumerNode;
private ZkClient zkClient;
- private final static String CLUSTER_NAME = "prodconcluster";
+ private final static String PRODUCER_CLUSTER = "producerCluster";
+ private final static String CONSUMER_CLUSTER = "consumerCluster";
private HttpServer httpServer;
private static File tmpAppsDir;
@@ -47,11 +51,11 @@
File gradlewFile = CoreTestUtils.findGradlewInRootDir();
CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
- + "/test-apps/s4-showtime/build.gradle"), "installS4R",
+ + "/test-apps/producer-app/build.gradle"), "installS4R",
new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
- + "/test-apps/s4-counter/build.gradle"), "installS4R",
+ + "/test-apps/consumer-app/build.gradle"), "installS4R",
new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
}
@@ -85,7 +89,8 @@
@After
public void cleanup() throws Exception {
CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
- CommTestUtils.killS4App(forkedNode);
+ CommTestUtils.killS4App(forkedProducerNode);
+ CommTestUtils.killS4App(forkedConsumerNode);
}
private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
@@ -98,36 +103,38 @@
@Test
public void testInitialDeploymentFromFileSystem() throws Exception {
- File showtimeS4R = new File(loadConfig().getString("appsDir") + File.separator + "showtime"
+ File producerS4R = new File(loadConfig().getString("appsDir") + File.separator + "producer"
+ System.currentTimeMillis() + ".s4r");
System.out.println(tmpAppsDir.getAbsolutePath());
Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
- + "/s4-showtime-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(showtimeS4R)) > 0);
- String uriShowtime = showtimeS4R.toURI().toString();
+ + "/producer-app-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(producerS4R)) > 0);
+ String uriProducer = producerS4R.toURI().toString();
- File counterS4R = new File(loadConfig().getString("appsDir") + File.separator + "counter"
+ File consumerS4R = new File(loadConfig().getString("appsDir") + File.separator + "consumer"
+ System.currentTimeMillis() + ".s4r");
- Assert.assertTrue(ByteStreams.copy(
- Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/s4-counter-0.0.0-SNAPSHOT.s4r")),
- Files.newOutputStreamSupplier(counterS4R)) > 0);
+ Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/consumer-app-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(consumerS4R)) > 0);
- String uriCounter = counterS4R.toURI().toString();
+ String uriConsumer = consumerS4R.toURI().toString();
initializeS4Node();
ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/showtime", record1, CreateMode.PERSISTENT);
+ record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
+ zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT); // producer S4R goes to the producer cluster's app znode
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
- zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/counter", record2, CreateMode.PERSISTENT);
+ record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
+ zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT); // consumer S4R goes to the consumer cluster's app znode
- // TODO validate test through some Zookeeper notifications
- Thread.sleep(10000);
+ CountDownLatch signalConsumptionComplete = new CountDownLatch(1); // NOTE(review): presumably counted down by watchAndSignalCreation once the znode below exists - confirm
+ CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+ CommTestUtils.createZkClient());
+ Assert.assertTrue(signalConsumptionComplete.await(20, TimeUnit.SECONDS)); // fails the test if the latch is not released within 20 seconds
+
}
- private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+ private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException, KeeperException {
// 0. package s4 app
// TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
// current package .
@@ -135,31 +142,45 @@
// 1. start s4 nodes. Check that no app is deployed.
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.clean("s4");
- taskSetup.setup(CLUSTER_NAME, 1, 1300);
+ taskSetup.setup(PRODUCER_CLUSTER, 1, 1300);
+
+ TaskSetup taskSetup2 = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+ taskSetup2.setup(CONSUMER_CLUSTER, 1, 1400);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/" + PRODUCER_CLUSTER + "/process");
Assert.assertTrue(processes.size() == 0);
- final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
+ final CountDownLatch signalProcessesReady = new CountDownLatch(2); // one count per cluster: wait for the producer AND the consumer node
+ zkClient.subscribeChildChanges("/s4/clusters/" + PRODUCER_CLUSTER + "/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
- if (currentChilds.size() == 2) {
+ if (currentChilds.size() == 1) {
signalProcessesReady.countDown();
}
}
});
- forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
+ zkClient.subscribeChildChanges("/s4/clusters/" + CONSUMER_CLUSTER + "/process", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 1) {
+ signalProcessesReady.countDown();
+ }
+
+ }
+ });
+
+ forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER });
+ forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER });
// TODO synchro with ready state from zk
- Thread.sleep(10000);
- // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+ // Thread.sleep(10000);
+ Assert.assertTrue(signalProcessesReady.await(20, TimeUnit.SECONDS));
}
-
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index d9bf9a6..832ce1b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,15 +26,19 @@
public class CoreTestUtils extends CommTestUtils {
public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
- return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
+ return forkProcess(App.class.getName(), -1, moduleClass.getName(), appClass.getName()); // NOTE(review): -1 presumably means "no debug port" - confirm in forkProcess
}
public static Process forkS4Node() throws IOException, InterruptedException {
- return forkProcess(Main.class.getName(), new String[] {});
+ return forkS4Node(new String[] {}); // no arguments: delegates to the String[] overload
}
public static Process forkS4Node(String[] args) throws IOException, InterruptedException {
- return forkProcess(Main.class.getName(), args);
+ return forkS4Node(-1, args); // same -1 debug port value as forkS4App passes
+ }
+
+ public static Process forkS4Node(int debugPort, String[] args) throws IOException, InterruptedException {
+ return forkProcess(Main.class.getName(), debugPort, args); // forks Main with the given debug port and arguments
}
public static File findGradlewInRootDir() {
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 9471910..d1d8857 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -5,8 +5,6 @@
import java.util.Arrays;
import java.util.List;
-import junit.framework.Assert;
-
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -21,6 +19,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.FileConverter;
+import com.google.common.base.Strings;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
@@ -44,15 +43,23 @@
tmpAppsDir = Files.createTempDir();
- File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+ if (!Strings.isNullOrEmpty(deployArgs.s4rPath) && !Strings.isNullOrEmpty(deployArgs.generatedS4R)) { // an existing archive and an output location for a generated one cannot both be given
+ logger.error("-s4r and -generatedS4R options are mutually exclusive");
+ System.exit(1);
+ }
- String s4rPath = null;
+ File s4rToDeploy;
if (deployArgs.s4rPath != null) {
- s4rPath = deployArgs.s4rPath;
- logger.info(
- "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
- s4rPath);
+ s4rToDeploy = new File(deployArgs.s4rPath); // the specified archive is published as is, no copy is made
+ if (!s4rToDeploy.exists()) {
+ logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
+ System.exit(1);
+ } else {
+ logger.info(
+ "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+ s4rToDeploy.getAbsolutePath());
+ }
} else {
List<String> params = new ArrayList<String>();
// prepare gradle -P parameters, including passed gradle opts
@@ -61,19 +68,39 @@
params.add("appsDir=" + tmpAppsDir.getAbsolutePath());
params.add("appName=" + deployArgs.appName);
ExecGradle.exec(deployArgs.gradleBuildFile, "installS4R", params.toArray(new String[] {}));
- s4rPath = tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r";
+ File tmpS4R = new File(tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r");
+ if (!Strings.isNullOrEmpty(deployArgs.generatedS4R)) {
+ logger.info("Copying generated S4R to [{}]", deployArgs.generatedS4R);
+ s4rToDeploy = new File(deployArgs.generatedS4R);
+ if (!(ByteStreams.copy(Files.newInputStreamSupplier(tmpS4R),
+ Files.newOutputStreamSupplier(s4rToDeploy)) > 0)) {
+ logger.error("Cannot copy generated s4r from {} to {}", tmpS4R.getAbsolutePath(),
+ s4rToDeploy.getAbsolutePath());
+ System.exit(1);
+ }
+ } else {
+ s4rToDeploy = tmpS4R;
+ }
}
- Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(s4rPath)),
- Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
final String uri = s4rToDeploy.toURI().toString();
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record,
- CreateMode.PERSISTENT);
- logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
- deployArgs.appName, deployArgs.clusterName,
- "/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
+ record.putSimpleField("name", deployArgs.appName);
+ String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
+ if (zkClient.exists(deployedAppPath)) {
+ ZNRecord readData = zkClient.readData(deployedAppPath);
+ logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
+ readData.getSimpleField("name"));
+ System.exit(1);
+ }
+
+ zkClient.create(deployedAppPath, record, CreateMode.PERSISTENT); // same path as the existence check above
+ logger.info(
+ "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
+ new String[] { deployArgs.appName, deployArgs.clusterName,
+ deployedAppPath, // report the znode that was actually written, not a per-application path
+ s4rToDeploy.getAbsolutePath() });
} catch (Exception e) {
LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
@@ -87,13 +114,16 @@
@Parameter(names = { "-b", "-buildFile" }, description = "Full path to gradle build file for the S4 application", required = false, converter = FileConverter.class, validateWith = FileExistsValidator.class)
File gradleBuildFile;
- @Parameter(names = "-s4r", description = "Path to s4r file", required = false)
+ @Parameter(names = "-s4r", description = "Path to existing s4r file", required = false)
String s4rPath;
+ @Parameter(names = { "-generatedS4R", "-g" }, description = "Location of generated s4r (incompatible with -s4r option). By default, s4r is generated in a temporary directory on the local file system. In a distributed environment, you probably want to specify a location accessible through a distributed file system like NFS. That's the purpose of this option.", required = false)
+ String generatedS4R;
+
@Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application class (extending App or AdapterApp)", required = false)
String appClass = "";
- @Parameter(names = "-appName", description = "Name of S4 application. This will be the name of the s4r file as well", required = true)
+ @Parameter(names = "-appName", description = "Name of S4 application.", required = true)
String appName;
@Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
diff --git a/test-apps/s4-showtime/build.gradle b/test-apps/consumer-app/build.gradle
similarity index 100%
rename from test-apps/s4-showtime/build.gradle
rename to test-apps/consumer-app/build.gradle
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
similarity index 64%
rename from test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
rename to test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
index 6652f59..c13c53e 100644
--- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
@@ -2,24 +2,24 @@
import org.apache.s4.core.App;
-public class ShowTimeApp extends App {
+public class ConsumerApp extends App {
- private ShowPE showPE;
+ private ConsumerPE consumerPE;
@Override
protected void onStart() {
- System.out.println("Starting ShowTimeApp...");
- showPE.getInstanceForKey("single");
+ System.out.println("Starting ConsumerApp...");
}
@Override
protected void onInit() {
- System.out.println("Initing ShowTimeApp...");
- showPE = new ShowPE(this);
+ System.out.println("Initing ConsumerApp...");
+ consumerPE = createPE(ConsumerPE.class, "consumer"); // assign the field; a local declaration here would shadow it and leave it null
+ consumerPE.setSingleton(true);
/* This stream will receive events from another app. */
- createStream("clockStream", showPE);
+ createInputStream("tickStream", consumerPE);
}
@Override
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
new file mode 100644
index 0000000..d0b1577
--- /dev/null
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -0,0 +1,59 @@
+package s4app;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test processing element that counts the events it receives and, upon the 1000th one, creates the
+ * "/1000TicksReceived" znode in ZooKeeper.
+ */
+public class ConsumerPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(ConsumerPE.class);
+ long eventCount = 0;
+
+ public ConsumerPE(App app) {
+ super(app);
+ }
+
+ /**
+ * Logs the tick and time of the event, and creates the "/1000TicksReceived" znode when the event count reaches 1000.
+ *
+ * @param event
+ * received event, read for its "tick" field and its time
+ */
+ public void onEvent(Event event) {
+ eventCount++;
+ logger.info(
+ "Received event with tick {} and time {} for event # {}",
+ new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
+ String.valueOf(eventCount) });
+ if (eventCount == 1000) {
+ logger.info("Just reached 1000 events");
+ ZkClient zkClient = new ZkClient("localhost:2181");
+ try {
+ zkClient.create("/1000TicksReceived", new byte[0], CreateMode.PERSISTENT);
+ } finally {
+ // this client is only used for the notification: close it so the connection is not leaked
+ zkClient.close();
+ }
+ }
+
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+}
diff --git a/test-apps/s4-counter/build.gradle b/test-apps/producer-app/build.gradle
similarity index 100%
rename from test-apps/s4-counter/build.gradle
rename to test-apps/producer-app/build.gradle
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
new file mode 100644
index 0000000..4b54112
--- /dev/null
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
@@ -0,0 +1,35 @@
+package s4app;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.App;
+
+/**
+ * Test application that creates a ProducerPE with a 10 ms timer and wires it to the "tickStream" output stream.
+ */
+public class ProducerApp extends App {
+
+ private ProducerPE producerPE;
+
+ @Override
+ protected void onStart() {
+ System.out.println("Starting ProducerApp...");
+ producerPE.getInstanceForKey("single");
+ }
+
+ // generic array due to varargs generates a warning.
+ @Override
+ protected void onInit() {
+ System.out.println("Initing ProducerApp...");
+
+ producerPE = createPE(ProducerPE.class, "producer");
+ producerPE.setStreams(createOutputStream("tickStream"));
+ producerPE.setTimerInterval(10, TimeUnit.MILLISECONDS);
+
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+}
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockPE.java b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
similarity index 62%
rename from test-apps/s4-counter/src/main/java/s4app/ClockPE.java
rename to test-apps/producer-app/src/main/java/s4app/ProducerPE.java
index cc81672..7812bc6 100644
--- a/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
@@ -7,14 +7,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ClockPE extends ProcessingElement {
+public class ProducerPE extends ProcessingElement {
- private static final Logger logger = LoggerFactory.getLogger(ClockPE.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProducerPE.class);
private Streamable[] targetStreams;
private long tick = 0;
- public ClockPE(App app) {
+ public ProducerPE(App app) {
super(app);
}
@@ -27,12 +27,14 @@
}
public void onTime() {
- Event event = new Event();
- event.put("tick", Long.class, tick++);
+ if (tick < 1000) { // emits ticks 0..999 only, i.e. exactly 1000 events, then stays silent
+ Event event = new Event();
+ event.put("tick", Long.class, tick++);
- logger.info("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
- for (int i = 0; i < targetStreams.length; i++) {
- targetStreams[i].put(event);
+ logger.info("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
+ for (int i = 0; i < targetStreams.length; i++) { // the same event instance is put on every target stream
+ targetStreams[i].put(event);
+ }
}
}
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
deleted file mode 100644
index cab4eab..0000000
--- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package s4app;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.EventSource;
-import org.apache.s4.core.Streamable;
-
-public class ClockApp extends App {
-
- private EventSource eventSource;
- private ClockPE clockPE;
-
- @Override
- protected void onStart() {
- System.out.println("Starting CounterApp...");
- clockPE.getInstanceForKey("single");
- }
-
- // generic array due to varargs generates a warning.
- @Override
- protected void onInit() {
- System.out.println("Initing CounterApp...");
-
- clockPE = new ClockPE(this);
- clockPE.setTimerInterval(1, TimeUnit.SECONDS);
-
- eventSource = new EventSource(this, "clockStream");
- clockPE.setStreams((Streamable) eventSource);
- }
-
- @Override
- protected void onClose() {
- System.out.println("Closing CounterApp...");
- eventSource.close();
- }
-
-}
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java b/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
deleted file mode 100644
index f68516c..0000000
--- a/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package s4app;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ShowPE extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(ShowPE.class);
-
- public ShowPE(App app) {
- super(app);
- }
-
- public void onEvent(Event event) {
-
- logger.info("Received event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
-
- }
-
- @Override
- protected void onRemove() {
-
- }
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-}