added out method
diff --git a/src/main/java/io/s4/App.java b/src/main/java/io/s4/App.java
deleted file mode 100644
index c414543..0000000
--- a/src/main/java/io/s4/App.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here. 
- */
-public abstract class App {
-
-    final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
-    final private List<Stream<? extends Event>> streams = new ArrayList<Stream<? extends Event>>();
-
-    /**
-     * @return the pePrototypes
-     */
-    public List<ProcessingElement> getPePrototypes() {
-        return pePrototypes;
-    }
-
-    protected abstract void start();
-
-    protected abstract void init();
-
-    protected abstract void close();
-
-    public void removeAll() {
-
-        for (ProcessingElement pe : pePrototypes) {
-
-            /* Remove all instances. */
-            pe.removeAll();
-
-        }
-
-        for (Stream<? extends Event> stream : streams) {
-
-            /* Close all streams. */
-            stream.close();
-        }
-
-        /* Give prototype a chance to clean up after itself. */
-        close();
-
-        /* Finally remove from App. */
-        pePrototypes.clear();
-        streams.clear();
-    }
-
-    void addPEPrototype(ProcessingElement pePrototype) {
-
-        pePrototypes.add(pePrototype);
-
-    }
-
-    void addStream(Stream<? extends Event> stream) {
-
-        streams.add(stream);
-
-    }
-
-    public List<Stream<? extends Event>> getStreams() {
-        return streams;
-    }
-}
diff --git a/src/main/java/io/s4/ProcessingElement.java b/src/main/java/io/s4/ProcessingElement.java
deleted file mode 100644
index e2c7842..0000000
--- a/src/main/java/io/s4/ProcessingElement.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class ProcessingElement implements Cloneable {
-
-    Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
-
-    final protected App app;
-    final protected Map<String, ProcessingElement> peInstances = new ConcurrentHashMap<String, ProcessingElement>();
-    protected String id = ""; // PE instance id
-    final protected ProcessingElement pePrototype;
-
-    /*
-     * Base class for implementing processing in S4. All instances are organized
-     * as follows. A PE prototype is a special type of instance that defines the
-     * topology of the graph and manages the creation and destruction of the
-     * actual instances that do the processing. PE instances are clones of the
-     * prototype. PE instance variables should be initialized in the
-     * initPEInstance() method. Be aware that Class variables are simply copied
-     * to the clones, even references.
-     */
-    public ProcessingElement(App app) {
-
-        this.app = app;
-        app.addPEPrototype(this);
-
-        /*
-         * Only the PE Prototype uses the constructor. The PEPrototype field
-         * will be cloned by the instances and point to the prototype.
-         */
-        this.pePrototype = this;
-    }
-
-    /**
-     * @return the app
-     */
-    public App getApp() {
-        return app;
-    }
-
-    public int getNumPEInstances() {
-
-        return peInstances.size();
-    }
-
-    synchronized public void handleInputEvent(Event event) {
-
-        processInputEvent(event);
-    }
-
-    abstract protected void processInputEvent(Event event);
-
-    abstract public void sendEvent(); // consider having several output
-                                      // policies...
-
-    abstract protected void initPEInstance();
-
-    abstract protected void removeInstanceForKey(String id);
-
-    private void removeInstanceForKeyInternal(String id) {
-
-        if (id == null)
-            return;
-
-        /* First let the PE instance clean after itself. */
-        removeInstanceForKey(id);
-
-        /* Remove PE instance. */
-        peInstances.remove(id);
-    }
-
-    protected void removeAll() {
-
-        /* Remove all the instances. */
-        for (Map.Entry<String, ProcessingElement> entry : peInstances
-                .entrySet()) {
-
-            String key = entry.getKey();
-
-            if (key != null)
-                removeInstanceForKeyInternal(key);
-        }
-
-        /*
-         * TODO: This object (the PE prototype) may still be referenced by other
-         * objects at this point. For example a stream object may still be
-         * referencing PEs.
-         */
-    }
-
-    protected void close() {
-        removeInstanceForKeyInternal(id);
-    }
-
-    synchronized public ProcessingElement getInstanceForKey(String id) {
-
-        /* Check if instance for key exists, otherwise create one. */
-        ProcessingElement pe = peInstances.get(id);
-        if (pe == null) {
-            /* PE instance for key does not yet exist, cloning one. */
-            pe = (ProcessingElement) this.clone();
-            peInstances.put(id, pe);
-            pe.id = id;
-            pe.initPEInstance();
-
-            logger.trace("Num PE instances: {}.", getNumPEInstances());
-        }
-        return pe;
-    }
-
-    synchronized protected List<ProcessingElement> getAllInstances() {
-
-        return new ArrayList<ProcessingElement>(peInstances.values());
-    }
-
-    /**
-     * Unique ID for a PE instance.
-     * 
-     * @return the id
-     */
-    public String getId() {
-        return id;
-    }
-
-    /**
-     * @param id
-     *            the id to set
-     */
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    /**
-     * This method exists simply to make <code>clone()</code> protected.
-     */
-    protected Object clone() {
-        try {
-            Object clone = super.clone();
-            return clone;
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    // TODO: Change equals and hashCode in ProcessingElement and
-    // Stream so we can use sets as collection and make sure there are no
-    // duplicate prototypes.
-    // Great article: http://www.artima.com/lejava/articles/equality.html
-
-}
diff --git a/src/main/java/io/s4/Stream.java b/src/main/java/io/s4/Stream.java
deleted file mode 100644
index 6f09a30..0000000
--- a/src/main/java/io/s4/Stream.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4;
-
-import io.s4.example.model.Controller;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Stream<T extends Event> implements Runnable {
-
-	Logger logger = LoggerFactory.getLogger(Stream.class);
-
-	final static private String DEFAULT_SEPARATOR = "^";
-	final static private int CAPACITY = 1000;
-	final private String name;
-	final private Key<T> key;
-	final private ProcessingElement[] targetPEs;
-	final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
-	final private Thread thread;
-
-	/*
-	 * Streams send event of a given type using a specific key to target
-	 * processing elements.
-	 */
-	public Stream(App app, String name, KeyFinder<T> finder,
-			ProcessingElement... processingElements) {
-
-		app.addStream(this);
-		this.name = name;
-
-		if (finder == null) {
-			this.key = null;
-		} else {
-			this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
-		}
-		this.targetPEs = processingElements;
-
-		/* Start streaming. */
-		/*
-		 * TODO: This is only for prototyping. Comm layer will take care of this
-		 * in the real implementation.
-		 */
-		// logger.trace("Start thread for stream " + name);
-		thread = new Thread(this, name);
-		thread.start();
-	}
-
-	/*
-	 * This constructor will create a broadcast stream. That is, the events will
-	 * be sent to all the PE instances.
-	 */
-	public Stream(App app, String name, ProcessingElement... processingElements) {
-		this(app, name, null, processingElements);
-	}
-
-	public void put(T event) {
-		try {
-			// System.out.println("Remaining capacity in stream " + name + ":" +
-			// queue.remainingCapacity());
-			// System.out.println("PUT: " + event);
-			queue.put(event);
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-			System.exit(-1);
-		}
-	}
-
-	/**
-	 * @return the name
-	 */
-	public String getName() {
-		return name;
-	}
-
-	/**
-	 * @return the key
-	 */
-	public Key<T> getKey() {
-		return key;
-	}
-
-	/**
-	 * @return the list of target processing elements.
-	 */
-	public ProcessingElement[] getTargetPEs() {
-		return targetPEs;
-	}
-
-	public void close() {
-		thread.interrupt();
-	}
-
-	@Override
-	public void run() {
-		while (true) {
-			try {
-				/* Get oldest event in queue. */
-				T event = queue.take();
-
-				/* Send event to each target PE. */
-				for (int i = 0; i < targetPEs.length; i++) {
-
-					if (key == null) {
-						
-						/* Broadcast to all PE instances! */
-						
-						/* STEP 1: find all PE instances. */
-						
-						List<ProcessingElement> pes = targetPEs[i].getAllInstances();
-						
-						/* STEP 2: iterate and pass event to PE instance. */
-						for(ProcessingElement pe : pes) {
-														
-							pe.handleInputEvent(event);
-						}
-
-					} else {
-						
-						/* We have a key, send to target PE. */
-						
-						/* STEP 1: find the PE instance for key. */
-						ProcessingElement pe = targetPEs[i]
-								.getInstanceForKey(key.get(event));
-
-						/* STEP 2: pass event to PE instance. */
-						pe.handleInputEvent(event);
-					}
-				}
-
-			} catch (InterruptedException e) {
-				logger.info("Closing stream {}.", name);
-				return;
-			}
-		}
-	}
-}