added out method
diff --git a/src/main/java/io/s4/App.java b/src/main/java/io/s4/App.java
deleted file mode 100644
index c414543..0000000
--- a/src/main/java/io/s4/App.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
- */
-public abstract class App {
-
- final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
- final private List<Stream<? extends Event>> streams = new ArrayList<Stream<? extends Event>>();
-
- /**
- * @return the pePrototypes
- */
- public List<ProcessingElement> getPePrototypes() {
- return pePrototypes;
- }
-
- protected abstract void start();
-
- protected abstract void init();
-
- protected abstract void close();
-
- public void removeAll() {
-
- for (ProcessingElement pe : pePrototypes) {
-
- /* Remove all instances. */
- pe.removeAll();
-
- }
-
- for (Stream<? extends Event> stream : streams) {
-
- /* Close all streams. */
- stream.close();
- }
-
- /* Give prototype a chance to clean up after itself. */
- close();
-
- /* Finally remove from App. */
- pePrototypes.clear();
- streams.clear();
- }
-
- void addPEPrototype(ProcessingElement pePrototype) {
-
- pePrototypes.add(pePrototype);
-
- }
-
- void addStream(Stream<? extends Event> stream) {
-
- streams.add(stream);
-
- }
-
- public List<Stream<? extends Event>> getStreams() {
- return streams;
- }
-}
diff --git a/src/main/java/io/s4/ProcessingElement.java b/src/main/java/io/s4/ProcessingElement.java
deleted file mode 100644
index e2c7842..0000000
--- a/src/main/java/io/s4/ProcessingElement.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class ProcessingElement implements Cloneable {
-
- Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
-
- final protected App app;
- final protected Map<String, ProcessingElement> peInstances = new ConcurrentHashMap<String, ProcessingElement>();
- protected String id = ""; // PE instance id
- final protected ProcessingElement pePrototype;
-
- /*
- * Base class for implementing processing in S4. All instances are organized
- * as follows. A PE prototype is a special type of instance that defines the
- * topology of the graph and manages the creation and destruction of the
- * actual instances that do the processing. PE instances are clones of the
- * prototype. PE instance variables should be initialized in the
- * initPEInstance() method. Be aware that Class variables are simply copied
- * to the clones, even references.
- */
- public ProcessingElement(App app) {
-
- this.app = app;
- app.addPEPrototype(this);
-
- /*
- * Only the PE Prototype uses the constructor. The PEPrototype field
- * will be cloned by the instances and point to the prototype.
- */
- this.pePrototype = this;
- }
-
- /**
- * @return the app
- */
- public App getApp() {
- return app;
- }
-
- public int getNumPEInstances() {
-
- return peInstances.size();
- }
-
- synchronized public void handleInputEvent(Event event) {
-
- processInputEvent(event);
- }
-
- abstract protected void processInputEvent(Event event);
-
- abstract public void sendEvent(); // consider having several output
- // policies...
-
- abstract protected void initPEInstance();
-
- abstract protected void removeInstanceForKey(String id);
-
- private void removeInstanceForKeyInternal(String id) {
-
- if (id == null)
- return;
-
- /* First let the PE instance clean after itself. */
- removeInstanceForKey(id);
-
- /* Remove PE instance. */
- peInstances.remove(id);
- }
-
- protected void removeAll() {
-
- /* Remove all the instances. */
- for (Map.Entry<String, ProcessingElement> entry : peInstances
- .entrySet()) {
-
- String key = entry.getKey();
-
- if (key != null)
- removeInstanceForKeyInternal(key);
- }
-
- /*
- * TODO: This object (the PE prototype) may still be referenced by other
- * objects at this point. For example a stream object may still be
- * referencing PEs.
- */
- }
-
- protected void close() {
- removeInstanceForKeyInternal(id);
- }
-
- synchronized public ProcessingElement getInstanceForKey(String id) {
-
- /* Check if instance for key exists, otherwise create one. */
- ProcessingElement pe = peInstances.get(id);
- if (pe == null) {
- /* PE instance for key does not yet exist, cloning one. */
- pe = (ProcessingElement) this.clone();
- peInstances.put(id, pe);
- pe.id = id;
- pe.initPEInstance();
-
- logger.trace("Num PE instances: {}.", getNumPEInstances());
- }
- return pe;
- }
-
- synchronized protected List<ProcessingElement> getAllInstances() {
-
- return new ArrayList<ProcessingElement>(peInstances.values());
- }
-
- /**
- * Unique ID for a PE instance.
- *
- * @return the id
- */
- public String getId() {
- return id;
- }
-
- /**
- * @param id
- * the id to set
- */
- public void setId(String id) {
- this.id = id;
- }
-
- /**
- * This method exists simply to make <code>clone()</code> protected.
- */
- protected Object clone() {
- try {
- Object clone = super.clone();
- return clone;
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- }
-
- // TODO: Change equals and hashCode in ProcessingElement and
- // Stream so we can use sets as collection and make sure there are no
- // duplicate prototypes.
- // Great article: http://www.artima.com/lejava/articles/equality.html
-
-}
diff --git a/src/main/java/io/s4/Stream.java b/src/main/java/io/s4/Stream.java
deleted file mode 100644
index 6f09a30..0000000
--- a/src/main/java/io/s4/Stream.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4;
-
-import io.s4.example.model.Controller;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Stream<T extends Event> implements Runnable {
-
- Logger logger = LoggerFactory.getLogger(Stream.class);
-
- final static private String DEFAULT_SEPARATOR = "^";
- final static private int CAPACITY = 1000;
- final private String name;
- final private Key<T> key;
- final private ProcessingElement[] targetPEs;
- final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
- final private Thread thread;
-
- /*
- * Streams send event of a given type using a specific key to target
- * processing elements.
- */
- public Stream(App app, String name, KeyFinder<T> finder,
- ProcessingElement... processingElements) {
-
- app.addStream(this);
- this.name = name;
-
- if (finder == null) {
- this.key = null;
- } else {
- this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
- }
- this.targetPEs = processingElements;
-
- /* Start streaming. */
- /*
- * TODO: This is only for prototyping. Comm layer will take care of this
- * in the real implementation.
- */
- // logger.trace("Start thread for stream " + name);
- thread = new Thread(this, name);
- thread.start();
- }
-
- /*
- * This constructor will create a broadcast stream. That is, the events will
- * be sent to all the PE instances.
- */
- public Stream(App app, String name, ProcessingElement... processingElements) {
- this(app, name, null, processingElements);
- }
-
- public void put(T event) {
- try {
- // System.out.println("Remaining capacity in stream " + name + ":" +
- // queue.remainingCapacity());
- // System.out.println("PUT: " + event);
- queue.put(event);
- } catch (InterruptedException e) {
- e.printStackTrace();
- System.exit(-1);
- }
- }
-
- /**
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * @return the key
- */
- public Key<T> getKey() {
- return key;
- }
-
- /**
- * @return the list of target processing elements.
- */
- public ProcessingElement[] getTargetPEs() {
- return targetPEs;
- }
-
- public void close() {
- thread.interrupt();
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- /* Get oldest event in queue. */
- T event = queue.take();
-
- /* Send event to each target PE. */
- for (int i = 0; i < targetPEs.length; i++) {
-
- if (key == null) {
-
- /* Broadcast to all PE instances! */
-
- /* STEP 1: find all PE instances. */
-
- List<ProcessingElement> pes = targetPEs[i].getAllInstances();
-
- /* STEP 2: iterate and pass event to PE instance. */
- for(ProcessingElement pe : pes) {
-
- pe.handleInputEvent(event);
- }
-
- } else {
-
- /* We have a key, send to target PE. */
-
- /* STEP 1: find the PE instance for key. */
- ProcessingElement pe = targetPEs[i]
- .getInstanceForKey(key.get(event));
-
- /* STEP 2: pass event to PE instance. */
- pe.handleInputEvent(event);
- }
- }
-
- } catch (InterruptedException e) {
- logger.info("Closing stream {}.", name);
- return;
- }
- }
- }
-}