HORN-14: Add MNIST performance evaluator
diff --git a/README.md b/README.md
index f30ebc6..cb217f6 100644
--- a/README.md
+++ b/README.md
@@ -23,30 +23,29 @@
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
+      double gradient = 0;
       for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.squashingFunction.applyDerivative(this
-            .getOutput()) * (m.getDelta() * m.getWeight());
-        this.backpropagate(gradient);
+        gradient += (m.getDelta() * m.getWeight());
 
         // Weight corrections
         double weight = -this.getLearningRate() * this.getOutput()
             * m.getDelta() + this.getMomentumWeight() * m.getPrevWeight();
         this.push(weight);
       }
+
+      this.backpropagate(gradient
+          * this.squashingFunction.applyDerivative(this.getOutput()));
     }
   }
 ```
-The advantages of this programming model are:
-
- * Easy and intuitive to use
- * Flexible to make your own CUDA kernels
- * Allows multithreading to be used internally
+The advantage of this programming model is that it is easy and intuitive to use.
 
 Also, Apache Horn provides a simplified and intuitive configuration interface. To create a neural network job and submit it to an existing Hadoop or Hama cluster, we just add layers with their properties, such as the squashing function and neuron class. The example below configures a 4-layer neural network with 500 neurons in the hidden layers for training on the MNIST dataset:
 ```Java
   HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
   job.setLearningRate(learningRate);
+  job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
   ..
 
   job.inputLayer(784, Sigmoid.class, StandardNeuron.class);
@@ -56,6 +55,20 @@
   job.setCostFunction(CrossEntropy.class);
 ```
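+
+Other hyperparameters such as the momentum, regularization weight, mini-batch size, and maximum number of iterations can be set on the job in the same way. A minimal sketch using the setters `HornJob` exposes (the values shown are simply the ones used in the quick-run example below):
+```Java
+  job.setMomentumWeight(0.0);
+  job.setRegularizationWeight(0.1);
+  job.setBatchSize(10);
+  job.setMaxIteration(12000);
+```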
 
+## Quick Run Example
+
+Download the MNIST training images and labels, and convert them into an HDFS sequence file with the following command:
+```
+ % bin/horn jar horn-0.x.0.jar MNISTConverter train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq 
+```
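+
+Each record in the resulting sequence file is a 794-dimensional vector: the 784 pixel intensities rescaled from 0–255 to [0, 1], followed by a 10-dimensional one-hot encoding of the digit label.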
+
+Then, train the model with the following command (in this example, we use a learning rate η of 0.002, momentum 0.0, regularization weight λ of 0.1, 100 hidden units, and a mini-batch size of 10):
+```
+ % bin/horn jar horn-0.x.0.jar MultiLayerPerceptron /tmp/model /tmp/mnist.seq \
+   0.002 0.0 0.1 784 100 10 10 12000
+```
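+
+Once training is finished, the model can be evaluated on the MNIST test set with the `MNISTEvaluator` added in this change. A sketch of the invocation, mirroring the converter command above and assuming the t10k test files are in the working directory:
+```
+ % bin/horn jar horn-0.x.0.jar MNISTEvaluator /tmp/model t10k-images.idx3-ubyte t10k-labels.idx1-ubyte
+```
+The evaluator prints the fraction of test images whose predicted digit matches the label.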
+
 ## High Scalability
 
 Apache Horn is a hybrid synchronous and asynchronous distributed training framework. Within a single BSP job, each task group works asynchronously using region barrier synchronization instead of global barrier synchronization, and trains a large-scale neural network model on its assigned data sets in a synchronous way.
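+
+As a sketch of the synchronous part, the trainer reworked in this change follows a simple compute/merge pattern in each superstep (simplified from the `bsp()` loop of `LayeredNeuralNetworkTrainer`; message contents and convergence bookkeeping are elided):
+```Java
+  while (iterations++ < maxIterations) {
+    // every peer computes weight updates on its local mini-batch
+    calculateUpdates(peer);
+    peer.sync();
+
+    // the master averages the updates and broadcasts the new weights
+    if (peer.getPeerIndex() == 0) {
+      mergeUpdates(peer);
+    }
+    peer.sync();
+
+    if (isConverge) {
+      break;
+    }
+  }
+```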
diff --git a/conf/horn-env.sh b/conf/horn-env.sh
index 9f8d9c2..a033fe0 100644
--- a/conf/horn-env.sh
+++ b/conf/horn-env.sh
@@ -22,5 +22,5 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/usr/lib/jvm/java-8-oracle
+export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
 
diff --git a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
index b0162ab..e415a25 100644
--- a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
@@ -28,6 +28,8 @@
 import org.apache.hama.commons.math.DoubleFunction;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -45,10 +47,14 @@
  */
 abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
 
+  private static final double DEFAULT_REGULARIZATION_WEIGHT = 0;
   private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
 
   double trainingError;
 
+  /* The weight of regularization */
+  protected double regularizationWeight;
+
   /* The momentumWeight */
   protected double momentumWeight;
 
@@ -62,16 +68,8 @@
   
   protected LearningStyle learningStyle;
 
-  public static enum TrainingMethod {
-    GRADIENT_DESCENT
-  }
-  
-  public static enum LearningStyle {
-    UNSUPERVISED,
-    SUPERVISED
-  }
-  
   public AbstractLayeredNeuralNetwork() {
+    this.regularizationWeight = DEFAULT_REGULARIZATION_WEIGHT;
     this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
     this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
     this.learningStyle = LearningStyle.SUPERVISED;
@@ -81,6 +79,38 @@
     super(conf, modelPath);
   }
 
+  /**
+   * Set the regularization weight. A value in the range [0, 0.1) is
+   * recommended; the more complex the model is, the smaller the
+   * regularization weight should be.
+   * 
+   * @param regularizationWeight
+   */
+  public void setRegularizationWeight(double regularizationWeight) {
+    Preconditions.checkArgument(regularizationWeight >= 0
+        && regularizationWeight < 1.0,
+        "Regularization weight must be in range [0, 1.0)");
+    this.regularizationWeight = regularizationWeight;
+  }
+
+  public double getRegularizationWeight() {
+    return this.regularizationWeight;
+  }
+
+  /**
+   * Set the momentum weight for the model. A value in the range [0, 0.5] is
+   * recommended.
+   * 
+   * @param momentumWeight
+   */
+  public void setMomemtumWeight(double momentumWeight) {
+    Preconditions.checkArgument(momentumWeight >= 0 && momentumWeight <= 1.0,
+        "Momentum weight must be in range [0, 1.0]");
+    this.momentumWeight = momentumWeight;
+  }
+
+  public double getMomemtumWeight() {
+    return this.momentumWeight;
+  }
+
   public void setTrainingMethod(TrainingMethod method) {
     this.trainingMethod = method;
   }
@@ -117,7 +147,6 @@
    *          is f(x) = x by default.
    * @return The layer index, starts with 0.
    */
-  @SuppressWarnings("rawtypes")
   public abstract int addLayer(int size, boolean isFinalLayer,
       DoubleFunction squashingFunction, Class<? extends Neuron> neuronClass);
 
@@ -182,6 +211,8 @@
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
+    // read regularization weight
+    this.regularizationWeight = input.readDouble();
     // read momentum weight
     this.momentumWeight = input.readDouble();
 
@@ -203,6 +234,8 @@
   @Override
   public void write(DataOutput output) throws IOException {
     super.write(output);
+    // write regularization weight
+    output.writeDouble(this.regularizationWeight);
     // write momentum weight
     output.writeDouble(this.momentumWeight);
 
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
index 45f56a3..5624e49 100644
--- a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
@@ -22,6 +22,9 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -47,9 +50,10 @@
  * 
  */
 public abstract class AbstractNeuralNetwork implements Writable {
+
   protected HamaConfiguration conf;
   protected FileSystem fs;
-
+  
   private static final double DEFAULT_LEARNING_RATE = 0.5;
 
   protected double learningRate;
@@ -68,21 +72,31 @@
     this.featureTransformer = new DefaultFeatureTransformer();
   }
 
-  public AbstractNeuralNetwork(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
   public AbstractNeuralNetwork(HamaConfiguration conf, String modelPath) {
     try {
       this.conf = conf;
-      this.fs = FileSystem.get(conf);
       this.modelPath = modelPath;
-
       this.readFromModel();
     } catch (IOException e) {
       e.printStackTrace();
     }
+  }
 
+  /**
+   * Set the degree of aggressiveness during model training. A large learning
+   * rate can increase the training speed, but it also decreases the chance
+   * that the model converges. A value in the range (0, 0.3) is recommended.
+   * 
+   * @param learningRate
+   */
+  public void setLearningRate(double learningRate) {
+    Preconditions.checkArgument(learningRate > 0,
+        "Learning rate must be larger than 0.");
+    this.learningRate = learningRate;
+  }
+
+  public double getLearningRate() {
+    return this.learningRate;
   }
 
   public void isLearningRateDecay(boolean decay) {
@@ -102,19 +116,21 @@
    * @throws ClassNotFoundException 
    * @throws IOException
    */
-  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
+  public BSPJob train(HamaConfiguration conf) throws ClassNotFoundException, IOException, InterruptedException {
     Preconditions.checkArgument(this.modelPath != null,
         "Please set the model path before training.");
-
     // train with BSP job
-    return trainInternal((HamaConfiguration) conf);
+    return trainInternal(conf);
   }
 
   /**
    * Train the model with the path of given training data and parameters.
+   * 
+   * @param conf
    */
-  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException;
+  protected abstract BSPJob trainInternal(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException;
 
   /**
    * Read the model meta-data from the specified location.
@@ -124,9 +140,18 @@
   protected void readFromModel() throws IOException {
     Preconditions.checkArgument(this.modelPath != null,
         "Model path has not been set.");
-    FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
-    this.readFields(is);
-    Closeables.close(is, false);
+    Configuration conf = new Configuration();
+    FSDataInputStream is = null;
+    try {
+      URI uri = new URI(this.modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      is = new FSDataInputStream(fs.open(new Path(modelPath)));
+      this.readFields(is);
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    } finally {
+      Closeables.close(is, false);
+    }
   }
 
   /**
@@ -137,9 +162,16 @@
   public void writeModelToFile() throws IOException {
     Preconditions.checkArgument(this.modelPath != null,
         "Model path has not been set.");
-
-    FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
-    this.write(is);
+    Configuration conf = new Configuration();
+    FSDataOutputStream is = null;
+    try {
+      URI uri = new URI(this.modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      is = fs.create(new Path(this.modelPath), true);
+      this.write(is);
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
 
     Closeables.close(is, false);
   }
diff --git a/src/main/java/org/apache/horn/core/AutoEncoder.java b/src/main/java/org/apache/horn/core/AutoEncoder.java
index 35a6f66..1b7a406 100644
--- a/src/main/java/org/apache/horn/core/AutoEncoder.java
+++ b/src/main/java/org/apache/horn/core/AutoEncoder.java
@@ -28,6 +28,7 @@
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.core.Constants.LearningStyle;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -59,7 +60,7 @@
     model.addLayer(inputDimensions, true,
         FunctionFactory.createDoubleFunction("Sigmoid"), null);
     model
-        .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
+        .setLearningStyle(LearningStyle.UNSUPERVISED);
     model.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
   }
diff --git a/src/main/java/org/apache/horn/core/Constants.java b/src/main/java/org/apache/horn/core/Constants.java
new file mode 100644
index 0000000..09883af
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Constants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+public class Constants {
+  
+  public static enum LearningStyle {
+    UNSUPERVISED,
+    SUPERVISED
+  }
+  
+  public static enum TrainingMethod {
+    GRADIENT_DESCENT
+  }
+  
+}
diff --git a/src/main/java/org/apache/horn/core/HornJob.java b/src/main/java/org/apache/horn/core/HornJob.java
index c95ae36..30e9e88 100644
--- a/src/main/java/org/apache/horn/core/HornJob.java
+++ b/src/main/java/org/apache/horn/core/HornJob.java
@@ -22,6 +22,8 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.math.Function;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 public class HornJob extends BSPJob {
@@ -77,16 +79,28 @@
     this.conf.setInt("training.batch.size", batchSize);
   }
 
-  public void setLearningRate(double learningRate) {
-    this.conf.setDouble("mlp.learning.rate", learningRate);
+  public void setTrainingMethod(TrainingMethod method) {
+    this.neuralNetwork.setTrainingMethod(method);
   }
-
+  
+  public void setLearningStyle(LearningStyle style) {
+    this.neuralNetwork.setLearningStyle(style);
+  }
+  
+  public void setLearningRate(double learningRate) {
+    this.neuralNetwork.setLearningRate(learningRate);
+  }
+  
   public void setConvergenceCheckInterval(int n) {
     this.conf.setInt("convergence.check.interval", n);
   }
 
   public void setMomentumWeight(double momentumWeight) {
-    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
+    this.neuralNetwork.setMomemtumWeight(momentumWeight);
+  }
+  
+  public void setRegularizationWeight(double regularizationWeight) {
+    this.neuralNetwork.setRegularizationWeight(regularizationWeight);
   }
 
   public LayeredNeuralNetwork getNeuralNetwork() {
@@ -95,7 +109,7 @@
 
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
-    BSPJob job = neuralNetwork.train(this.conf);
+    BSPJob job = neuralNetwork.train((HamaConfiguration) this.conf);
     if (verbose) {
       return job.waitForCompletion(true);
     } else {
@@ -103,10 +117,6 @@
     }
   }
 
-  public void setRegularizationWeight(double regularizationWeight) {
-    this.conf.setDouble("regularization.weight", regularizationWeight);
-  }
-
   public void setModelPath(String modelPath) {
     this.conf.set("model.path", modelPath);
     neuralNetwork.setModelPath(modelPath);
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
index c858d11..fe5d3a3 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.io.MatrixWritable;
@@ -44,6 +43,8 @@
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -75,11 +76,9 @@
   protected List<DoubleFunction> squashingFunctionList;
 
   protected List<Class<? extends Neuron>> neuronClassList;
-
+  
   protected int finalLayerIdx;
 
-  protected double regularizationWeight;
-
   public LayeredNeuralNetwork() {
     this.layerSizeList = Lists.newArrayList();
     this.weightMatrixList = Lists.newArrayList();
@@ -90,7 +89,6 @@
 
   public LayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
     super(conf, modelPath);
-    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
   }
 
   @Override
@@ -107,17 +105,6 @@
 
     this.layerSizeList.add(size);
     int layerIdx = this.layerSizeList.size() - 1;
-    
-    if(layerIdx == 0) {
-      LOG.info("Input Layer: " + (size - 1) + " features");
-    } else {
-      if(!isFinalLayer) {
-        LOG.info("Hidden Layer: " + (size - 1) + " neurons with 1 bias");
-      } else {
-        LOG.info("Output Layer: " + size);
-      }
-    }
-    
     if (isFinalLayer) {
       this.finalLayerIdx = layerIdx;
     }
@@ -165,7 +152,6 @@
 
   /**
    * Set the previous weight matrices.
-   * 
    * @param prevUpdates
    */
   void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
@@ -277,12 +263,12 @@
     for (Class<? extends Neuron> clazz : this.neuronClassList) {
       output.writeUTF(clazz.getName());
     }
-
+    
     // write squashing functions
     output.writeInt(this.squashingFunctionList.size());
     for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
-      WritableUtils.writeString(output,
-          aSquashingFunctionList.getFunctionName());
+      WritableUtils.writeString(output, aSquashingFunctionList
+              .getFunctionName());
     }
 
     // write weight matrices
@@ -344,10 +330,9 @@
       intermediateOutput = forward(i, intermediateOutput);
       outputCache.add(intermediateOutput);
     }
-
     return outputCache;
   }
-
+  
   /**
    * @param neuronClass
    * @return a new neuron instance
@@ -369,7 +354,10 @@
   protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
     DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
 
-    // TODO use the multithread processing
+    // LOG.info("intermediate: " + intermediateOutput.toString());
+    // DoubleVector vec = weightMatrix.multiplyVectorUnsafe(intermediateOutput);
+    // vec = vec.applyToElements(this.squashingFunctionList.get(fromLayer));
+   
     DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
     for (int row = 0; row < weightMatrix.getRowCount(); row++) {
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
@@ -390,14 +378,13 @@
       }
       vec.set(row, n.getOutput());
     }
-
+    
     // add bias
     DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
     vecWithBias.set(0, 1);
     for (int i = 0; i < vec.getDimension(); ++i) {
       vecWithBias.set(i + 1, vec.get(i));
     }
-
     return vecWithBias;
   }
 
@@ -520,7 +507,6 @@
     }
 
     this.setPrevWeightMatrices(weightUpdateMatrices);
-
     return weightUpdateMatrices;
   }
 
@@ -539,6 +525,8 @@
       DenseDoubleMatrix weightUpdateMatrix) {
 
     // get layer related information
+    DoubleFunction squashingFunction = this.squashingFunctionList
+        .get(curLayerIdx);
     DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
     DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
     DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
@@ -548,10 +536,10 @@
       nextLayerDelta = nextLayerDelta.slice(1,
           nextLayerDelta.getDimension() - 1);
     }
-
-    // DoubleMatrix transposed = weightMatrix.transpose();
+    
     DoubleVector deltaVector = new DenseDoubleVector(
         weightMatrix.getColumnCount());
+    
     for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
       Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance(this.neuronClassList
           .get(curLayerIdx));
@@ -559,7 +547,7 @@
       n.setLearningRate(this.learningRate);
       n.setMomentumWeight(this.momentumWeight);
 
-      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
+      n.setSquashingFunction(squashingFunction);
       n.setOutput(curLayerOutput.get(row));
 
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
@@ -567,7 +555,6 @@
       n.setWeightVector(weightMatrix.getRowCount());
 
       for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
-        // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
         msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
             new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
                 weightMatrix.get(col, row)), new DoubleWritable(
@@ -581,7 +568,7 @@
         // TODO Auto-generated catch block
         e.printStackTrace();
       }
-
+      
       // update weights
       weightUpdateMatrix.setColumn(row, n.getWeights());
       deltaVector.set(row, n.getDelta());
@@ -591,9 +578,9 @@
   }
 
   @Override
-  protected BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    this.conf = hamaConf;
+  protected BSPJob trainInternal(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    this.conf = conf;
     this.fs = FileSystem.get(conf);
 
     String modelPath = conf.get("model.path");
@@ -614,11 +601,6 @@
     job.setJarByClass(LayeredNeuralNetworkTrainer.class);
     job.setBspClass(LayeredNeuralNetworkTrainer.class);
 
-    // additional for parameter server
-    // TODO at this moment, we use 1 task as a parameter server
-    // In the future, the number of parameter server should be configurable
-    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
     job.setInputPath(new Path(conf.get("training.input.path")));
     job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
     job.setInputKeyClass(LongWritable.class);
@@ -646,5 +628,5 @@
   public DoubleFunction getSquashingFunction(int idx) {
     return this.squashingFunctionList.get(idx);
   }
-
+  
 }
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
index 68287ad..ce6d6e4 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
@@ -18,8 +18,6 @@
 package org.apache.horn.core;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,9 +31,6 @@
 import org.apache.hama.commons.math.DenseDoubleMatrix;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
 
 /**
  * The trainer that train the {@link LayeredNeuralNetwork} based on BSP
@@ -46,39 +41,23 @@
     extends
     BSP<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> {
 
-  private static final Log LOG = LogFactory
-      .getLog(LayeredNeuralNetworkTrainer.class);
-
-  /* When given peer is master worker: base of parameter merge */
-  /* When given peer is slave worker: neural network for training */
+  private static final Log LOG = LogFactory.getLog(LayeredNeuralNetworkTrainer.class);
+  
   private LayeredNeuralNetwork inMemoryModel;
-
-  /* Job configuration */
   private HamaConfiguration conf;
-
   /* Default batch size */
   private int batchSize;
 
-  /* whether it is converging or not */
-  private AtomicBoolean isConverge;
+  /* training error and convergence-check state */
+  private double prevAvgTrainingError;
+  private double curAvgTrainingError;
+  private long convergenceCheckInterval;
+  private long iterations;
+  private long maxIterations;
+  private long epoch;
+  private boolean isConverge;
 
-  /* When given peer is master worker: Asynchronous parameter merger */
-  /* When given peer is slave worker: null */
-  private RPC.Server merger;
-
-  /* When given peer is master worker: null */
-  /* When given peer is slave worker: proxy to Asynchronous parameter merger */
-  private ParameterMerger proxy;
-
-  /**
-   * Returns true if this worker is master worker.
-   *
-   * @param peer
-   * */
-  private boolean isMaster(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
-    return peer.getPeerIndex() == peer.getNumPeers() - 1;
-  }
+  private String modelPath;
 
   @Override
   /**
@@ -86,43 +65,20 @@
    */
   public void setup(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
-    // At least one master & slave worker exist.
-    Preconditions.checkArgument(peer.getNumPeers() >= 2);
-    this.conf = peer.getConfiguration();
-
-    String modelPath = conf.get("model.path");
-    this.inMemoryModel = new LayeredNeuralNetwork(conf, modelPath);
-
-    this.batchSize = conf.getInt("training.batch.size", 50);
-    this.isConverge = new AtomicBoolean(false);
-
-    int slaveCount = peer.getNumPeers() - 1;
-    int mergeLimit = conf.getInt("training.max.iterations", 100000);
-    int convergenceCheckInterval = peer.getNumPeers()
-        * conf.getInt("convergence.check.interval", 2000);
-    String master = peer.getPeerName();
-    String masterAddr = master.substring(0, master.indexOf(':'));
-    int port = conf.getInt("sync.server.port", 40089);
-
-    if (isMaster(peer)) {
-      try {
-        this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
-            isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
-            masterAddr, port, slaveCount, false, conf);
-        merger.start();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
+    if (peer.getPeerIndex() == 0) {
       LOG.info("Begin to train");
-    } else {
-      InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
-      try {
-        this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
-            ParameterMerger.versionID, addr, conf);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
     }
+    this.isConverge = false;
+    this.conf = peer.getConfiguration();
+    this.iterations = 0;
+    this.epoch = 0;
+    this.modelPath = conf.get("model.path");
+    this.maxIterations = conf.getLong("training.max.iterations", 100000);
+    this.convergenceCheckInterval = conf.getLong("convergence.check.interval",
+        100);
+    this.inMemoryModel = new LayeredNeuralNetwork(conf, modelPath);
+    this.prevAvgTrainingError = Integer.MAX_VALUE;
+    this.batchSize = conf.getInt("training.batch.size", 50);
   }
 
   @Override
@@ -132,9 +88,12 @@
   public void cleanup(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
     // write model to modelPath
-    if (isMaster(peer)) {
+    if (peer.getPeerIndex() == 0) {
       try {
-        LOG.info("Write model back to " + inMemoryModel.getModelPath());
+        LOG.info(String.format("End of training, number of iterations: %d.\n",
+            this.iterations));
+        LOG.info(String.format("Write model back to %s\n",
+            inMemoryModel.getModelPath()));
         this.inMemoryModel.writeModelToFile();
       } catch (IOException e) {
         e.printStackTrace();
@@ -146,19 +105,20 @@
   public void bsp(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    while (!this.isConverge.get()) {
-      // each slave-worker calculate the matrices updates according to local
-      // data
-      // and merge them with master
-      if (!isMaster(peer)) {
-        calculateUpdates(peer);
+    while (this.iterations++ < maxIterations) {
+      // each groom calculate the matrices updates according to local data
+      calculateUpdates(peer);
+      peer.sync();
+
+      // master merge the updates model
+      if (peer.getPeerIndex() == 0) {
+        mergeUpdates(peer);
+      }
+      peer.sync();
+      if (this.isConverge) {
+        break;
       }
     }
-    peer.sync();
-    if (isMaster(peer)) {
-      merger.stop();
-    }
-    peer.sync(); // finalize the bsp program.
   }
 
   /**
@@ -170,6 +130,19 @@
   private void calculateUpdates(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
       throws IOException {
+    // receive update information from master
+    if (peer.getNumCurrentMessages() != 0) {
+      ParameterMessage inMessage = peer.getCurrentMessage();
+      DoubleMatrix[] newWeights = inMessage.getCurMatrices();
+      DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
+      this.inMemoryModel.setWeightMatrices(newWeights);
+      this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
+      this.isConverge = inMessage.isConverge();
+      // check converge
+      if (isConverge) {
+        return;
+      }
+    }
 
     DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
         .size()];
@@ -186,6 +159,10 @@
     for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
       if (!peer.readNext(key, value)) {
         peer.reopenInput();
+        if (peer.getPeerIndex() == 0) {
+          epoch++;
+          LOG.info("Training loss: " + curAvgTrainingError + " at " + (epoch) + " epoch.");
+        }
         peer.readNext(key, value);
       }
       DoubleVector trainingInstance = value.getVector();
@@ -200,17 +177,78 @@
       weightUpdates[i] = weightUpdates[i].divide(batchSize);
     }
 
-    // exchange parameter update with master
-    ParameterMessage msg = new ParameterMessage(
-        avgTrainingError, false, weightUpdates,
-        this.inMemoryModel.getPrevMatricesUpdates());
+    DoubleMatrix[] prevWeightUpdates = this.inMemoryModel
+        .getPrevMatricesUpdates();
+    ParameterMessage outMessage = new ParameterMessage(
+        avgTrainingError, false, weightUpdates, prevWeightUpdates);
+    peer.send(peer.getPeerName(0), outMessage);
+  }
 
-    ParameterMessage inMessage = proxy.merge(msg);
-    DoubleMatrix[] newWeights = inMessage.getCurMatrices();
-    DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
-    this.inMemoryModel.setWeightMatrices(newWeights);
-    this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
-    this.isConverge.set(inMessage.isConverge());
+  /**
+   * Merge the weight updates received from the grooms.
+   * 
+   * @param peer
+   * @throws IOException
+   */
+  private void mergeUpdates(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+      throws IOException {
+    int numMessages = peer.getNumCurrentMessages();
+    boolean isConverge = false;
+    if (numMessages == 0) { // converges
+      isConverge = true;
+      return;
+    }
+
+    double avgTrainingError = 0;
+    DoubleMatrix[] matricesUpdates = null;
+    DoubleMatrix[] prevMatricesUpdates = null;
+
+    while (peer.getNumCurrentMessages() > 0) {
+      ParameterMessage message = peer.getCurrentMessage();
+      if (matricesUpdates == null) {
+        matricesUpdates = message.getCurMatrices();
+        prevMatricesUpdates = message.getPrevMatrices();
+      } else {
+        LayeredNeuralNetwork.matricesAdd(matricesUpdates,
+            message.getCurMatrices());
+        LayeredNeuralNetwork.matricesAdd(prevMatricesUpdates,
+            message.getPrevMatrices());
+      }
+      
+      avgTrainingError += message.getTrainingError();
+    }
+
+    if (numMessages > 1) {
+      avgTrainingError /= numMessages;
+      for (int i = 0; i < matricesUpdates.length; ++i) {
+        matricesUpdates[i] = matricesUpdates[i].divide(numMessages);
+        prevMatricesUpdates[i] = prevMatricesUpdates[i].divide(numMessages);
+      }
+    }
+
+    this.inMemoryModel.updateWeightMatrices(matricesUpdates);
+    this.inMemoryModel.setPrevWeightMatrices(prevMatricesUpdates);
+    
+    // check convergence
+    if (iterations % convergenceCheckInterval == 0) {
+      if (prevAvgTrainingError < curAvgTrainingError) {
+        // error cannot decrease any more
+        isConverge = true;
+      }
+      // update
+      prevAvgTrainingError = curAvgTrainingError;
+      curAvgTrainingError = 0;
+    }
+    curAvgTrainingError += avgTrainingError / convergenceCheckInterval;
+
+    // broadcast updated weight matrices
+    for (String peerName : peer.getAllPeerNames()) {
+      ParameterMessage msg = new ParameterMessage(
+          0, isConverge, this.inMemoryModel.getWeightMatrices(),
+          this.inMemoryModel.getPrevMatricesUpdates());
+      peer.send(peerName, msg);
+    }
   }
 
 }
diff --git a/src/main/java/org/apache/horn/core/ParameterMergerServer.java b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
index c76a4d0..7bd5543 100644
--- a/src/main/java/org/apache/horn/core/ParameterMergerServer.java
+++ b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
@@ -89,15 +89,15 @@
     LOG.info("Start merging: " + this.mergeCount);
 
     if (!this.isConverge.get()) {
-      for (int i = 0; i < weightUpdates.length; ++i) {
-        weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
-        prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
-      }
-
       synchronized (inMemoryModel) {
-        this.inMemoryModel.updateWeightMatrices(weightUpdates);
-        this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
 
+        LOG.info(">>>> before: " + this.inMemoryModel.getWeightMatrices()[0].get(0, 0));
+        
+        // this.inMemoryModel.addWeights(weightUpdates);
+        // this.inMemoryModel.addPrevWeights(prevWeightUpdates);
+        
+        LOG.info(", after: " + this.inMemoryModel.getWeightMatrices()[0].get(0, 0));
+        
         // add trainingError to trainingErrors
         this.trainingErrors[this.curTrainingError++] = trainingError;
 
diff --git a/src/main/java/org/apache/horn/core/ParameterMessage.java b/src/main/java/org/apache/horn/core/ParameterMessage.java
index 3905e25..524c443 100644
--- a/src/main/java/org/apache/horn/core/ParameterMessage.java
+++ b/src/main/java/org/apache/horn/core/ParameterMessage.java
@@ -39,6 +39,8 @@
   protected boolean converge;
 
   public ParameterMessage() {
+    this.converge = false;
+    this.trainingError = 0.0d;
   }
 
   public ParameterMessage(double trainingError, boolean converge,
@@ -53,15 +55,19 @@
   public void readFields(DataInput input) throws IOException {
     trainingError = input.readDouble();
     converge = input.readBoolean();
-    int numMatrices = input.readInt();
-    boolean hasPrevMatrices = input.readBoolean();
-    curMatrices = new DenseDoubleMatrix[numMatrices];
-    // read matrice updates
-    for (int i = 0; i < curMatrices.length; ++i) {
-      curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+    boolean hasCurMatrices = input.readBoolean();
+    if(hasCurMatrices) {
+      int numMatrices = input.readInt();
+      curMatrices = new DenseDoubleMatrix[numMatrices];
+      // read matrice updates
+      for (int i = 0; i < curMatrices.length; ++i) {
+        curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+      }
     }
-
+    
+    boolean hasPrevMatrices = input.readBoolean();
     if (hasPrevMatrices) {
+      int numMatrices = input.readInt();
       prevMatrices = new DenseDoubleMatrix[numMatrices];
       // read previous matrices updates
       for (int i = 0; i < prevMatrices.length; ++i) {
@@ -74,16 +80,21 @@
   public void write(DataOutput output) throws IOException {
     output.writeDouble(trainingError);
     output.writeBoolean(converge);
-    output.writeInt(curMatrices.length);
+    if (curMatrices == null) {
+      output.writeBoolean(false);
+    } else {
+      output.writeBoolean(true);
+      output.writeInt(curMatrices.length);
+      for (DoubleMatrix matrix : curMatrices) {
+        MatrixWritable.write(matrix, output);
+      }
+    }
+    
     if (prevMatrices == null) {
       output.writeBoolean(false);
     } else {
       output.writeBoolean(true);
-    }
-    for (DoubleMatrix matrix : curMatrices) {
-      MatrixWritable.write(matrix, output);
-    }
-    if (prevMatrices != null) {
+      output.writeInt(prevMatrices.length);
       for (DoubleMatrix matrix : prevMatrices) {
         MatrixWritable.write(matrix, output);
       }
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index c24fa16..4c0df95 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.core.HornJob;
 import org.apache.horn.core.Neuron;
 import org.apache.horn.core.Synapse;
@@ -48,40 +49,43 @@
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
+      double gradient = 0;
       for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.squashingFunction.applyDerivative(this
-            .getOutput()) * (m.getDelta() * m.getWeight());
-        this.backpropagate(gradient);
+        gradient += (m.getDelta() * m.getWeight());
 
         // Weight corrections
         double weight = -this.getLearningRate() * this.getOutput()
             * m.getDelta() + this.getMomentumWeight() * m.getPrevWeight();
         this.push(weight);
       }
+
+      this.backpropagate(gradient
+          * this.squashingFunction.applyDerivative(this.getOutput()));
     }
   }
 
   public static HornJob createJob(HamaConfiguration conf, String modelPath,
       String inputPath, double learningRate, double momemtumWeight,
-      double regularizationWeight, int features, int labels, int maxIteration,
-      int numOfTasks) throws IOException {
+      double regularizationWeight, int features, int hu, int labels,
+      int miniBatch, int maxIteration) throws IOException {
 
     HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
     job.setTrainingSetPath(inputPath);
     job.setModelPath(modelPath);
 
-    job.setNumBspTask(numOfTasks);
     job.setMaxIteration(maxIteration);
     job.setLearningRate(learningRate);
     job.setMomentumWeight(momemtumWeight);
     job.setRegularizationWeight(regularizationWeight);
-
-    job.setConvergenceCheckInterval(1000);
-    job.setBatchSize(300);
+    
+    job.setConvergenceCheckInterval(600);
+    job.setBatchSize(miniBatch);
+    
+    job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
 
     job.inputLayer(features, Sigmoid.class, StandardNeuron.class);
-    job.addLayer(15, Sigmoid.class, StandardNeuron.class);
+    job.addLayer(hu, Sigmoid.class, StandardNeuron.class);
     job.outputLayer(labels, Sigmoid.class, StandardNeuron.class);
 
     job.setCostFunction(CrossEntropy.class);
@@ -92,18 +96,18 @@
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     if (args.length < 9) {
-      System.out
-          .println("Usage: <MODEL_PATH> <INPUT_PATH> "
-              + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
-              + "<FEATURE_DIMENSION> <LABEL_DIMENSION> <MAX_ITERATION> <NUM_TASKS>");
-      System.exit(1);
+      System.out.println("Usage: <MODEL_PATH> <INPUT_PATH> "
+          + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
+          + "<FEATURE_DIMENSION> <HIDDEN_UNITS> <LABEL_DIMENSION> "
+          + "<BATCH_SIZE> <MAX_ITERATION>");
+      System.exit(-1);
     }
 
     HornJob ann = createJob(new HamaConfiguration(), args[0], args[1],
         Double.parseDouble(args[2]), Double.parseDouble(args[3]),
         Double.parseDouble(args[4]), Integer.parseInt(args[5]),
         Integer.parseInt(args[6]), Integer.parseInt(args[7]),
-        Integer.parseInt(args[8]));
+        Integer.parseInt(args[8]), Integer.parseInt(args[9]));
 
     long startTime = System.currentTimeMillis();
     if (ann.waitForCompletion(true)) {
diff --git a/src/main/java/org/apache/horn/funcs/CrossEntropy.java b/src/main/java/org/apache/horn/funcs/CrossEntropy.java
index 567db29..7cc5e6a 100644
--- a/src/main/java/org/apache/horn/funcs/CrossEntropy.java
+++ b/src/main/java/org/apache/horn/funcs/CrossEntropy.java
@@ -32,27 +32,21 @@
   @Override
   public double apply(double target, double actual) {
     double adjustedTarget = (target == 0 ? 0.000001 : target);
-    adjustedTarget = (target == 1.0 ? 0.999999 : target);
+    adjustedTarget = (target == 1.0 ? 0.999999 : adjustedTarget);
     double adjustedActual = (actual == 0 ? 0.000001 : actual);
-    adjustedActual = (actual == 1 ? 0.999999 : actual);
+    adjustedActual = (actual == 1 ? 0.999999 : adjustedActual);
+    
     return -adjustedTarget * Math.log(adjustedActual) - (1 - adjustedTarget)
         * Math.log(1 - adjustedActual);
   }
 
   @Override
   public double applyDerivative(double target, double actual) {
-    double adjustedTarget = target;
-    double adjustedActual = actual;
-    if (adjustedActual == 1) {
-      adjustedActual = 0.999;
-    } else if (actual == 0) {
-      adjustedActual = 0.001;
-    }
-    if (adjustedTarget == 1) {
-      adjustedTarget = 0.999;
-    } else if (adjustedTarget == 0) {
-      adjustedTarget = 0.001;
-    }
+    double adjustedTarget = (target == 0 ? 0.000001 : target);
+    adjustedTarget = (target == 1.0 ? 0.999999 : adjustedTarget);
+    double adjustedActual = (actual == 0 ? 0.000001 : actual);
+    adjustedActual = (actual == 1 ? 0.999999 : adjustedActual);
+    
     return -adjustedTarget / adjustedActual + (1 - adjustedTarget)
         / (1 - adjustedActual);
   }
diff --git a/src/main/java/org/apache/horn/utils/MNISTConverter.java b/src/main/java/org/apache/horn/utils/MNISTConverter.java
index 3a2f8b8..6bbe891 100644
--- a/src/main/java/org/apache/horn/utils/MNISTConverter.java
+++ b/src/main/java/org/apache/horn/utils/MNISTConverter.java
@@ -33,9 +33,13 @@
 
   private static int PIXELS = 28 * 28;
 
+  private static double rescale(double x) {
+    return 1 - (255 - x) / 255;
+  }
+
   public static void main(String[] args) throws Exception {
     if (args.length < 3) {
-      System.out.println("Usage: TRAINING_DATA LABELS_DATA OUTPUT_PATH");
+      System.out.println("Usage: <TRAINING_DATA> <LABELS_DATA> <OUTPUT_PATH>");
       System.out
           .println("ex) train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq");
       System.exit(1);
@@ -76,14 +80,15 @@
     for (int i = 0; i < count; i++) {
       double[] vals = new double[PIXELS + 10];
       for (int j = 0; j < PIXELS; j++) {
-        vals[j] = (images[i][j] & 0xff);
+        vals[j] = rescale((images[i][j] & 0xff));
       }
       int label = (labels[i] & 0xff);
+      // encode the label as a one-hot vector
       for (int j = 0; j < 10; j++) {
         if (j == label)
-          vals[PIXELS + j] = 1;
+          vals[PIXELS + j] = 1.0;
         else
-          vals[PIXELS + j] = 0;
+          vals[PIXELS + j] = 0.0;
       }
 
       writer.append(new LongWritable(), new VectorWritable(
@@ -94,4 +99,5 @@
     labelsIn.close();
     writer.close();
   }
+
 }
diff --git a/src/main/java/org/apache/horn/utils/MNISTEvaluator.java b/src/main/java/org/apache/horn/utils/MNISTEvaluator.java
new file mode 100644
index 0000000..a5b68e0
--- /dev/null
+++ b/src/main/java/org/apache/horn/utils/MNISTEvaluator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.utils;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.LayeredNeuralNetwork;
+
+public class MNISTEvaluator {
+
+  private static int PIXELS = 28 * 28;
+  
+  private static double rescale(double x) {
+    return 1 - (255 - x) / 255;
+  }
+  
+  public static void main(String[] args) throws IOException {
+    if (args.length < 3) {
+      System.out.println("Usage: <TRAINED_MODEL> <TEST_IMAGES> <TEST_LABELS>");
+      System.out
+          .println("ex) /tmp/model t10k-images.idx3-ubyte t10k-labels.idx1-ubyte");
+      System.exit(1);
+    }
+
+    String modelPath = args[0];
+    String training_data = args[1];
+    String labels_data = args[2];
+
+    DataInputStream imagesIn = new DataInputStream(new FileInputStream(
+        new File(training_data)));
+    DataInputStream labelsIn = new DataInputStream(new FileInputStream(
+        new File(labels_data)));
+    
+    imagesIn.readInt(); // Magic number
+    int count = imagesIn.readInt();
+    labelsIn.readInt(); // Magic number
+    labelsIn.readInt(); // Count
+    imagesIn.readInt(); // Rows
+    imagesIn.readInt(); // Cols
+    
+    System.out.println("Evaluating " + count + " images");
+
+    byte[][] images = new byte[count][PIXELS];
+    byte[] labels = new byte[count];
+    for (int n = 0; n < count; n++) {
+      imagesIn.readFully(images[n]);
+      labels[n] = labelsIn.readByte();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration();
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf, modelPath);
+    
+    int correct = 0;
+    for (int i = 0; i < count; i++) {
+      double[] vals = new double[PIXELS];
+      for (int j = 0; j < PIXELS; j++) {
+        vals[j] = rescale((images[i][j] & 0xff));
+      }
+      int label = (labels[i] & 0xff);
+
+      DoubleVector instance = new DenseDoubleVector(vals);
+      DoubleVector result = ann.getOutput(instance);
+      
+      if(getNumber(result) == label) {
+        correct++;
+      }
+    }
+
+    System.out.println("Accuracy: " + ((double) correct / count));
+    // TODO System.out.println("Precision = " + (tp / (tp + fp)));
+    //System.out.println("Recall = " + (tp / (tp + fn)));
+    //System.out.println("Accuracy = " + ((tp + tn) / (tp + tn + fp + fn)));
+    
+    imagesIn.close();
+    labelsIn.close();
+  }
+
+  private static int getNumber(DoubleVector result) {
+    double max = 0;
+    int index = -1;
+    for(int x = 0; x < result.getLength(); x++) {
+      double curr = result.get(x);
+      if(max < curr) {
+        max = curr;
+        index = x;
+      }
+    }
+    return index;
+  }
+}
diff --git a/src/test/java/org/apache/horn/core/TestNeuron.java b/src/test/java/org/apache/horn/core/TestNeuron.java
index 5f2bb59..0e4ba8e 100644
--- a/src/test/java/org/apache/horn/core/TestNeuron.java
+++ b/src/test/java/org/apache/horn/core/TestNeuron.java
@@ -24,6 +24,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.horn.funcs.CrossEntropy;
 import org.apache.horn.funcs.Sigmoid;
 
 public class TestNeuron extends TestCase {
@@ -43,9 +44,10 @@
         sum += m.getInput() * m.getWeight();
       }
       sum += (bias * theta);
+      System.out.println(new CrossEntropy().apply(0.000001, 1.0));
       this.feedforward(new Sigmoid().apply(sum));
     }
-
+    
     @Override
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
index 7e0cac9..a6914ef 100644
--- a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
@@ -46,8 +46,8 @@
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.ml.util.DefaultFeatureTransformer;
 import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.core.AbstractLayeredNeuralNetwork.LearningStyle;
-import org.apache.horn.core.AbstractLayeredNeuralNetwork.TrainingMethod;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 import org.junit.Test;
 import org.mortbay.log.Log;
diff --git a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
index 80a08f2..bb404d6 100644
--- a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
+++ b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
@@ -23,12 +23,15 @@
 import java.io.InputStreamReader;
 import java.net.URI;
 
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.commons.io.VectorWritable;
 import org.apache.hama.commons.math.DenseDoubleVector;
@@ -39,7 +42,8 @@
 /**
  * Test the functionality of NeuralNetwork Example.
  */
-public class MultiLayerPerceptronTest extends HamaCluster {
+public class MultiLayerPerceptronTest extends TestCase { // HamaCluster {
+  private static final Log LOG = LogFactory.getLog(MultiLayerPerceptronTest.class);
   private HamaConfiguration conf;
   private FileSystem fs;
   private String MODEL_PATH = "/tmp/neuralnets.model";
@@ -47,10 +51,10 @@
   private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
 
   public MultiLayerPerceptronTest() {
-    conf = new HamaConfiguration();
+    conf = new HamaConfiguration();/*
     conf.set("bsp.master.address", "localhost");
     conf.setBoolean("hama.child.redirect.log.console", true);
-    conf.setBoolean("hama.messenger.runtime.compression", true);
+    conf.setBoolean("hama.messenger.runtime.compression", false);
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         conf.get("bsp.master.address"));
     conf.set("bsp.local.dir", "/tmp/hama-test");
@@ -58,7 +62,7 @@
     conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     conf.set("hama.sync.client.class",
         org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-            .getCanonicalName());
+            .getCanonicalName());*/
   }
 
   @Override
@@ -77,8 +81,7 @@
 
     String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
     try {
-      LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf,
-          MODEL_PATH);
+      LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf, MODEL_PATH);
 
       // process data in streaming approach
       FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
@@ -126,7 +129,7 @@
     } finally {
       fs.delete(new Path(RESULT_PATH), true);
       fs.delete(new Path(MODEL_PATH), true);
-      //fs.delete(new Path(SEQTRAIN_DATA), true);
+      fs.delete(new Path(SEQTRAIN_DATA), true);
     }
   }
 
@@ -161,8 +164,8 @@
 
     try {
       HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
-          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
-          1000, 2);
+          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, 8, labelDimension,
+          300, 10000);
 
       long startTime = System.currentTimeMillis();
       if (ann.waitForCompletion(true)) {