In this example we show how to design a pluggable probabilistic modeling framework in S4. The task is to classify incoming objects into well-defined categories. Each object is represented by an observation vector, also called the features of the object. The variability and consistency of the features within a category and between categories determine the accuracy of the classifier and the complexity of the models. Because it is impossible to achieve perfect accuracy in real-world systems, we use probabilistic models to classify the objects. Instead of just assigning a category to each object, the model provides the probability that the object belongs to each category. The final decision may depend on several factors; for example, the cost of wrongly assigning a category to an object may not be the same for all categories. In this example, we assume that the cost is the same for all categories and simply select the category whose model has the highest probability.
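The equal-cost decision rule described above can be sketched in a few lines of Java. This is only an illustration: the class name `MaxProbabilityDecision` and the probability values are made up and are not part of the S4 example code.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of S4: picks the most probable category. */
final class MaxProbabilityDecision {

    /** Returns the index of the largest probability (ties go to the lowest index). */
    static int mostProbableCategory(double[] probabilities) {
        if (probabilities.length == 0) {
            throw new IllegalArgumentException("need at least one category");
        }
        int best = 0;
        for (int c = 1; c < probabilities.length; c++) {
            if (probabilities[c] > probabilities[best]) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] probabilities = {0.10, 0.65, 0.25}; // made-up values
        System.out.println("probabilities = " + Arrays.toString(probabilities));
        System.out.println("selected category = " + mostProbableCategory(probabilities));
    }
}
```

If the misclassification costs were not equal, the comparison inside the loop is where a cost-weighted score would replace the raw probability.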
Probabilistic models are widely used in many application areas. A distributed system may be needed when decisions must be made with low delay while processing a large number of incoming events. S4 is designed to scale to an unlimited number of nodes, providing the flexibility to run complex algorithms, process data at high rates, and deliver results with low latency. Typical applications include processing user events in web applications, analyzing financial market data, monitoring network traffic, and many more.
To learn more, get a copy of this classic book: “Pattern Classification” by R. Duda, P. Hart, and D. Stork.
In this example we implemented an application that uses a training data set to estimate model parameters. Because most estimation algorithms learn iteratively, we inject the training data set several times until a stop condition is reached. To train the models, we run S4 in batch mode. That is, we push the data at the highest possible rate and, when a queue fills up, we let the system block until more space in the queue becomes available. In other words, no event data is ever lost in this process. To achieve this, we remove all latency constraints and let the process run for as long as needed until all the data is processed. This approach is quite similar to MapReduce except that the data is injected sequentially from a single source.
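The blocking behavior described above can be illustrated with a bounded queue from the Java standard library. This is a minimal sketch and not S4's actual queue implementation; the class name `BlockingInjectionSketch`, the sentinel value and the queue capacity are assumptions made for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of lossless injection through a bounded queue. */
final class BlockingInjectionSketch {

    /** Pushes numEvents through a bounded queue and returns how many the consumer received. */
    static int run(int numEvents, int queueCapacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
        final int endOfStream = -1; // sentinel marking the end of the data
        int[] received = {0};

        Thread consumer = new Thread(() -> {
            try {
                while (queue.take() != endOfStream) {
                    received[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < numEvents; i++) {
                queue.put(i); // blocks while the queue is full, so nothing is dropped
            }
            queue.put(endOfStream);
            consumer.join(); // after join() the consumer's count is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while injecting", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("received " + run(10_000, 8) + " of 10000 events");
    }
}
```

The producer runs as fast as the consumer allows, which is the trade made in batch mode: throughput is bounded by the slowest stage, but every event is processed.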
Once the model parameters are estimated we are ready to run a test. In real-time applications, we would have no control over the speed of the incoming data vectors. If we didn't have sufficient computing resources to process all the data within the time constraints, we would be forced to either lose some of the data (load shedding) or switch to a less complex classification algorithm. In this example, we simply assume that there is no data loss.
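For contrast, load shedding can be sketched with the non-blocking `offer()` call on the same kind of bounded queue. Again, this is only an illustration under assumed names (`LoadSheddingSketch`, `countDropped`); to keep it deterministic, nothing drains the queue, which stands in for a consumer that cannot keep up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of load shedding: drop events instead of blocking. */
final class LoadSheddingSketch {

    /** Offers numEvents to a bounded queue that nobody drains; returns the number dropped. */
    static int countDropped(int numEvents, int queueCapacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
        int dropped = 0;
        for (int i = 0; i < numEvents; i++) {
            // offer() returns false immediately when the queue is full
            if (!queue.offer(i)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("dropped " + countDropped(10, 4) + " of 10 events");
    }
}
```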
To evaluate the application we use a publicly available labeled data set for predicting forest cover type. For details about the data set, see the paper published by its author (PDF).
Here is a description of the data. There are 7 types of forest cover. Each observation vector represents an area of forest with a type of cover and ten measurements (Elevation, Aspect, Slope, etc.). The data set contains a total of 581,012 observation vectors.
Here are the steps I used to download and prepare the data files. The files are located in the project under src/main/resources/.
    # Download the data set and uncompress it.
    wget http://kdd.ics.uci.edu/databases/covertype/covtype.data.gz
    gunzip covtype.data.gz

    # Remove some columns and put the class label in the first column.
    gawk -F "," '{print $55, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10}' covtype.data > covtype-modified.data

    # Randomize data set.
    sort -R covtype-modified.data > covtype-random.data

    # Check number of data points.
    wc -l covtype-*
    # 581012 covtype-modified.data
    # 581012 covtype-random.data

    # Create a train and a test set.
    tail -100000 covtype-random.data > covtype-test.data
    head -481012 covtype-random.data > covtype-train.data
    wc -l covtype-{train,test}.data
    # 481012 covtype-train.data
    # 100000 covtype-test.data
    # 581012 total
The modeling package io.s4.model provides a basic interface for implementing probabilistic models. The main methods are:
The abstract class io.s4.model.Model is used to build the models. There are no dependencies with any specific implementation of a model because the code only uses Model to estimate and run the classifier.
There are two concrete implementations of Model, both used in the experiments in this example: GaussianModel and GaussianMixtureModel.
More models can be implemented and, thanks to the object-oriented design, swapping models is as easy as editing one line of code in the Guice Module.
When deployed in a cluster, the same code runs on many physical servers without any change to the application code. As an application developer you don't have to worry about how distributed processing works. Scientists can focus on writing model code and dropping it in the right place. Moreover, the same code can be used to run experiments in batch mode and to deploy in a real-time production environment.
Here is the basic structure of the program:
We choose to use events of type ObsEvent to communicate between Processing Elements and ResultEvent to send the results to the MetricsPE. The events are immutable and can only be created using the constructor. The ObsEvent fields are:
Here is a snippet of ObsEvent.java:
    public class ObsEvent extends Event {

        private final float[] obsVector;
        private final float prob;
        private final long index;
        private final int classId;
        private final int hypId;

        public ObsEvent(long index, float[] obsVector, float prob, int classId,
                int hypId) {
            this.obsVector = obsVector;
            this.prob = prob;
            this.index = index;
            this.classId = classId;
            this.hypId = hypId;
        }
The application graph is defined in MyApp. To define a graph we simply connect processing elements to streams. When the objects in the graph are created, they are simply a representation of the application. The real work is done by the processing element instances, which may reside anywhere in the cluster.
Notice that the application graph has a cycle (events go from ModelPE to MaximizerPE and back). This makes building the application graph slightly harder. To solve this problem we added a setter method to set the distanceStream in ClusterPE.
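The reason a setter helps can be shown with a toy graph. In the sketch below, `Element` and `Stream` are hypothetical stand-ins and not the S4 classes: a stream needs its target element at construction time, so in a cycle one element must be created before its output stream exists and wired up afterwards.

```java
/** Hypothetical sketch: closing a cycle in an object graph with a setter. */
final class CyclicGraphSketch {

    /** Minimal stand-in for a stream: it only knows its target element. */
    static final class Stream {
        final String name;
        final Element target;

        Stream(String name, Element target) {
            this.name = name;
            this.target = target;
        }
    }

    /** Minimal stand-in for a processing element with one output stream. */
    static final class Element {
        final String name;
        private Stream output; // may be set after construction to allow cycles

        Element(String name, Stream output) {
            this.name = name;
            this.output = output;
        }

        void setOutput(Stream output) {
            this.output = output;
        }

        Stream getOutput() {
            return output;
        }
    }

    /** Builds a two-element cycle A -> B -> A and returns element A. */
    static Element buildCycle() {
        Element a = new Element("A", null);      // output not known yet
        Stream toA = new Stream("b-to-a", a);
        Element b = new Element("B", toA);       // B can be fully wired in its constructor
        a.setOutput(new Stream("a-to-b", b));    // the setter closes the cycle
        return a;
    }

    public static void main(String[] args) {
        Element a = buildCycle();
        Element b = a.getOutput().target;
        System.out.println(a.name + " -> " + b.name + " -> " + b.getOutput().target.name);
    }
}
```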
For testing we follow these steps:
We first run the classifier using the Gaussian model, that is, we model each class using a Gaussian probability density function for which we need to estimate its parameters (mean and variance).
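To make the estimation step concrete, here is a minimal sketch of a Gaussian whose mean and variance are estimated from accumulated statistics in a single pass. It is not the project's GaussianModel: the class and method names are hypothetical, it uses double instead of float, it assumes the dimensions are independent (a diagonal covariance), and the variance floor is an arbitrary choice to avoid division by zero.

```java
/** Hypothetical sketch of a diagonal-covariance Gaussian (not the S4 GaussianModel). */
final class DiagonalGaussianSketch {

    private static final double MIN_VARIANCE = 1e-6; // assumed floor, arbitrary value

    private final double[] sum;      // per-dimension sum of observations
    private final double[] sumSq;    // per-dimension sum of squared observations
    private final double[] mean;
    private final double[] variance;
    private long count;

    DiagonalGaussianSketch(int dimension) {
        sum = new double[dimension];
        sumSq = new double[dimension];
        mean = new double[dimension];
        variance = new double[dimension];
    }

    /** Accumulates sufficient statistics for one observation vector. */
    void update(double[] obs) {
        for (int d = 0; d < sum.length; d++) {
            sum[d] += obs[d];
            sumSq[d] += obs[d] * obs[d];
        }
        count++;
    }

    /** Maximum-likelihood estimates: mean = E[x], variance = E[x^2] - E[x]^2. */
    void estimate() {
        if (count == 0) {
            throw new IllegalStateException("no observations");
        }
        for (int d = 0; d < sum.length; d++) {
            mean[d] = sum[d] / count;
            variance[d] = Math.max(sumSq[d] / count - mean[d] * mean[d], MIN_VARIANCE);
        }
    }

    /** Log of the Gaussian density at obs, summed over independent dimensions. */
    double logProb(double[] obs) {
        double logP = 0.0;
        for (int d = 0; d < mean.length; d++) {
            double diff = obs[d] - mean[d];
            logP += -0.5 * (Math.log(2 * Math.PI * variance[d]) + diff * diff / variance[d]);
        }
        return logP;
    }

    double mean(int d) { return mean[d]; }

    double variance(int d) { return variance[d]; }

    public static void main(String[] args) {
        DiagonalGaussianSketch g = new DiagonalGaussianSketch(2);
        g.update(new double[] {1.0, 10.0}); // made-up observations
        g.update(new double[] {3.0, 14.0});
        g.estimate();
        System.out.println("mean = " + g.mean(0) + ", " + g.mean(1));
        System.out.println("variance = " + g.variance(0) + ", " + g.variance(1));
    }
}
```

Because the statistics are simple sums, one pass over the training data is enough, which matches the single iteration used for this model.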
To run the experiment, we bind the Model type to the GaussianModel class in Module.java as follows:
With this binding the GaussianModel instance will be injected in the Controller constructor.
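In Guice, a binding of this kind is typically a single `bind(Model.class).to(GaussianModel.class)` line inside the `configure()` method of a module; the project's actual Module.java is not reproduced here. The sketch below shows the same idea in plain Java, with no Guice dependency, so that it runs on its own. All classes in it are simplified stand-ins that only borrow the names used in this example.

```java
/** Hypothetical sketch of constructor injection with a single swappable line. */
final class ModelSwapSketch {

    /** Stand-in for the Model abstraction. */
    interface Model {
        String name();
    }

    static final class GaussianModel implements Model {
        @Override
        public String name() {
            return "GaussianModel";
        }
    }

    static final class GaussianMixtureModel implements Model {
        @Override
        public String name() {
            return "GaussianMixtureModel";
        }
    }

    /** Depends only on the Model abstraction, never on a concrete class. */
    static final class Controller {
        private final Model model;

        Controller(Model model) {
            this.model = model;
        }

        String describe() {
            return "Controller using " + model.name();
        }
    }

    /** The one line to edit when swapping models (plays the role of the binding). */
    static Model provideModel() {
        return new GaussianModel();
    }

    public static void main(String[] args) {
        System.out.println(new Controller(provideModel()).describe());
    }
}
```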
Next, we edit the properties file as follows:
    model.train_data = /covtype-train.data.gz
    model.test_data = /covtype-test.data.gz
    model.logger.level = DEBUG
    model.num_iterations = 1
    model.vector_size = 10
    model.output_interval_in_seconds = 2
In the properties file we configure the data sets for training and testing, the logger level, the number of iterations (we only need one iteration to estimate the mean and variance), the vector size (10 for this data set), and how often to print partial results (every 2 seconds here). The controller prints a final result at the end of the experiment.
To run using Gradle, make sure you set the Main class in build.gradle to:
mainClassName = "io.s4.example.model.Main"
To run the experiment type:
gradlew run
and after a few seconds we get the result:
    Confusion Matrix [%]:

             0     1     2     3     4     5     6
        ----------------------------------------
    0:    67.4  25.2   0.7   0.0   1.0   0.3   5.5
    1:    24.1  65.8   4.6   0.0   2.3   1.9   1.4
    2:     0.0  19.6  64.3   3.7   0.3  12.0   0.0
    3:     0.0   0.4  38.6  48.8   0.0  12.2   0.0
    4:     0.0  69.0   4.8   0.0  24.0   2.2   0.0
    5:     0.0  18.3  47.3   2.3   0.5  31.5   0.0
    6:    70.5   0.6   0.7   0.0   0.0   0.0  28.2

    Accuracy: 63.1% - Num Observations: 100000
The classifier correctly categorizes 63.1% of the observation vectors in an independent test set. Note that 85% of the observations are in categories 0 and 1. The classifier learned this fact and relied on the prior probabilities to optimize the overall accuracy. That's why accuracy is higher for these categories. For example, only 3.5% of the observations are in category 6, so the low accuracy of 28.2% for that category has little impact on the overall accuracy. Depending on the application, this may or may not be the right optimization approach.
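The relation between the per-category rows and the overall accuracy can be sketched as follows. The rows of the matrix above each sum to about 100%, so it appears to be normalized per true category; the sketch makes that assumption. The class name and the toy labels are made up, and this is not the code used by the MetricsPE.

```java
/** Hypothetical sketch: confusion matrix, per-row percentages and overall accuracy. */
final class ConfusionMatrixSketch {

    /** counts[t][p] = number of observations with true class t predicted as class p. */
    static long[][] count(int[] trueClass, int[] predictedClass, int numClasses) {
        if (trueClass.length != predictedClass.length) {
            throw new IllegalArgumentException("length mismatch");
        }
        long[][] counts = new long[numClasses][numClasses];
        for (int i = 0; i < trueClass.length; i++) {
            counts[trueClass[i]][predictedClass[i]]++;
        }
        return counts;
    }

    /** Fraction of all observations that lie on the diagonal. */
    static double accuracy(long[][] counts) {
        long correct = 0;
        long total = 0;
        for (int t = 0; t < counts.length; t++) {
            for (int p = 0; p < counts[t].length; p++) {
                total += counts[t][p];
                if (t == p) {
                    correct += counts[t][p];
                }
            }
        }
        return total == 0 ? 0.0 : (double) correct / total;
    }

    /** Percentages normalized per true class (each non-empty row sums to 100). */
    static double[][] rowPercent(long[][] counts) {
        double[][] percent = new double[counts.length][counts.length];
        for (int t = 0; t < counts.length; t++) {
            long rowTotal = 0;
            for (long c : counts[t]) {
                rowTotal += c;
            }
            for (int p = 0; p < counts[t].length; p++) {
                percent[t][p] = rowTotal == 0 ? 0.0 : 100.0 * counts[t][p] / rowTotal;
            }
        }
        return percent;
    }

    public static void main(String[] args) {
        // Toy data: 9 observations of class 0, 1 of class 1, everything predicted as 0.
        int[] truth = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1};
        int[] predicted = new int[10];
        long[][] counts = count(truth, predicted, 2);
        System.out.println("overall accuracy = " + accuracy(counts));
        System.out.println("class 1 accuracy [%] = " + rowPercent(counts)[1][1]);
    }
}
```

In the toy data the rare class is never recognized, yet the overall accuracy stays high because it is dominated by the frequent class, which is the effect discussed above.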
Next, we want to try the more sophisticated GaussianMixtureModel. We changed the properties file as follows:
    model.train_data = /covtype-train.data.gz
    model.test_data = /covtype-test.data.gz
    model.logger.level = DEBUG
    model.num_gaussians = 1
    model.num_iterations = 2
    model.vector_size = 10
    model.output_interval_in_seconds = 2
Note that we are using only one Gaussian per mixture, which is equivalent to using the GaussianModel, so we expect the results to be identical. We need 2 iterations because the model uses the first pass to estimate the mean and variance of the data. This first pass is only useful when using more than one mixture component.
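The equivalence can be checked on a one-dimensional toy case. The sketch below computes the log-probability of a Gaussian mixture with the log-sum-exp trick and compares a one-component mixture with a single Gaussian. It is a simplified illustration with hypothetical names, not the project's GaussianMixtureModel, and it assumes all mixture weights are strictly positive.

```java
/** Hypothetical 1-D sketch: a one-component mixture reduces to a single Gaussian. */
final class MixtureLogProbSketch {

    /** Log density of a 1-D Gaussian. */
    static double gaussianLogProb(double x, double mean, double variance) {
        double diff = x - mean;
        return -0.5 * (Math.log(2 * Math.PI * variance) + diff * diff / variance);
    }

    /** log(sum_k w_k * N(x; mean_k, var_k)), computed with the log-sum-exp trick. */
    static double mixtureLogProb(double x, double[] weights, double[] means, double[] variances) {
        double[] terms = new double[weights.length];
        double max = Double.NEGATIVE_INFINITY;
        for (int k = 0; k < weights.length; k++) {
            terms[k] = Math.log(weights[k]) + gaussianLogProb(x, means[k], variances[k]);
            max = Math.max(max, terms[k]);
        }
        double sum = 0.0;
        for (double term : terms) {
            sum += Math.exp(term - max); // subtracting the max avoids underflow
        }
        return max + Math.log(sum);
    }

    public static void main(String[] args) {
        double x = 1.5; // made-up values
        double single = gaussianLogProb(x, 1.0, 2.0);
        double mixture = mixtureLogProb(x, new double[] {1.0}, new double[] {1.0}, new double[] {2.0});
        System.out.println("single Gaussian:       " + single);
        System.out.println("one-component mixture: " + mixture);
    }
}
```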
We changed the Module class as follows:
and we run the experiment again:
gradlew run
The result is identical as expected:
    Confusion Matrix [%]:

             0     1     2     3     4     5     6
        ----------------------------------------
    0:    67.4  25.2   0.7   0.0   1.0   0.3   5.5
    1:    24.1  65.8   4.6   0.0   2.3   1.9   1.4
    2:     0.0  19.6  64.3   3.7   0.3  12.0   0.0
    3:     0.0   0.4  38.6  48.8   0.0  12.2   0.0
    4:     0.0  69.0   4.8   0.0  24.0   2.2   0.0
    5:     0.0  18.3  47.3   2.3   0.5  31.5   0.0
    6:    70.5   0.6   0.7   0.0   0.0   0.0  28.2

    Accuracy: 63.1% - Num Observations: 100000
Now let's increase the number of mixture components to two Gaussian distributions per category:
    model.train_data = /covtype-train.data.gz
    model.test_data = /covtype-test.data.gz
    model.logger.level = DEBUG
    model.num_gaussians = 2
    model.num_iterations = 6
    model.vector_size = 10
    model.output_interval_in_seconds = 2
    Confusion Matrix [%]:

             0     1     2     3     4     5     6
        ----------------------------------------
    0:    66.4  27.8   0.1   0.0   1.5   0.4   3.8
    1:    24.9  63.3   3.6   0.1   4.5   3.0   0.6
    2:     0.0  13.1  65.7   8.2   1.0  12.0   0.0
    3:     0.0   0.4  17.0  80.9   0.0   1.7   0.0
    4:     5.1  50.0   3.0   0.0  37.2   4.7   0.0
    5:     0.0  15.8  39.4   7.5   1.0  36.3   0.0
    6:    71.4   1.1   0.0   0.0   0.0   0.0  27.6

    Accuracy: 62.2% - Num Observations: 100000
The overall accuracy went down from 63.1% to 62.2%. However, we can see a dramatic improvement in category 3 (from 48.8% to 80.9%) at the cost of a slight degradation in categories 0 and 1. Clearly, using two Gaussians per category helped category 3.
To improve the accuracy of the classifier, one could do some additional analysis and come up with an improved model until the accuracy is acceptable for the target application. For example, why are so many category 6 observations classified as category 0? Maybe we need a different number of mixtures per category to allocate more parameters to the categories with more training data and fewer to the other ones. Give it a try and let me know. I will add any models that get better overall accuracy than this one.
I tested the execution speed in a single-node configuration on a MacBook Air with a Core i5 CPU and 4 GB of RAM. The data is read from the solid-state disk and uncompressed using gzip in every iteration. Initialization time is excluded from the measurements. Because I used only one node and all the data is local, there is no network overhead involved.
The following results are for the GaussianMixtureModel with 2 components per mixture and 6 iterations.
Total training time was 33 seconds. Training time per observation was 69 microseconds. Training time per observation per iteration was 11 microseconds. Total testing time was 4 seconds. Testing time per observation was 8 microseconds.
Based on this number, the rate at which we injected data for training was 1 / (11 microseconds), or roughly 90,000 observations per second. If we look at the ObsEvent class, the effective number of bits transmitted per event is:
10 x float + 1 x float + 2 x int + 1 x long = 11 x 32 + 2 x 32 + 1 x 64 = 480 bits / observation
This results in an injected data rate of 90,000 x 480 bits/sec, or about 43 Mbps.
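The arithmetic above can be reproduced with a few lines of Java. The helper names are hypothetical; the field sizes come from the Java type sizes of the ObsEvent fields shown earlier, and the timing is the rounded 11 microseconds per observation per iteration reported above.

```java
/** Hypothetical sketch reproducing the data-rate arithmetic. */
final class ThroughputSketch {

    /** Bits per ObsEvent: the float vector plus 1 float, 2 ints and 1 long. */
    static int bitsPerEvent(int vectorSize) {
        return vectorSize * Float.SIZE + Float.SIZE + 2 * Integer.SIZE + Long.SIZE;
    }

    static double eventsPerSecond(double microsecondsPerEvent) {
        return 1_000_000.0 / microsecondsPerEvent;
    }

    static double megabitsPerSecond(double eventsPerSecond, int bitsPerEvent) {
        return eventsPerSecond * bitsPerEvent / 1_000_000.0;
    }

    public static void main(String[] args) {
        int bits = bitsPerEvent(10);         // 480 bits for a 10-dimensional vector
        double rate = eventsPerSecond(11.0); // about 90,909 events per second
        System.out.printf("%d bits/event, %.0f events/s, %.1f Mbps%n",
                bits, rate, megabitsPerSecond(rate, bits));
    }
}
```

Using the unrounded reciprocal of 11 microseconds gives a slightly higher figure (about 43.6 Mbps) than rounding the event rate down to 90,000 first.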
On an Ubuntu machine with an Intel Core i7-860 processor, the average time per observation-iteration was 8 microseconds, or about 30% faster, which corresponds to a data rate of 56 Mbps.
This is just to get an idea of the execution speed before even thinking about how to optimize. The throughput will vary greatly depending on the complexity of the algorithm and the hardware configuration.
Please share your feedback at: http://groups.google.com/group/s4-project/topics