Improved kmeans added README file.
diff --git a/etc/s4-kmeans-example.png b/etc/s4-kmeans-example.png
new file mode 100644
index 0000000..505dd40
--- /dev/null
+++ b/etc/s4-kmeans-example.png
Binary files differ
diff --git a/src/main/java/io/s4/ProcessingElement.java b/src/main/java/io/s4/ProcessingElement.java
index 47a92b2..720e81b 100644
--- a/src/main/java/io/s4/ProcessingElement.java
+++ b/src/main/java/io/s4/ProcessingElement.java
@@ -20,8 +20,13 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public abstract class ProcessingElement implements Cloneable {
 
+	Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
+
     final protected App app;
     final protected Map<String, ProcessingElement> peInstances = new ConcurrentHashMap<String, ProcessingElement>();
     protected String id=""; // PE instance id
@@ -54,6 +59,11 @@
         return app;
     }
 
+    public int getNumPEInstances() {
+    	
+    	return peInstances.size();
+    }
+    
     synchronized public void handleInputEvent(Event event) {
 
         processInputEvent(event);
@@ -111,6 +121,8 @@
             pe.initPEInstance();
             peInstances.put(id, pe);
             pe.id = id;
+            
+            logger.trace("Num PE instances: {}.", getNumPEInstances());
         }
         return pe;
     }
diff --git a/src/main/java/io/s4/example/kmeans/ClusterPE.java b/src/main/java/io/s4/example/kmeans/ClusterPE.java
index 759a00f..d1aa9a7 100644
--- a/src/main/java/io/s4/example/kmeans/ClusterPE.java
+++ b/src/main/java/io/s4/example/kmeans/ClusterPE.java
@@ -31,6 +31,7 @@
 
 	Logger logger = LoggerFactory.getLogger(ClusterPE.class);
 
+	final private int numClusters;
 	final private int vectorSize;
 	final private long numVectors;
 	private Stream<ObsEvent> distanceStream;
@@ -38,13 +39,17 @@
 	private float[] centroid;
 	private long obsCount = 0;
 	private float[] obsSum;
+	private float totalDistance = 0f;
+	private int[][] confusionMatrix;
 
 	public ClusterPE(App app, int numClusters, int vectorSize, long numVectors,
 			float[][] centroids) {
 		super(app);
+		this.numClusters = numClusters;
 		this.vectorSize = vectorSize;
 		this.numVectors = numVectors;
 
+		confusionMatrix = new int[numClusters][numClusters];
 		/*
 		 * The ClusterPE instances are not event driven. That is they are
 		 * created here before events start to flow.
@@ -60,22 +65,23 @@
 	}
 
 	public void setStream(Stream<ObsEvent> distanceStream) {
-		
+
 		/* Init prototype. */
 		this.distanceStream = distanceStream;
-		
-		/* We also need to set the stream in the instances 
-		 * we created in the constructor. 
-		 * */
+
+		/*
+		 * We also need to set the stream in the instances we created in the
+		 * constructor.
+		 */
 		List<ProcessingElement> pes = this.getAllInstances();
-		
+
 		/* STEP 2: iterate and pass event to PE instance. */
-		for(ProcessingElement pe : pes) {
-										
-			((ClusterPE)pe).distanceStream = distanceStream;
+		for (ProcessingElement pe : pes) {
+
+			((ClusterPE) pe).distanceStream = distanceStream;
 		}
 	}
-	
+
 	public void setClusterId(int clusterId) {
 		this.clusterId = clusterId;
 	}
@@ -84,22 +90,50 @@
 		this.centroid = centroid;
 	}
 
-	synchronized private void updateTotalObsCount() {
-				
-		/* Total obs count. We use the obsCount field in the prototype. */
+	public long getObsCount() {
+		return obsCount;
+	}
+
+	synchronized private void updateTotalStats(ObsEvent event) {
+
+		/* Update global stats in the prototype in the prototype. */
 		ClusterPE clusterPEPrototype = (ClusterPE) pePrototype;
 		clusterPEPrototype.obsCount++;
+		clusterPEPrototype.totalDistance += event.getDistance();
+		clusterPEPrototype.confusionMatrix[event.getClassId()][event.getHypId()] += 1;
 
-		logger.trace("ClusterID: " + clusterId + ", Count: " +  clusterPEPrototype.obsCount);
+		logger.trace("Index: " + event.getIndex() + ", Label: "
+				+ event.getClassId() + ", Hyp: " + event.getHypId()
+				+ ", Total Count: " + clusterPEPrototype.obsCount
+				+ ", Total Dist: " + clusterPEPrototype.totalDistance);
+
+		/* Log info. */
+		if (clusterPEPrototype.obsCount % 10000 == 0) {
+			logger.info("Processed {} events", clusterPEPrototype.obsCount);
+			logger.info("Average distance is {}.",
+					clusterPEPrototype.totalDistance
+							/ clusterPEPrototype.obsCount);
+		}
 
 		if (clusterPEPrototype.obsCount == numVectors) {
 
 			/* Done processing training set. */
 
-			logger.trace(">>> ClusterID: " + clusterId + ", Final Count: " +  clusterPEPrototype.obsCount);
+			logger.info("Final Count: {}.", clusterPEPrototype.obsCount);
+			logger.info("Final Average Distance: {}.",
+					clusterPEPrototype.totalDistance
+							/ clusterPEPrototype.obsCount);
 
-			/* Reset global count. */
-			clusterPEPrototype.obsCount = 0;
+			for (int i = 0; i < numClusters; i++)
+				for (int j = 0; j < numClusters; j++) {
+					
+					Object[] paramArray = { i, j,
+							clusterPEPrototype.confusionMatrix[i][j] };
+					
+					logger.info(
+							"Final Count of class {} classified as {}: {}.",
+							paramArray);
+				}
 
 			/* Update centroids. */
 			for (Map.Entry<String, ProcessingElement> entry : peInstances
@@ -110,6 +144,11 @@
 				pe.updateCentroid();
 
 			}
+
+			/* Reset global stats. */
+			clusterPEPrototype.obsCount = 0;
+			clusterPEPrototype.totalDistance = 0f;
+			clusterPEPrototype.confusionMatrix = new int[numClusters][numClusters];
 		}
 	}
 
@@ -154,7 +193,7 @@
 			/* Process raw event. */
 			float dist = distance(obs);
 			ObsEvent outEvent = new ObsEvent(inEvent.getIndex(), obs, dist,
-					clusterId);
+					inEvent.getClassId(), clusterId);
 			logger.trace("IN: " + inEvent.toString());
 			logger.trace("OUT: " + outEvent.toString());
 			distanceStream.put(outEvent);
@@ -167,8 +206,14 @@
 			/* Update obs count for this class. */
 			obsCount++;
 
-			/* Update total obs count. */
-			updateTotalObsCount();
+			/* Log info. */
+			if (obsCount % 1000 == 0) {
+				logger.info("Labeled {} events with class id {}", obsCount,
+						clusterId);
+			}
+
+			/* Update total obs count and distance. */
+			updateTotalStats(inEvent);
 
 			for (int i = 0; i < vectorSize; i++) {
 				obsSum[i] += obs[i];
diff --git a/src/main/java/io/s4/example/kmeans/DataController.java b/src/main/java/io/s4/example/kmeans/DataController.java
index 5ac6155..00f4977 100644
--- a/src/main/java/io/s4/example/kmeans/DataController.java
+++ b/src/main/java/io/s4/example/kmeans/DataController.java
@@ -25,9 +25,12 @@
 
 public class DataController {
 
-	final private String TRAIN_FILENAME = "/covtype-train-1000.data.gz";
+	// final private String TRAIN_FILENAME = "/covtype-train-1000.data.gz"; //
+	// small file for debugging.
+	final private String TRAIN_FILENAME = "/covtype-train.data.gz";
 	final private String TEST_FILENAME = "/covtype-test.data.gz";
 	final private int MAX_NUM_CLASSES = 10;
+	final private int NUM_ITERATIONS = 10;
 	final private long numTrainVectors;
 	final private long numTestVectors;
 	private int vectorSize;
@@ -56,15 +59,33 @@
 
 			logger.info("Init app.");
 			app.init();
-			
-			injectData(app, TRAIN_FILENAME);
-			
+
+			for (int i = 0; i < NUM_ITERATIONS; i++) {
+				logger.info("Starting iteration {}.", i);
+				injectData(app, TRAIN_FILENAME);
+
+				/*
+				 * Make sure all the data has been processed. ClusterPE will
+				 * reset the total count after all the data is processed so we
+				 * wait until the count is equal to zero.
+				 */
+				while (app.getObsCount() > 0) {
+					Thread.sleep(100);
+				}
+			}
+
+			/* Done. */
+			app.remove();
+
 		} catch (FileNotFoundException e) {
-			// TODO Auto-generated catch block
 			e.printStackTrace();
+			logger.error(e.getMessage());
 		} catch (IOException e) {
-			// TODO Auto-generated catch block
 			e.printStackTrace();
+			logger.error(e.getMessage());
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
 		}
 	}
 
@@ -76,13 +97,17 @@
 		for (String line : data) {
 
 			String[] result = line.split("\\s");
+			
+			/* Class ID range starts in 1, shift to start in zero. */
+			int classID = Integer.parseInt(result[0]) - 1;
 
 			float[] vector = new float[vectorSize];
 			for (int j = 0; j < vectorSize; j++) {
 
 				vector[j] = Float.parseFloat(result[j + 1]);
 			}
-			app.injectData(count++, vector);
+			ObsEvent obsEvent = new ObsEvent(count++, vector, -1.0f, classID, -1);
+			app.injectData(obsEvent);
 		}
 		data.close();
 	}
diff --git a/src/main/java/io/s4/example/kmeans/HypIDKeyFinder.java b/src/main/java/io/s4/example/kmeans/HypIDKeyFinder.java
new file mode 100644
index 0000000..05760da
--- /dev/null
+++ b/src/main/java/io/s4/example/kmeans/HypIDKeyFinder.java
@@ -0,0 +1,22 @@
+package io.s4.example.kmeans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.s4.KeyFinder;
+
+public class HypIDKeyFinder implements
+		KeyFinder<ObsEvent> {
+
+	@Override
+	public List<String> get(ObsEvent event) {
+		
+		List<String> results = new ArrayList<String>();
+        
+        /* Retrieve the user ID and add it to the list. */
+        results.add(Integer.toString(event.getHypId()));
+        
+        return results;   
+	}
+
+}
diff --git a/src/main/java/io/s4/example/kmeans/Main.java b/src/main/java/io/s4/example/kmeans/Main.java
index f4ad12e..6d0d621 100644
--- a/src/main/java/io/s4/example/kmeans/Main.java
+++ b/src/main/java/io/s4/example/kmeans/Main.java
@@ -16,6 +16,11 @@
  */
 package io.s4.example.kmeans;
 
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+
 public class Main {
 
 	
@@ -24,6 +29,9 @@
 	 */
 	public static void main(String[] args) {
 
+		Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+		root.setLevel(Level.DEBUG);
+		
 		DataController dataController = new DataController();
 		dataController.start();
 	}
diff --git a/src/main/java/io/s4/example/kmeans/MinimizerPE.java b/src/main/java/io/s4/example/kmeans/MinimizerPE.java
index 7beb1f2..0046efa 100644
--- a/src/main/java/io/s4/example/kmeans/MinimizerPE.java
+++ b/src/main/java/io/s4/example/kmeans/MinimizerPE.java
@@ -32,7 +32,7 @@
 	final private Stream<ObsEvent> assignmentStream;
 	private int numEventsReceived = 0;
 	private float minDistance = Float.MAX_VALUE;
-	private int minClusterID;
+	private int hypID;
 
 	public MinimizerPE(App app, int numClusters, Stream<ObsEvent> assignmentStream) {
 		super(app);
@@ -48,14 +48,14 @@
 		
 		if(inEvent.getDistance() < minDistance) {
 			minDistance = inEvent.getDistance();
-			minClusterID = inEvent.getClassId();
+			hypID = inEvent.getHypId();
 		}
 		
 		if( ++numEventsReceived == numClusters) {
 			
 			/* Got all the distances. Send class id with minimum distance. */
 			ObsEvent outEvent = new ObsEvent(inEvent.getIndex(), obs,
-					minDistance, minClusterID);
+					minDistance, inEvent.getClassId(), hypID);
 			
 			logger.trace("IN: " + inEvent.toString());
 			logger.trace("OUT: " + outEvent.toString());
diff --git a/src/main/java/io/s4/example/kmeans/ObsEvent.java b/src/main/java/io/s4/example/kmeans/ObsEvent.java
index 6e28b5e..fd58ef0 100644
--- a/src/main/java/io/s4/example/kmeans/ObsEvent.java
+++ b/src/main/java/io/s4/example/kmeans/ObsEvent.java
@@ -24,36 +24,61 @@
 	final private float distance;
 	final private long index;
 	final private int classId;
-	
-	public ObsEvent(long index, float[] obsVector, float distance, int classId) {
+	final private int hypId;
+
+	public ObsEvent(long index, float[] obsVector, float distance, int classId,
+			int hypId) {
 		this.obsVector = obsVector;
 		this.distance = distance;
 		this.index = index;
 		this.classId = classId;
+		this.hypId = hypId;
 	}
 
+	/**
+	 * @return the observed data vector.
+	 */
 	public float[] getObsVector() {
 		return obsVector;
 	}
 
+	/**
+	 * @return the distance between the observed vector and the class centroid.
+	 *         Use -1.0 when unknown.
+	 */
 	public float getDistance() {
 		return distance;
 	}
 
+	/**
+	 * @return the index of the data vector.
+	 */
 	public long getIndex() {
 		return index;
 	}
-	
+
+	/**
+	 * @return the true class of the vector.
+	 */
 	public int getClassId() {
 		return classId;
 	}
-	
+
+	/**
+	 * @return the hypothesized class of the vector. Use -1 when unknown.
+	 */
+	public int getHypId() {
+		return hypId;
+	}
+
 	public String toString() {
-		
-		StringBuilder vector = new StringBuilder();;
-		for(int i=0; i < obsVector.length; i++) {
+
+		StringBuilder vector = new StringBuilder();
+		;
+		for (int i = 0; i < obsVector.length; i++) {
 			vector.append(obsVector[i] + " ");
 		}
-        return "Idx: " + index + ", Label: " + classId + ", Dist: " + distance + ", Obs: " + vector.toString();
-    }
+		return "Idx: " + index + ", Class: " + classId + ", Hyp:" + hypId
+				+ ", Dist: " + distance + ", Obs: " + vector.toString();
+	}
 }
diff --git a/src/main/java/io/s4/example/kmeans/README.md b/src/main/java/io/s4/example/kmeans/README.md
new file mode 100644
index 0000000..c70ab88
--- /dev/null
+++ b/src/main/java/io/s4/example/kmeans/README.md
@@ -0,0 +1,66 @@
+Implementation of the K-Means Algorithm in S4
+=============================================
+
+The [k-means algorithm](http://en.wikipedia.org/wiki/K-means_clustering) can be used for unsupervised clustering of multivariate data. In this example
+we use a [data set to predict forest cover type](http://kdd.ics.uci.edu/databases/covertype/covertype.html).
+There is also a paper published for the author of this work ([PDF](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.128.2475&rep=rep1&type=pdf))
+
+## The Forest Cover Data Set
+
+Here are the steps I used to prepare the data files. They are located under src/main/resources/
+
+	# Download data set and uncoompress.
+	wget http://kdd.ics.uci.edu/databases/covertype/covtype.data.gz
+	gunzip covtype.data.gz 
+
+	# Remove some columns and put the class label in the first column.
+	gawk -F "," '{print $55, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10}' covtype.data  > covtype-modified.data
+
+	# Randomize data set.
+	sort -R covtype-modified.data > covtype-random.data
+
+	# Check number of data points.
+	wc -l covtype-*
+	#  581012 covtype-modified.data
+	#  581012 covtype-random.data
+
+	# Create a train and a test set.
+	tail -100000 covtype-random.data > covtype-test.data
+	head -481012 covtype-random.data > covtype-train.data
+
+	wc -l covtype-{train,test}.data
+	#  481012 covtype-train.data
+	#  100000 covtype-test.data
+	#  581012 total
+
+## Application Graph
+
+To estimate the centroids of the clusters using k-means, we need to follow these steps:
+
+* Initialize the centroids by picking k vectors at random from the data set.
+* Inject the observation data vectors.
+* Compute the euclidean distance between each observation and the centroids.
+* Select the id of the cluster with the smallest distance (the hypothesized class for that observation).
+* Repeat for all the vectors in the data set and compute the average distance for the data set.
+* Reestimate the centroids by computing the mean for each clusters using the observations and their hypothesized class.
+* Repeat the whole process by reinjecting the data until the average distance converges.
+
+Clearly, estimating the k-means centroids requires batch processing. That is we pass the same data through the application several times and we know exactly how many observation vectors we have available. With this knowledge we design the following application graph:
+
+![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-kmeans-example.png)
+
+We choose to use the same EventType to communicate across Processing Elements:
+
+<script src="https://gist.github.com/1148118.js?file=ObsEvent.java"></script>
+
+The event is immutable and can only be created using the constructor. The fields are:
+
+* _obsVector_ is the observation vecotr. The size of the float array should be the same for all the vectors.
+* _distance_ is the euclidean distance between the vector and the centroid.
+* _index_ is a unique identifider for the event. 
+* _classId_ is the true class for the vector as it was labeled in the original data set.
+* _hypId_ is the hypothesized class for the vector after using the classification algorithm.
+
+ 
+
+
diff --git a/src/main/java/io/s4/example/kmeans/kMeansTrainer.java b/src/main/java/io/s4/example/kmeans/kMeansTrainer.java
index 38fe6c7..7b2c0f4 100644
--- a/src/main/java/io/s4/example/kmeans/kMeansTrainer.java
+++ b/src/main/java/io/s4/example/kmeans/kMeansTrainer.java
@@ -32,6 +32,8 @@
 	private float[][] initialCentroids;
 	private Stream<ObsEvent> obsStream;
 
+	private ClusterPE clusterPE;
+
 	public kMeansTrainer(int numClusters, int vectorSize, long numVectors,
 			float[][] initialCentroids) {
 		super();
@@ -41,12 +43,11 @@
 		this.initialCentroids = initialCentroids;
 	}
 
-	public void injectData(int index, float[] obs) {	
-		ObsEvent obsEvent = new ObsEvent(index, obs, -1.0f, -1);
+	public void injectData(ObsEvent obsEvent) {
 		logger.trace("Inject: " + obsEvent.toString());
-			obsStream.put(obsEvent);
+		obsStream.put(obsEvent);
 	}
-	
+
 	@Override
 	protected void start() {
 
@@ -55,11 +56,11 @@
 	@Override
 	protected void init() {
 
-		ClusterPE clusterPE = new ClusterPE(this, numClusters, vectorSize,
-				numVectors, initialCentroids);
+		clusterPE = new ClusterPE(this, numClusters, vectorSize, numVectors,
+				initialCentroids);
 
 		Stream<ObsEvent> assignmentStream = new Stream<ObsEvent>(this,
-				"Assignment Stream", new ClusterIDKeyFinder(), clusterPE);
+				"Assignment Stream", new HypIDKeyFinder(), clusterPE);
 
 		MinimizerPE minimizerPE = new MinimizerPE(this, numClusters,
 				assignmentStream);
@@ -73,11 +74,11 @@
 		 */
 		clusterPE.setStream(distanceStream);
 
-		/* This stream will send events of type ObsEvent to ALL the PE 
-		 * instances in clusterPE. 
-		 * */
-		obsStream = new Stream<ObsEvent>(this, "Observation Stream",
-				clusterPE);
+		/*
+		 * This stream will send events of type ObsEvent to ALL the PE instances
+		 * in clusterPE.
+		 */
+		obsStream = new Stream<ObsEvent>(this, "Observation Stream", clusterPE);
 	}
 
 	@Override
@@ -85,4 +86,13 @@
 		// TODO Auto-generated method stub
 
 	}
+
+	public long getObsCount() {
+
+		return clusterPE.getObsCount();
+	}
+
+	public void remove() {
+		removeAll();
+	}
 }
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 6dac308..6b246ee 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
     </encoder>
   </appender>
 
-  <root level="trace">
+  <root level="info">
     <appender-ref ref="STDOUT" />
   </root>
 </configuration>