HORN-12: Add neuron-centric model based MLP example
diff --git a/bin/horn b/bin/horn
new file mode 100755
index 0000000..539e186
--- /dev/null
+++ b/bin/horn
@@ -0,0 +1,182 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The Horn command script
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/horn-config.sh
+
+cygwin=false
+case "`uname`" in
+CYGWIN*) cygwin=true;;
+esac
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+  echo "Usage: horn [--config confdir] COMMAND"
+  echo "where COMMAND is one of:"
+  echo "  jar <jar>            run a jar file"
+  echo " or"
+  echo "  CLASSNAME            run the class named CLASSNAME"
+  echo "Most commands print help when invoked w/o parameters."
+  exit 1
+fi
+
+# get arguments
+COMMAND=$1
+shift
+
+if [ -f "${HORN_CONF_DIR}/horn-env.sh" ]; then
+  . "${HORN_CONF_DIR}/horn-env.sh"
+fi
+
+# some Java parameters
+if [ "$JAVA_HOME" != "" ]; then
+  #echo "run java in $JAVA_HOME"
+  JAVA_HOME=$JAVA_HOME
+fi
+  
+if [ "$JAVA_HOME" = "" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+
+JAVA=$JAVA_HOME/bin/java
+JAVA_HEAP_MAX=-Xmx512m
+
+# check envvars which might override default args
+if [ "$HORN_HEAPSIZE" != "" ]; then
+  #echo "run with heapsize $HORN_HEAPSIZE"
+  JAVA_HEAP_MAX="-Xmx""$HORN_HEAPSIZE""m"
+  #echo $JAVA_HEAP_MAX
+fi
+
+# CLASSPATH initially contains $HORN_CONF_DIR
+CLASSPATH="${HORN_CONF_DIR}"
+CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
+
+# for developers, add Horn classes to CLASSPATH
+if [ -d "$HORN_HOME/core/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/core/target/classes
+fi
+if [ -d "$HORN_HOME/core/target/test-classes/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/core/target/test-classes
+fi
+
+# for developers, add Commons classes to CLASSPATH
+if [ -d "$HORN_HOME/commons/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/commons/target/classes
+fi
+if [ -d "$HORN_HOME/commons/target/test-classes/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/commons/target/test-classes
+fi
+
+# for developers, add Graph classes to CLASSPATH
+if [ -d "$HORN_HOME/graph/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/graph/target/classes
+fi
+if [ -d "$HORN_HOME/graph/target/test-classes/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/graph/target/test-classes
+fi
+
+# for developers, add ML classes to CLASSPATH
+if [ -d "$HORN_HOME/ml/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/ml/target/classes
+fi
+if [ -d "$HORN_HOME/ml/target/test-classes/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/ml/target/test-classes
+fi
+
+# add mesos classes to CLASSPATH
+if [ -d "$HORN_HOME/mesos/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/mesos/target/classes
+fi
+if [ -d "$HORN_HOME/mesos/target/test-classes/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HORN_HOME/mesos/target/test-classes
+fi
+
+# so that filenames w/ spaces are handled correctly in loops below
+IFS=
+
+# for releases, add core hama jar to CLASSPATH
+for f in $HORN_HOME/hama-**.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+# add libs to CLASSPATH
+for f in $HORN_HOME/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+# add user-specified CLASSPATH last
+if [ "$HORN_CLASSPATH" != "" ]; then
+  CLASSPATH=${CLASSPATH}:${HORN_CLASSPATH}
+fi
+
+# default log directory & file
+if [ "$HORN_LOG_DIR" = "" ]; then
+  HORN_LOG_DIR="$HORN_HOME/logs"
+fi
+if [ "$HAMA_LOGFILE" = "" ]; then
+  HAMA_LOGFILE='hama.log'
+fi
+
+# default policy file for service-level authorization
+if [ "$HAMA_POLICYFILE" = "" ]; then
+  HAMA_POLICYFILE="hama-policy.xml"
+fi
+
+# restore ordinary behaviour
+unset IFS
+
+# figure out which class to run
+if [ "$COMMAND" = "jar" ] ; then
+  CLASS=org.apache.hama.util.RunJar
+  BSP_OPTS="$BSP_OPTS"
+else
+  CLASS=$COMMAND
+fi
+
+# cygwin path translation
+if $cygwin; then
+  CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+  HORN_HOME=`cygpath -w "$HORN_HOME"`
+  HORN_LOG_DIR=`cygpath -w "$HORN_LOG_DIR"`
+  TOOL_PATH=`cygpath -p -w "$TOOL_PATH"`
+fi
+
+# cygwin path translation
+if $cygwin; then
+  JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"`
+fi
+
+HAMA_OPTS="$HAMA_OPTS -Dhama.log.dir=$HORN_LOG_DIR"
+HAMA_OPTS="$HAMA_OPTS -Dhama.log.file=$HAMA_LOGFILE"
+HAMA_OPTS="$HAMA_OPTS -Dhama.home.dir=$HORN_HOME"
+HAMA_OPTS="$HAMA_OPTS -Dhama.id.str=$HAMA_IDENT_STRING"
+HAMA_OPTS="$HAMA_OPTS -Dhama.root.logger=${HAMA_ROOT_LOGGER:-INFO,console}"
+
+if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
+  HAMA_OPTS="$HAMA_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
+fi
+HAMA_OPTS="$HAMA_OPTS -Dhama.policy.file=$HAMA_POLICYFILE"
+
+# run it
+exec "$JAVA" $JAVA_HEAP_MAX $HAMA_OPTS -classpath "$CLASSPATH" $CLASS "$@"
+
diff --git a/bin/horn-config.sh b/bin/horn-config.sh
new file mode 100644
index 0000000..ca79eaa
--- /dev/null
+++ b/bin/horn-config.sh
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# included in all the hama scripts with source command
+# should not be executable directly
+# also should not be passed any arguments, since we need original $*
+
+# resolve links - $0 may be a softlink
+
+this="$0"
+while [ -h "$this" ]; do
+  ls=`ls -ld "$this"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '.*/.*' > /dev/null; then
+    this="$link"
+  else
+    this=`dirname "$this"`/"$link"
+  fi
+done
+
+# convert relative path to absolute path
+bin=`dirname "$this"`
+script=`basename "$this"`
+bin=`cd "$bin"; pwd`
+this="$bin/$script"
+
+# the root of the Horn installation
+export HORN_HOME=`dirname "$this"`/..
+
+#check to see if the conf dir is given as an optional argument
+if [ $# -gt 1 ]
+then
+    if [ "--config" = "$1" ]
+	  then
+	      shift
+	      confdir=$1
+	      shift
+	      HORN_CONF_DIR=$confdir
+    fi
+fi
+ 
+# Allow alternate conf dir location.
+HORN_CONF_DIR="${HORN_CONF_DIR:-$HORN_HOME/conf}"
+
+# Source the horn-env.sh.  
+# Will have JAVA_HOME defined.
+if [ -f "${HORN_CONF_DIR}/horn-env.sh" ]; then
+  . "${HORN_CONF_DIR}/horn-env.sh"
+fi
+
diff --git a/conf/horn-env.sh b/conf/horn-env.sh
new file mode 100644
index 0000000..9f8d9c2
--- /dev/null
+++ b/conf/horn-env.sh
@@ -0,0 +1,26 @@
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# Set environment variables here.
+
+# The java implementation to use.  Required.
+export JAVA_HOME=/usr/lib/jvm/java-8-oracle
+
diff --git a/pom.xml b/pom.xml
index bc9d602..9480f9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,14 +31,81 @@
   <packaging>jar</packaging>
 
   <properties>
+    <commons-logging.version>1.1.1</commons-logging.version>
     <hama.version>0.7.0</hama.version>
     <hadoop.version>2.7.0</hadoop.version>
     <protobuf.version>2.5.0</protobuf.version>
     <junit.version>4.8.1</junit.version>
+    <log4j.version>1.2.16</log4j.version>
+    <slf4j.version>1.5.8</slf4j.version>
+    <commons-cli.version>1.2</commons-cli.version>
+    <commons-configuration.version>1.7</commons-configuration.version>
+    <commons-lang.version>2.6</commons-lang.version>
+    <commons-httpclient.version>3.0.1</commons-httpclient.version>
+    <commons-io.version>2.4</commons-io.version>
+    <commons-collections.version>3.2.1</commons-collections.version>
   </properties>
 
   <dependencies>
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>${commons-cli.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>${commons-configuration.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>${commons-httpclient.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>${commons-collections.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>13.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+    </dependency>
+    
+  
+    <dependency>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-commons</artifactId>
       <version>${hama.version}</version>
@@ -119,6 +186,43 @@
         </configuration>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.basedir}/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+
+              <excludeGroupIds>org.apache.horn</excludeGroupIds>
+
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-log4j12</artifactId>
+                  <outputDirectory>${project.basedir}/lib</outputDirectory>
+                </artifactItem>
+                <artifactItem>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-api</artifactId>
+                  <outputDirectory>${project.basedir}/lib</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+              <excludeTransitive>true</excludeTransitive>
+
+              <fileMode>755</fileMode>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      
     </plugins>
   </build>
 </project>
diff --git a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
index b0d6ec5..1afe8f5 100644
--- a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
@@ -45,14 +45,10 @@
  */
 abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork {
 
-  private static final double DEFAULT_REGULARIZATION_WEIGHT = 0;
   private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
 
   double trainingError;
 
-  /* The weight of regularization */
-  protected double regularizationWeight;
-
   /* The momentumWeight */
   protected double momentumWeight;
 
@@ -76,7 +72,6 @@
   }
   
   public AbstractLayeredNeuralNetwork() {
-    this.regularizationWeight = DEFAULT_REGULARIZATION_WEIGHT;
     this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
     this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
     this.learningStyle = LearningStyle.SUPERVISED;
@@ -86,38 +81,6 @@
     super(conf, modelPath);
   }
 
-  /**
-   * Set the regularization weight. Recommend in the range [0, 0.1), More
-   * complex the model is, less weight the regularization is.
-   * 
-   * @param regularizationWeight
-   */
-  public void setRegularizationWeight(double regularizationWeight) {
-    Preconditions.checkArgument(regularizationWeight >= 0
-        && regularizationWeight < 1.0,
-        "Regularization weight must be in range [0, 1.0)");
-    this.regularizationWeight = regularizationWeight;
-  }
-
-  public double getRegularizationWeight() {
-    return this.regularizationWeight;
-  }
-
-  /**
-   * Set the momemtum weight for the model. Recommend in range [0, 0.5].
-   * 
-   * @param momentumWeight
-   */
-  public void setMomemtumWeight(double momentumWeight) {
-    Preconditions.checkArgument(momentumWeight >= 0 && momentumWeight <= 1.0,
-        "Momentum weight must be in range [0, 1.0]");
-    this.momentumWeight = momentumWeight;
-  }
-
-  public double getMomemtumWeight() {
-    return this.momentumWeight;
-  }
-
   public void setTrainingMethod(TrainingMethod method) {
     this.trainingMethod = method;
   }
@@ -218,8 +181,6 @@
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
-    // read regularization weight
-    this.regularizationWeight = input.readDouble();
     // read momentum weight
     this.momentumWeight = input.readDouble();
 
@@ -241,8 +202,6 @@
   @Override
   public void write(DataOutput output) throws IOException {
     super.write(output);
-    // write regularization weight
-    output.writeDouble(this.regularizationWeight);
     // write momentum weight
     output.writeDouble(this.momentumWeight);
 
diff --git a/src/main/java/org/apache/horn/bsp/AutoEncoder.java b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
index 135d63a..a632942 100644
--- a/src/main/java/org/apache/horn/bsp/AutoEncoder.java
+++ b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
@@ -17,10 +17,12 @@
  */
 package org.apache.horn.bsp;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.math.DenseDoubleVector;
 import org.apache.hama.commons.math.DoubleFunction;
 import org.apache.hama.commons.math.DoubleMatrix;
@@ -66,21 +68,6 @@
     model = new SmallLayeredNeuralNetwork(conf, modelPath);
   }
 
-  public AutoEncoder setLearningRate(double learningRate) {
-    model.setLearningRate(learningRate);
-    return this;
-  }
-
-  public AutoEncoder setMomemtumWeight(double momentumWeight) {
-    model.setMomemtumWeight(momentumWeight);
-    return this;
-  }
-
-  public AutoEncoder setRegularizationWeight(double regularizationWeight) {
-    model.setRegularizationWeight(regularizationWeight);
-    return this;
-  }
-
   public AutoEncoder setModelPath(String modelPath) {
     model.setModelPath(modelPath);
     return this;
@@ -92,10 +79,13 @@
    * 
    * @param dataInputPath
    * @param trainingParams
+   * @throws InterruptedException 
+   * @throws IOException 
+   * @throws ClassNotFoundException 
    */
-  public void train(HamaConfiguration conf, Path dataInputPath,
-      Map<String, String> trainingParams) {
-    model.train(conf, dataInputPath, trainingParams);
+  public BSPJob train(HamaConfiguration conf, Path dataInputPath,
+      Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
+    return model.train(conf);
   }
 
   /**
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
index bc79f54..9f27889 100644
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ b/src/main/java/org/apache/horn/bsp/HornJob.java
@@ -22,36 +22,84 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.math.Function;
-import org.apache.horn.trainer.Neuron;
-import org.apache.horn.trainer.Trainer;
+import org.apache.hama.commons.math.FunctionFactory;
 
 public class HornJob extends BSPJob {
 
+  SmallLayeredNeuralNetwork neuralNetwork;
+
   public HornJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
-    this.setBspClass(Trainer.class);
     this.setJarByClass(exampleClass);
+
+    neuralNetwork = new SmallLayeredNeuralNetwork();
+  }
+
+  public void addLayer(int featureDimension, Class<? extends Function> func) {
+    neuralNetwork.addLayer(featureDimension, false,
+        FunctionFactory.createDoubleFunction(func.getSimpleName()));
+  }
+
+  public void finalLayer(int labels, Class<? extends Function> func) {
+    neuralNetwork.addLayer(labels, true,
+        FunctionFactory.createDoubleFunction(func.getSimpleName()));
+  }
+
+  public void setCostFunction(Class<? extends Function> func) {
+    neuralNetwork.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction(func.getSimpleName()));
   }
 
   public void setDouble(String name, double value) {
     conf.setDouble(name, value);
   }
 
-  @SuppressWarnings("rawtypes")
-  public void addLayer(int i, Class<? extends Neuron> class1,
-      Class<? extends Function> class2) {
-    // TODO Auto-generated method stub
-
+  public void setMaxIteration(int maxIteration) {
+    this.conf.setInt("training.max.iterations", maxIteration);
   }
 
-  public void setCostFunction(Class<? extends Function> class1) {
-    // TODO Auto-generated method stub
-
+  public void setBatchSize(int batchSize) {
+    this.conf.setInt("training.batch.size", batchSize);
   }
 
-  public void setMaxIteration(int n) {
-    this.conf.setInt("horn.max.iteration", n);
+  public void setLearningRate(double learningRate) {
+    this.conf.setDouble("mlp.learning.rate", learningRate);
+  }
+
+  public void setConvergenceCheckInterval(int n) {
+    this.conf.setInt("convergence.check.interval", n);
+  }
+
+  public void setMomentumWeight(double momentumWeight) {
+    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
+  }
+
+  public SmallLayeredNeuralNetwork getNeuralNetwork() {
+    return neuralNetwork;
+  }
+
+  public boolean waitForCompletion(boolean verbose) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    BSPJob job = neuralNetwork.train(this.conf);
+    if (verbose) {
+      return job.waitForCompletion(true);
+    } else {
+      return job.waitForCompletion(false);
+    }
+  }
+
+  public void setRegularizationWeight(double regularizationWeight) {
+    this.conf.setDouble("regularization.weight", regularizationWeight);
+  }
+
+  public void setModelPath(String modelPath) {
+    this.conf.set("model.path", modelPath);
+    neuralNetwork.setModelPath(modelPath);
+  }
+
+  public void setTrainingSetPath(String inputPath) {
+    this.conf.set("training.input.path", inputPath);
   }
 
 }
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
index 5afe1d3..051881d 100644
--- a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
+++ b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
@@ -22,9 +22,9 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.ml.util.DefaultFeatureTransformer;
 import org.apache.hama.ml.util.FeatureTransformer;
 
@@ -68,6 +69,7 @@
   }
 
   public NeuralNetwork(String modelPath) {
+    this.modelPath = modelPath;
   }
 
   public NeuralNetwork(HamaConfiguration conf, String modelPath) {
@@ -83,23 +85,6 @@
 
   }
 
-  /**
-   * Set the degree of aggression during model training, a large learning rate
-   * can increase the training speed, but it also decrease the chance of model
-   * converge. Recommend in range (0, 0.3).
-   * 
-   * @param learningRate
-   */
-  public void setLearningRate(double learningRate) {
-    Preconditions.checkArgument(learningRate > 0,
-        "Learning rate must be larger than 0.");
-    this.learningRate = learningRate;
-  }
-
-  public double getLearningRate() {
-    return this.learningRate;
-  }
-
   public void isLearningRateDecay(boolean decay) {
     this.learningRateDecay = decay;
   }
@@ -113,33 +98,22 @@
    * 
    * @param dataInputPath The path of the training data.
    * @param trainingParams The parameters for training.
+   * @throws InterruptedException 
+   * @throws ClassNotFoundException 
    * @throws IOException
    */
-  public void train(HamaConfiguration hamaConf, Path dataInputPath, Map<String, String> trainingParams) {
+  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
     Preconditions.checkArgument(this.modelPath != null,
         "Please set the model path before training.");
+
     // train with BSP job
-    try {
-      trainInternal(hamaConf, dataInputPath, trainingParams);
-      // write the trained model back to model path
-      this.readFromModel();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
-    }
+    return trainInternal((HamaConfiguration) conf);
   }
 
   /**
    * Train the model with the path of given training data and parameters.
-   * 
-   * @param dataInputPath
-   * @param trainingParams
    */
-  protected abstract void trainInternal(HamaConfiguration hamaConf,
-      Path dataInputPath, Map<String, String> trainingParams)
+  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
       throws IOException, InterruptedException, ClassNotFoundException;
 
   /**
@@ -163,7 +137,7 @@
   public void writeModelToFile() throws IOException {
     Preconditions.checkArgument(this.modelPath != null,
         "Model path has not been set.");
-    
+
     FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
     this.write(is);
 
diff --git a/src/main/java/org/apache/horn/trainer/Neuron.java b/src/main/java/org/apache/horn/bsp/Neuron.java
similarity index 70%
rename from src/main/java/org/apache/horn/trainer/Neuron.java
rename to src/main/java/org/apache/horn/bsp/Neuron.java
index d1c35d1..f122b6d 100644
--- a/src/main/java/org/apache/horn/trainer/Neuron.java
+++ b/src/main/java/org/apache/horn/bsp/Neuron.java
@@ -15,37 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.horn.trainer;
+package org.apache.horn.bsp;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.horn.funcs.Sigmoid;
+import org.apache.hama.commons.math.DoubleFunction;
 
 public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
   double output;
   double weight;
-
-  /**
-   * @return the theta value of this neuron.
-   */
-  public double getTheta() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
+  double delta;
+  protected DoubleFunction squashingFunction;
 
   public void feedforward(double sum) {
     // TODO Auto-generated method stub
     // squashing
+    this.output = sum;
   }
 
   public void backpropagate(double gradient) {
     // TODO Auto-generated method stub
-
+    this.delta = gradient;
   }
 
-  public double activation(double sum) {
-    // TODO Auto-generated method stub
-    this.output = new Sigmoid().apply(sum);
-    return output;
+  public double getDelta() {
+    return delta;
+  }
+
+  public void setWeight(double weight) {
+    this.weight = weight;
   }
 
   public void setOutput(double output) {
@@ -57,18 +54,29 @@
   }
 
   // ////////* Below methods will communicate with parameter server */
-
-  public double getPreviousWeight() {
-    return weight;
-  }
+  private int i;
 
   public void push(double weight) {
-    // TODO Auto-generated method stub
-    this.weight = weight;
+    weights[i++] = weight;
   }
 
   public double getUpdate() {
     return weight;
   }
 
+  double[] weights;
+
+  public void setWeightVector(int rowCount) {
+    i = 0;
+    weights = new double[rowCount];
+  }
+
+  public double[] getWeights() {
+    return weights;
+  }
+
+  public void setSquashingFunction(DoubleFunction squashingFunction) {
+    this.squashingFunction = squashingFunction;
+  }
+
 }
diff --git a/src/main/java/org/apache/horn/trainer/NeuronInterface.java b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
similarity index 97%
rename from src/main/java/org/apache/horn/trainer/NeuronInterface.java
rename to src/main/java/org/apache/horn/bsp/NeuronInterface.java
index c96931e..bcc1a5a 100644
--- a/src/main/java/org/apache/horn/trainer/NeuronInterface.java
+++ b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.horn.trainer;
+package org.apache.horn.bsp;
 
 import java.io.IOException;
 
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
index bd0d103..aaad86e 100644
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
@@ -23,13 +23,13 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -43,6 +43,8 @@
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.commons.math.FunctionFactory;
+import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -63,7 +65,9 @@
 
   private static final Log LOG = LogFactory
       .getLog(SmallLayeredNeuralNetwork.class);
-  
+
+  public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
+
   /* Weights between neurons at adjacent layers */
   protected List<DoubleMatrix> weightMatrixList;
 
@@ -75,6 +79,8 @@
 
   protected int finalLayerIdx;
 
+  protected double regularizationWeight;
+
   public SmallLayeredNeuralNetwork() {
     this.layerSizeList = Lists.newArrayList();
     this.weightMatrixList = Lists.newArrayList();
@@ -84,6 +90,7 @@
 
   public SmallLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
     super(conf, modelPath);
+    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
   }
 
   @Override
@@ -147,6 +154,7 @@
 
   /**
    * Set the previous weight matrices.
+   * 
    * @param prevUpdates
    */
   void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
@@ -242,8 +250,8 @@
     // write squashing functions
     output.writeInt(this.squashingFunctionList.size());
     for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
-      WritableUtils.writeString(output, aSquashingFunctionList
-              .getFunctionName());
+      WritableUtils.writeString(output,
+          aSquashingFunctionList.getFunctionName());
     }
 
     // write weight matrices
@@ -305,10 +313,19 @@
       intermediateOutput = forward(i, intermediateOutput);
       outputCache.add(intermediateOutput);
     }
+
     return outputCache;
   }
 
   /**
+   * @return a new neuron instance
+   */
+  public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
+    return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
+        .newInstance(neuronClass);
+  }
+
+  /**
    * Forward the calculation for one layer.
    * 
    * @param fromLayer The index of the previous layer.
@@ -318,8 +335,30 @@
   protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
     DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
 
-    DoubleVector vec = weightMatrix.multiplyVectorUnsafe(intermediateOutput);
-    vec = vec.applyToElements(this.squashingFunctionList.get(fromLayer));
+    neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
+        .getClass("neuron.class", Neuron.class);
+
+    // TODO use the multithread processing
+    DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
+    for (int row = 0; row < weightMatrix.getRowCount(); row++) {
+      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+      for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
+        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+            new DoubleWritable(intermediateOutput.get(col)),
+            new DoubleWritable(weightMatrix.get(row, col))));
+      }
+      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+      n.setup(conf);
+      n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
+      try {
+        n.forward(iterable);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      vec.set(row, n.getOutput());
+    }
 
     // add bias
     DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
@@ -327,6 +366,7 @@
     for (int i = 0; i < vec.getDimension(); ++i) {
       vecWithBias.set(i + 1, vec.get(i));
     }
+
     return vecWithBias;
   }
 
@@ -468,8 +508,6 @@
       DenseDoubleMatrix weightUpdateMatrix) {
 
     // get layer related information
-    DoubleFunction squashingFunction = this.squashingFunctionList
-        .get(curLayerIdx);
     DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
     DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
     DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
@@ -480,41 +518,51 @@
           nextLayerDelta.getDimension() - 1);
     }
 
-    DoubleVector delta = weightMatrix.transpose()
-        .multiplyVector(nextLayerDelta);
-    for (int i = 0; i < delta.getDimension(); ++i) {
-      delta.set(
-          i,
-          delta.get(i)
-              * squashingFunction.applyDerivative(curLayerOutput.get(i)));
-    }
+    // DoubleMatrix transposed = weightMatrix.transpose();
+    DoubleVector deltaVector = new DenseDoubleVector(
+        weightMatrix.getColumnCount());
+    for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+      // calls setup method
+      n.setup(conf);
+      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
+      n.setOutput(curLayerOutput.get(row));
 
-    // update weights
-    for (int i = 0; i < weightUpdateMatrix.getRowCount(); ++i) {
-      for (int j = 0; j < weightUpdateMatrix.getColumnCount(); ++j) {
-        weightUpdateMatrix.set(i, j,
-            -learningRate * nextLayerDelta.get(i) * curLayerOutput.get(j)
-                + this.momentumWeight * prevWeightMatrix.get(i, j));
+      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+
+      n.setWeightVector(weightMatrix.getRowCount());
+
+      for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
+        // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
+        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+            new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
+                weightMatrix.get(col, row)), new DoubleWritable(
+                prevWeightMatrix.get(col, row))));
       }
+
+      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+      try {
+        n.backward(iterable);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+
+      // update weights
+      weightUpdateMatrix.setColumn(row, n.getWeights());
+      deltaVector.set(row, n.getDelta());
     }
 
-    return delta;
+    return deltaVector;
   }
 
   @Override
-  protected void trainInternal(HamaConfiguration hamaConf, Path dataInputPath,
-      Map<String, String> trainingParams) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    // add all training parameters to configuration
+  protected BSPJob trainInternal(HamaConfiguration hamaConf)
+      throws IOException, InterruptedException, ClassNotFoundException {
     this.conf = hamaConf;
     this.fs = FileSystem.get(conf);
-    
-    for (Map.Entry<String, String> entry : trainingParams.entrySet()) {
-      conf.set(entry.getKey(), entry.getValue());
-    }
 
-    // if training parameters contains the model path, update the model path
-    String modelPath = trainingParams.get("modelPath");
+    String modelPath = conf.get("model.path");
     if (modelPath != null) {
       this.modelPath = modelPath;
     }
@@ -524,8 +572,6 @@
           "Please specify the modelPath for model, "
               + "either through setModelPath() or add 'modelPath' to the training parameters.");
     }
-
-    conf.set("modelPath", this.modelPath);
     this.writeModelToFile();
 
     // create job
@@ -533,7 +579,11 @@
     job.setJobName("Small scale Neural Network training");
     job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class);
     job.setBspClass(SmallLayeredNeuralNetworkTrainer.class);
-    job.setInputPath(dataInputPath);
+
+    job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
+        Neuron.class);
+
+    job.setInputPath(new Path(conf.get("training.input.path")));
     job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
     job.setInputKeyClass(LongWritable.class);
     job.setInputValueClass(VectorWritable.class);
@@ -541,15 +591,7 @@
     job.setOutputValueClass(NullWritable.class);
     job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
 
-    int numTasks = conf.getInt("tasks", 1);
-    LOG.info(String.format("Number of tasks: %d\n", numTasks));
-    job.setNumBspTask(numTasks);
-    job.waitForCompletion(true);
-
-    // reload learned model
-    LOG.info(String.format("Reload model from %s.", this.modelPath));
-    this.readFromModel();
-
+    return job;
   }
 
   @Override
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
index 696d56c..58f96d1 100644
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
+++ b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
@@ -33,6 +33,7 @@
 import org.apache.hama.commons.math.DenseDoubleMatrix;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.commons.math.FunctionFactory;
 import org.apache.hama.ipc.RPC;
 
 import com.google.common.base.Preconditions;
@@ -88,10 +89,9 @@
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
     // At least one master & slave worker exist.
     Preconditions.checkArgument(peer.getNumPeers() >= 2);
-
     this.conf = peer.getConfiguration();
 
-    String modelPath = conf.get("modelPath");
+    String modelPath = conf.get("model.path");
     this.inMemoryModel = new SmallLayeredNeuralNetwork(conf, modelPath);
 
     this.batchSize = conf.getInt("training.batch.size", 50);
@@ -135,8 +135,7 @@
     // write model to modelPath
     if (isMaster(peer)) {
       try {
-        LOG.info(String.format("Write model back to %s\n",
-            inMemoryModel.getModelPath()));
+        LOG.info("Write model back to " + inMemoryModel.getModelPath());
         this.inMemoryModel.writeModelToFile();
       } catch (IOException e) {
         e.printStackTrace();
diff --git a/src/main/java/org/apache/horn/trainer/PropMessage.java b/src/main/java/org/apache/horn/bsp/Synapse.java
similarity index 79%
rename from src/main/java/org/apache/horn/trainer/PropMessage.java
rename to src/main/java/org/apache/horn/bsp/Synapse.java
index 5724943..61725f9 100644
--- a/src/main/java/org/apache/horn/trainer/PropMessage.java
+++ b/src/main/java/org/apache/horn/bsp/Synapse.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.horn.trainer;
+package org.apache.horn.bsp;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,17 +27,24 @@
 /**
  * Message wrapper for a propagating message
  */
-public class PropMessage<M extends Writable, W extends Writable> implements
+public class Synapse<M extends Writable, W extends Writable> implements
     Writable {
 
   DoubleWritable message;
   DoubleWritable weight;
+  DoubleWritable prevWeight;
 
-  public PropMessage(DoubleWritable message, DoubleWritable weight) {
+  public Synapse(DoubleWritable message, DoubleWritable weight) {
     this.message = message;
     this.weight = weight;
   }
 
+  public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
+    this.message = message;
+    this.weight = weight;
+    this.prevWeight = prevWeight;
+  }
+  
   /**
    * @return the activation or error message
    */
@@ -58,6 +65,10 @@
   public double getWeight() {
     return weight.get();
   }
+  
+  public double getPrevWeight() {
+    return prevWeight.get();
+  }
 
   @Override
   public void readFields(DataInput in) throws IOException {
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index 26402cc..08703cd 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -19,92 +19,99 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TextInputFormat;
 import org.apache.horn.bsp.HornJob;
+import org.apache.horn.bsp.Neuron;
+import org.apache.horn.bsp.Synapse;
 import org.apache.horn.funcs.CrossEntropy;
 import org.apache.horn.funcs.Sigmoid;
-import org.apache.horn.trainer.Neuron;
-import org.apache.horn.trainer.PropMessage;
 
 public class MultiLayerPerceptron {
 
   public static class StandardNeuron extends
-      Neuron<PropMessage<DoubleWritable, DoubleWritable>> {
-
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> {
     private double learningRate;
-    private double lambda;
     private double momentum;
-    private static double bias = -1;
 
     @Override
     public void setup(HamaConfiguration conf) {
-      this.learningRate = conf.getDouble("mlp.learning.rate", 0.1);
-      this.lambda = conf.getDouble("mlp.regularization.weight", 0.01);
+      this.learningRate = conf.getDouble("mlp.learning.rate", 0.5);
       this.momentum = conf.getDouble("mlp.momentum.weight", 0.2);
     }
 
     @Override
     public void forward(
-        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
       double sum = 0;
-
-      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         sum += m.getInput() * m.getWeight();
       }
-      sum += bias * this.getTheta(); // add bias feature
-      feedforward(activation(sum));
+
+      this.feedforward(this.squashingFunction.apply(sum));
     }
 
     @Override
     public void backward(
-        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
-      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.getOutput() * (1 - this.getOutput())
-            * m.getDelta() * m.getWeight();
-        backpropagate(gradient);
+        double gradient = this.squashingFunction.applyDerivative(this
+            .getOutput()) * (m.getDelta() * m.getWeight());
+        this.backpropagate(gradient);
 
         // Weight corrections
         double weight = -learningRate * this.getOutput() * m.getDelta()
-            + momentum * this.getPreviousWeight();
+            + momentum * m.getPrevWeight();
         this.push(weight);
       }
     }
+  }
 
+  public static HornJob createJob(HamaConfiguration conf, String modelPath,
+      String inputPath, double learningRate, double momemtumWeight,
+      double regularizationWeight, int features, int labels, int maxIteration,
+      int numOfTasks) throws IOException {
+
+    HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
+    job.setTrainingSetPath(inputPath);
+    job.setModelPath(modelPath);
+
+    job.setNumBspTask(numOfTasks);
+    job.setMaxIteration(maxIteration);
+    job.setLearningRate(learningRate);
+    job.setMomentumWeight(momemtumWeight);
+    job.setRegularizationWeight(regularizationWeight);
+
+    job.setConvergenceCheckInterval(1000);
+    job.setBatchSize(300);
+
+    job.addLayer(features, Sigmoid.class);
+    job.addLayer(features, Sigmoid.class);
+    job.finalLayer(labels, Sigmoid.class);
+
+    job.setCostFunction(CrossEntropy.class);
+
+    return job;
   }
 
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
-    HamaConfiguration conf = new HamaConfiguration();
-    HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
-
-    job.setDouble("mlp.learning.rate", 0.1);
-    job.setDouble("mlp.regularization.weight", 0.01);
-    job.setDouble("mlp.momentum.weight", 0.2);
-
-    // initialize the topology of the model.
-    // a three-layer model is created in this example
-    job.addLayer(1000, StandardNeuron.class, Sigmoid.class); // 1st layer
-    job.addLayer(800, StandardNeuron.class, Sigmoid.class); // 2nd layer
-    job.addLayer(300, StandardNeuron.class, Sigmoid.class); // total classes
-
-    // set the cost function to evaluate the error
-    job.setCostFunction(CrossEntropy.class);
-
-    // set I/O and others
-    job.setInputFormat(TextInputFormat.class);
-    job.setOutputPath(new Path("/tmp/"));
-    job.setMaxIteration(10000);
-    job.setNumBspTask(3);
+    if (args.length < 9) {
+      System.out
+          .println("Usage: model_path training_set learning_rate momentum regularization_weight feature_dimension label_dimension max_iteration num_tasks");
+      System.exit(1);
+    }
+    HornJob ann = createJob(new HamaConfiguration(), args[0], args[1],
+        Double.parseDouble(args[2]), Double.parseDouble(args[3]),
+        Double.parseDouble(args[4]), Integer.parseInt(args[5]),
+        Integer.parseInt(args[6]), Integer.parseInt(args[7]),
+        Integer.parseInt(args[8]));
 
     long startTime = System.currentTimeMillis();
-
-    if (job.waitForCompletion(true)) {
+    if (ann.waitForCompletion(true)) {
       System.out.println("Job Finished in "
           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
diff --git a/src/main/java/org/apache/horn/examples/NeuralNetwork.java b/src/main/java/org/apache/horn/examples/NeuralNetwork.java
index 737412b..1503ef0 100644
--- a/src/main/java/org/apache/horn/examples/NeuralNetwork.java
+++ b/src/main/java/org/apache/horn/examples/NeuralNetwork.java
@@ -160,9 +160,9 @@
 
       // train the model
       SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-      ann.setLearningRate(learningRate);
-      ann.setMomemtumWeight(momemtumWeight);
-      ann.setRegularizationWeight(regularizationWeight);
+      // ann.setLearningRate(learningRate);
+      // ann.setMomemtumWeight(momemtumWeight);
+      // ann.setRegularizationWeight(regularizationWeight);
       ann.addLayer(featureDimension, false,
           FunctionFactory.createDoubleFunction("Sigmoid"));
       ann.addLayer(featureDimension, false,
@@ -178,7 +178,7 @@
       trainingParameters.put("training.max.iterations", "" + iteration);
       trainingParameters.put("training.batch.size", "300");
       trainingParameters.put("convergence.check.interval", "1000");
-      ann.train(conf, new Path(trainingDataPath), trainingParameters);
+      // ann.train(conf, new Path(trainingDataPath), trainingParameters);
     }
 
   }
diff --git a/src/main/java/org/apache/horn/trainer/Trainer.java b/src/main/java/org/apache/horn/trainer/Trainer.java
deleted file mode 100644
index 4f903f0..0000000
--- a/src/main/java/org/apache/horn/trainer/Trainer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.trainer;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-
-/**
- * The forward and backward passes are the essential computations of a Neural
- * Net. So, only few vertices of single layer of Neural Net will be activated in
- * a single superstep. This is quite inefficient. So, instead of doing like
- * this, we send training instance continuously at every superstep, and then
- * handle the information (forward messages of current training instance) and
- * error (backward messages of previous training instance) at once.
- * 
- * Then, we push the accumulated updates to parameter servers in the
- * corresponding mini-batch interval.
- * 
- */
-public class Trainer extends BSP {
-  
-  private static final Log LOG = LogFactory.getLog(Trainer.class);
-  
-  private boolean isConverge = false;
-  private int iterations;
-  private int maxIterations;
-  private int batchSize;
-
-  @Override
-  public final void setup(BSPPeer peer) {
-    this.iterations = 0;
-    this.maxIterations = peer.getConfiguration()
-        .getInt("horn.max.iteration", 1);
-    LOG.info("max iteration: " + this.maxIterations);
-    
-    // loads subset of neural network model replica into memory
-  }
-
-  @Override
-  public void bsp(BSPPeer peer) throws IOException, SyncException,
-      InterruptedException {
-
-    // Iterate until reach max iteration or convergence
-    while (this.iterations++ < maxIterations) {
-
-      // Fetch latest parameters
-      fetchParameters(peer);
-      // Perform the batch
-      performBatch(peer);
-      // Push parameters
-      pushParameters(peer);
-
-      if (this.isConverge) {
-        break;
-      }
-    }
-
-  }
-
-  /**
-   * Performs the mini-batch
-   * 
-   * @param peer
-   * @throws IOException 
-   * @throws InterruptedException 
-   * @throws SyncException 
-   */
-  private void performBatch(BSPPeer peer) throws IOException, SyncException, InterruptedException {
-    double avgTrainingError = 0.0;
-
-    int trains = 0;
-    while (trains < batchSize) {
-      // TODO reads and sends a single instance to first input layer
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      
-      if (!peer.readNext(key, value)) {
-        peer.reopenInput();
-        peer.readNext(key, value);
-      }
-      LOG.info(key + ", " + value);
-      
-      // TODO calls upward and downward methods
-
-      peer.sync();
-      trains++;
-    }
-  }
-
-  private void fetchParameters(BSPPeer peer) {
-    // TODO fetch latest weights from the parameter server
-  }
-
-  private void pushParameters(BSPPeer peer) {
-    // TODO push updated weights
-  }
-
-}
diff --git a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java b/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
index 9d5c0b9..a42fd72 100644
--- a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
+++ b/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
@@ -53,8 +53,10 @@
     double[][] instances = { { 0, 0, 0, 1 }, { 0, 0, 1, 0 }, { 0, 1, 0, 0 },
         { 0, 0, 0, 0 } };
     AutoEncoder encoder = new AutoEncoder(4, 2);
-    encoder.setLearningRate(0.5);
-    encoder.setMomemtumWeight(0.2);
+    // TODO use the configuration
+
+    // encoder.setLearningRate(0.5);
+    // encoder.setMomemtumWeight(0.2);
 
     int maxIteration = 2000;
     Random rnd = new Random();
@@ -107,8 +109,8 @@
       vecInstanceList.add(new DenseDoubleVector(instance));
     }
     AutoEncoder encoder = new AutoEncoder(3, 2);
-    encoder.setLearningRate(0.05);
-    encoder.setMomemtumWeight(0.1);
+    // encoder.setLearningRate(0.05);
+    // encoder.setMomemtumWeight(0.1);
     int maxIteration = 2000;
     for (int iteration = 0; iteration < maxIteration; ++iteration) {
       for (DoubleVector vector : vecInstanceList) {
@@ -177,11 +179,11 @@
     String modelPath = "/tmp/autoencoder-modelpath";
     encoder.setModelPath(modelPath);
     Map<String, String> trainingParams = new HashMap<String, String>();
-    encoder.setLearningRate(0.5);
+    // encoder.setLearningRate(0.5);
     trainingParams.put("tasks", "5");
     trainingParams.put("training.max.iterations", "3000");
     trainingParams.put("training.batch.size", "200");
-    encoder.train(conf, path, trainingParams);
+    // encoder.train(conf, path, trainingParams);
 
     double errorInstance = 0;
     for (double[] instance : instanceList) {
diff --git a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
index 2f3a5b2..972d55a 100644
--- a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
+++ b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
@@ -70,11 +70,11 @@
     ann.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
     double learningRate = 0.2;
-    ann.setLearningRate(learningRate);
+    // ann.setLearningRate(learningRate);
     double momentumWeight = 0.5;
-    ann.setMomemtumWeight(momentumWeight);
+    // ann.setMomemtumWeight(momentumWeight);
     double regularizationWeight = 0.05;
-    ann.setRegularizationWeight(regularizationWeight);
+    //ann.setRegularizationWeight(regularizationWeight);
     // intentionally initialize all weights to 0.5
     DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
     matrices[0] = new DenseDoubleMatrix(5, 3, 0.2);
@@ -99,10 +99,10 @@
     SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
     assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType());
     assertEquals(modelPath, annCopy.getModelPath());
-    assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
-    assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
-    assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
-        0.000001);
+    // assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
+    // assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
+    //assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
+    //    0.000001);
     assertEquals(TrainingMethod.GRADIENT_DESCENT, annCopy.getTrainingMethod());
     assertEquals(LearningStyle.UNSUPERVISED, annCopy.getLearningStyle());
 
@@ -137,7 +137,7 @@
         FunctionFactory.createDoubleFunction("IdentityFunction"));
     ann.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann.setLearningRate(0.1);
+    // ann.setLearningRate(0.1);
     // intentionally initialize all weights to 0.5
     DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
     matrices[0] = new DenseDoubleMatrix(5, 3, 0.5);
@@ -157,7 +157,7 @@
     ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
     ann2.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann2.setLearningRate(0.3);
+    // ann2.setLearningRate(0.3);
     // intentionally initialize all weights to 0.5
     DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2];
     matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5);
@@ -176,7 +176,7 @@
     ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
     ann3.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann3.setLearningRate(0.3);
+    // ann3.setLearningRate(0.3);
     // intentionally initialize all weights to 0.5
     DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2];
     initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5);
@@ -196,8 +196,8 @@
     ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann.setLearningRate(0.5);
-    ann.setMomemtumWeight(0.0);
+    // ann.setLearningRate(0.5);
+    // ann.setMomemtumWeight(0.0);
 
     int iterations = 50000; // iteration should be set to a very large number
     double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
@@ -249,8 +249,8 @@
     ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann.setLearningRate(0.6);
-    ann.setMomemtumWeight(0.3);
+    // ann.setLearningRate(0.6);
+    // ann.setMomemtumWeight(0.3);
 
     int iterations = 2000; // iteration should be set to a very large number
     double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
@@ -299,9 +299,9 @@
     ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
-    ann.setLearningRate(0.7);
-    ann.setMomemtumWeight(0.5);
-    ann.setRegularizationWeight(0.002);
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.002);
 
     int iterations = 5000; // iteration should be set to a very large number
     double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
@@ -378,9 +378,9 @@
         instanceList.size() - 100);
 
     SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.setLearningRate(0.001);
-    ann.setMomemtumWeight(0.1);
-    ann.setRegularizationWeight(0.01);
+    // ann.setLearningRate(0.001);
+    // ann.setMomemtumWeight(0.1);
+    //ann.setRegularizationWeight(0.01);
     ann.addLayer(dimension, false,
         FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.addLayer(dimension, false,
@@ -486,9 +486,9 @@
     // create model
     int dimension = 8;
     SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.setLearningRate(0.7);
-    ann.setMomemtumWeight(0.5);
-    ann.setRegularizationWeight(0.1);
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.1);
     ann.addLayer(dimension, false,
         FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.addLayer(dimension, false,
@@ -506,7 +506,7 @@
     trainingParameters.put("training.max.iterations", "2000");
     trainingParameters.put("training.batch.size", "300");
     trainingParameters.put("convergence.check.interval", "1000");
-    ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
 
     long end = new Date().getTime();
 
@@ -528,7 +528,7 @@
         (double) (end - start) / 1000));
     Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
   }
-  
+
   public void testLogisticRegressionDistributedVersionWithFeatureTransformer() {
     // write data into a sequence file
     String tmpStrDatasetPath = "/tmp/logistic_regression_data_feature_transformer";
@@ -591,9 +591,9 @@
     // create model
     int dimension = 8;
     SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.setLearningRate(0.7);
-    ann.setMomemtumWeight(0.5);
-    ann.setRegularizationWeight(0.1);
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.1);
     ann.addLayer(dimension, false,
         FunctionFactory.createDoubleFunction("Sigmoid"));
     ann.addLayer(dimension, false,
@@ -615,7 +615,7 @@
     trainingParameters.put("training.max.iterations", "2000");
     trainingParameters.put("training.batch.size", "300");
     trainingParameters.put("convergence.check.interval", "1000");
-    ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
     
 
     long end = new Date().getTime();
diff --git a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java b/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
index 4f44c94..5582dc0 100644
--- a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
+++ b/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
@@ -25,9 +25,7 @@
 import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,7 +37,7 @@
 import org.apache.hama.commons.io.VectorWritable;
 import org.apache.hama.commons.math.DenseDoubleVector;
 import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.commons.math.FunctionFactory;
+import org.apache.horn.bsp.HornJob;
 import org.apache.horn.bsp.SmallLayeredNeuralNetwork;
 
 /**
@@ -198,33 +196,16 @@
     }
 
     try {
-      int iteration = 1000;
-      double learningRate = 0.4;
-      double momemtumWeight = 0.2;
-      double regularizationWeight = 0.01;
+      HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
+          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
+          1000, 2);
 
-      // train the model
-      SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-      ann.setLearningRate(learningRate);
-      ann.setMomemtumWeight(momemtumWeight);
-      ann.setRegularizationWeight(regularizationWeight);
-      ann.addLayer(featureDimension, false,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.addLayer(featureDimension, false,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.addLayer(labelDimension, true,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.setCostFunction(FunctionFactory
-          .createDoubleDoubleFunction("CrossEntropy"));
-      ann.setModelPath(MODEL_PATH);
-
-      Map<String, String> trainingParameters = new HashMap<String, String>();
-      trainingParameters.put("tasks", "2");
-      trainingParameters.put("training.max.iterations", "" + iteration);
-      trainingParameters.put("training.batch.size", "300");
-      trainingParameters.put("convergence.check.interval", "1000");
-      ann.train(conf, sequenceTrainingDataPath, trainingParameters);
-
+      long startTime = System.currentTimeMillis();
+      if (ann.waitForCompletion(true)) {
+        LOG.info("Job Finished in "
+            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+      }
+      
     } catch (Exception e) {
       e.printStackTrace();
     }
diff --git a/src/test/java/org/apache/horn/trainer/TestNeuron.java b/src/test/java/org/apache/horn/trainer/TestNeuron.java
index d5042a1..b5f6bfc 100644
--- a/src/test/java/org/apache/horn/trainer/TestNeuron.java
+++ b/src/test/java/org/apache/horn/trainer/TestNeuron.java
@@ -25,6 +25,9 @@
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.horn.bsp.Neuron;
+import org.apache.horn.bsp.Synapse;
+import org.apache.horn.funcs.Sigmoid;
 
 public class TestNeuron extends TestCase {
   private static double learningRate = 0.1;
@@ -32,7 +35,7 @@
   private static double theta = 0.8;
 
   public static class MyNeuron extends
-      Neuron<PropMessage<DoubleWritable, DoubleWritable>> {
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> {
 
     @Override
     public void setup(HamaConfiguration conf) {
@@ -40,24 +43,23 @@
 
     @Override
     public void forward(
-        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
       double sum = 0;
-      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         sum += m.getInput() * m.getWeight();
       }
       sum += (bias * theta);
-      feedforward(activation(sum));
+      this.feedforward(new Sigmoid().apply(sum));
     }
 
     @Override
     public void backward(
-        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
-      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.getOutput() * (1 - this.getOutput())
-            * m.getDelta() * m.getWeight();
+        double gradient = new Sigmoid().applyDerivative(this.getOutput()) * (m.getDelta() * m.getWeight());
 
         // Propagates to lower layer
         backpropagate(gradient);
@@ -71,10 +73,10 @@
   }
 
   public void testProp() throws IOException {
-    List<PropMessage<DoubleWritable, DoubleWritable>> x = new ArrayList<PropMessage<DoubleWritable, DoubleWritable>>();
-    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+    List<Synapse<DoubleWritable, DoubleWritable>> x = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
         1.0), new DoubleWritable(0.5)));
-    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
         1.0), new DoubleWritable(0.4)));
 
     MyNeuron n = new MyNeuron();
@@ -82,7 +84,7 @@
     assertEquals(0.5249791874789399, n.getOutput());
 
     x.clear();
-    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
         -0.1274), new DoubleWritable(-1.2)));
     n.backward(x);
     assertEquals(-0.006688234848481696, n.getUpdate());
diff --git a/src/test/java/org/apache/horn/trainer/TestTrainer.java b/src/test/java/org/apache/horn/trainer/TestTrainer.java
deleted file mode 100644
index 295420e..0000000
--- a/src/test/java/org/apache/horn/trainer/TestTrainer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.trainer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.NullOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-
-public class TestTrainer extends HamaCluster {
-  protected HamaConfiguration configuration;
-
-  // these variables are preventing from rebooting the whole stuff again since
-  // setup and teardown are called per method.
-
-  public TestTrainer() {
-    configuration = new HamaConfiguration();
-    configuration.set("bsp.master.address", "localhost");
-    configuration.set("hama.child.redirect.log.console", "true");
-    assertEquals("Make sure master addr is set to localhost:", "localhost",
-        configuration.get("bsp.master.address"));
-    configuration.set("bsp.local.dir", "/tmp/hama-test");
-    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
-    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
-    configuration.set("hama.sync.client.class",
-        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-            .getCanonicalName());
-  }
-
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-  }
-
-  @Override
-  public void tearDown() throws Exception {
-    super.tearDown();
-  }
-
-  public void testOutputJob() throws Exception {
-    String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
-
-    Configuration conf = new Configuration();
-    conf.set("bsp.local.dir", "/tmp/hama-test");
-    conf.setInt("horn.max.iteration", 100);
-    conf.setInt("horn.minibatch.size", 10);
-    conf.setBoolean("bsp.input.runtime.partitioning", true);
-
-    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
-    bsp.setJobName("Test Replica Trainer");
-
-    bsp.setPartitioner(HashPartitioner.class);
-
-    bsp.setBspClass(Trainer.class);
-    bsp.setOutputFormat(NullOutputFormat.class);
-
-    bsp.setNumBspTask(2);
-    bsp.setInputFormat(TextInputFormat.class);
-    bsp.setInputPath(new Path(strTrainingDataPath));
-
-    bsp.waitForCompletion(true);
-  }
-
-}