Initial trainer code

Add unit test

Commit current work in progress

Add distributed model trainer
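
The user-facing API is the Neuron class and its upward/downward callbacks.
A minimal sketch of a user-defined neuron, mirroring the MyNeuron example in
the bundled TestNeuron (the class name SigmoidNeuron and the fixed 0.1
learning rate are illustrative only; Sigmoid comes from hama-commons-math):

    import java.io.IOException;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hama.commons.math.Sigmoid;
    import org.apache.horn.trainer.Neuron;
    import org.apache.horn.trainer.PropMessage;

    public class SigmoidNeuron extends
        Neuron<PropMessage<DoubleWritable, DoubleWritable>> {

      @Override
      public void upward(
          Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
          throws IOException {
        // Weighted sum of the forward messages from the lower layer
        double sum = 0;
        for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
          sum += m.getMessage().get() * m.getWeight().get();
        }
        // Squash and propagate the activation to the upper layer
        setOutput(new Sigmoid().apply(sum));
        propagate(getOutput());
      }

      @Override
      public void downward(
          Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
          throws IOException {
        for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
          // Error gradient w.r.t. the sigmoid output
          double gradient = getOutput() * (1 - getOutput())
              * m.getMessage().get() * m.getWeight().get();
          propagate(gradient); // back-propagate to the lower layer
          push(0.1 * getOutput() * m.getMessage().get()); // weight correction
        }
      }
    }
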
diff --git a/pom.xml b/pom.xml
index bffd62e..bc9d602 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,13 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>${hama.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
       <artifactId>hama-ml</artifactId>
       <version>${hama.version}</version>
     </dependency>
diff --git a/src/main/java/org/apache/horn/trainer/Neuron.java b/src/main/java/org/apache/horn/trainer/Neuron.java
new file mode 100644
index 0000000..1ae473b
--- /dev/null
+++ b/src/main/java/org/apache/horn/trainer/Neuron.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import org.apache.hadoop.io.Writable;
+
+public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
+  double output;
+  double weight;
+
+  public void propagate(double gradient) {
+    // TODO propagate the given activation or gradient to the adjacent layer
+  }
+
+  public void setOutput(double output) {
+    this.output = output;
+  }
+
+  public double getOutput() {
+    return output;
+  }
+
+  public void push(double weight) {
+    // TODO accumulate the weight correction to be pushed to parameter servers
+    this.weight = weight;
+  }
+
+  public double getUpdate() {
+    return weight;
+  }
+
+}
diff --git a/src/main/java/org/apache/horn/trainer/NeuronInterface.java b/src/main/java/org/apache/horn/trainer/NeuronInterface.java
new file mode 100644
index 0000000..4921c15
--- /dev/null
+++ b/src/main/java/org/apache/horn/trainer/NeuronInterface.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public interface NeuronInterface<M extends Writable> {
+
+  /**
+   * This method is called when the messages are propagated from the lower
+   * layer. It can be used to determine if the neuron would activate, or fire.
+   * 
+   * @param messages the forward messages received from the lower layer
+   * @throws IOException
+   */
+  public void upward(Iterable<M> messages) throws IOException;
+
+  /**
+   * This method is called when the errors are propagated from the upper layer.
+   * It can be used to calculate the error of each neuron and change the
+   * weights.
+   * 
+   * @param messages the error messages received from the upper layer
+   * @throws IOException
+   */
+  public void downward(Iterable<M> messages) throws IOException;
+  
+}
diff --git a/src/main/java/org/apache/horn/trainer/PropMessage.java b/src/main/java/org/apache/horn/trainer/PropMessage.java
new file mode 100644
index 0000000..74b2434
--- /dev/null
+++ b/src/main/java/org/apache/horn/trainer/PropMessage.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Message wrapper for a propagating message
+ */
+public class PropMessage<M extends Writable, W extends Writable> implements
+    Writable {
+
+  M message;
+  W weight;
+
+  public PropMessage(M message, W weight) {
+    this.message = message;
+    this.weight = weight;
+  }
+
+  /**
+   * @return the activation or error message
+   */
+  public M getMessage() {
+    return message;
+  }
+
+  public W getWeight() {
+    return weight;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    message.readFields(in);
+    weight.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    message.write(out);
+    weight.write(out);
+  }
+
+}
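
One caveat with the generic PropMessage above: readFields() assumes that
message and weight are already instantiated, and the class has no no-arg
constructor, so Hadoop's Writable machinery cannot deserialize it on its own
(it has no way to construct the concrete M and W types). A type-specialized
variant, sketched here under the hypothetical name DoublePropMessage, avoids
the problem:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Writable;

    public class DoublePropMessage implements Writable {
      private final DoubleWritable message = new DoubleWritable();
      private final DoubleWritable weight = new DoubleWritable();

      public DoublePropMessage() {} // required for Writable deserialization

      public void set(double m, double w) {
        message.set(m);
        weight.set(w);
      }

      public double getMessage() { return message.get(); }
      public double getWeight() { return weight.get(); }

      @Override
      public void readFields(DataInput in) throws IOException {
        // Fields are pre-allocated, so reading into them is always safe
        message.readFields(in);
        weight.readFields(in);
      }

      @Override
      public void write(DataOutput out) throws IOException {
        message.write(out);
        weight.write(out);
      }
    }
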
diff --git a/src/main/java/org/apache/horn/trainer/Trainer.java b/src/main/java/org/apache/horn/trainer/Trainer.java
new file mode 100644
index 0000000..94309c9
--- /dev/null
+++ b/src/main/java/org/apache/horn/trainer/Trainer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+/**
+ * The forward and backward passes are the essential computations of a neural
+ * network, but performed naively only the neurons of a single layer would be
+ * active in any given superstep, which is quite inefficient. Instead, we feed
+ * a new training instance at every superstep, handling the activations
+ * (forward messages of the current instance) and the errors (backward
+ * messages of the previous instance) together.
+ * 
+ * The accumulated updates are then pushed to the parameter servers at the
+ * corresponding mini-batch intervals.
+ * 
+ */
+public class Trainer extends BSP {
+  
+  private static final Log LOG = LogFactory.getLog(Trainer.class);
+  
+  private boolean isConverge = false;
+  private int iterations;
+  private int maxIterations;
+  private int batchSize;
+
+  @Override
+  public final void setup(BSPPeer peer) {
+    this.iterations = 0;
+    this.maxIterations = peer.getConfiguration()
+        .getInt("horn.max.iteration", 1);
+    this.batchSize = peer.getConfiguration()
+        .getInt("horn.minibatch.size", 1000);
+
+    LOG.info("max iteration: " + this.maxIterations);
+    
+    // loads subset of neural network model replica into memory
+  }
+
+  @Override
+  public void bsp(BSPPeer peer) throws IOException, SyncException,
+      InterruptedException {
+
+    // Iterate until reach max iteration or convergence
+    while (this.iterations++ < maxIterations) {
+
+      // Fetch latest parameters
+      fetchParameters(peer);
+      // Perform the batch
+      doMinibatch(peer);
+      // Push parameters
+      pushParameters(peer);
+
+      if (this.isConverge) {
+        break;
+      }
+    }
+
+  }
+
+  /**
+   * Performs the mini-batch
+   * 
+   * @param peer
+   * @throws IOException 
+   * @throws InterruptedException 
+   * @throws SyncException 
+   */
+  private void doMinibatch(BSPPeer peer) throws IOException, SyncException, InterruptedException {
+    double avgTrainingError = 0.0; // TODO accumulate and use training error
+
+    int trains = 0;
+    while (trains < batchSize) {
+      // TODO read and send a single instance to the first input layer
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      
+      if (!peer.readNext(key, value)) {
+        peer.reopenInput();
+        peer.readNext(key, value);
+      }
+      LOG.info(key + ", " + value);
+      
+      // TODO calls upward and downward methods
+
+      peer.sync();
+      trains++;
+    }
+  }
+
+  private void fetchParameters(BSPPeer peer) {
+    // TODO fetch latest weights from the parameter server
+  }
+
+  private void pushParameters(BSPPeer peer) {
+    // TODO push updated weights
+  }
+
+}
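
The fetchParameters()/pushParameters() stubs leave the parameter-server
protocol entirely open. Purely as an assumption about one possible shape
(this patch does not define it), peer 0 could act as the parameter server,
with weights traveling as DoubleWritable messages over Hama's BSP messaging;
accumulatedUpdate is a hypothetical field:

    private void pushParameters(BSPPeer peer) throws IOException {
      // Hypothetical: ship the locally accumulated update to peer 0
      peer.send(peer.getPeerName(0), new DoubleWritable(accumulatedUpdate));
    }

    private void fetchParameters(BSPPeer peer) throws IOException {
      // Hypothetical: drain the weight messages delivered by the last sync()
      DoubleWritable w;
      while ((w = (DoubleWritable) peer.getCurrentMessage()) != null) {
        // ... apply w to the in-memory model replica
      }
    }
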
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..f7bed47
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.avro=ERROR
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#log4j.logger.org.apache.zookeeper=DEBUG
diff --git a/src/test/java/org/apache/horn/trainer/TestNeuron.java b/src/test/java/org/apache/horn/trainer/TestNeuron.java
new file mode 100644
index 0000000..823be51
--- /dev/null
+++ b/src/test/java/org/apache/horn/trainer/TestNeuron.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hama.commons.math.Sigmoid;
+
+public class TestNeuron extends TestCase {
+  private static double learningRate = 0.1;
+  private static double bias = -1;
+  private static double theta = 0.8;
+
+  public static class MyNeuron extends
+      Neuron<PropMessage<DoubleWritable, DoubleWritable>> {
+
+    @Override
+    public void upward(
+        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      double sum = 0;
+      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+        sum += m.getMessage().get() * m.getWeight().get();
+      }
+      sum += (bias * theta);
+
+      double output = new Sigmoid().apply(sum);
+      this.setOutput(output);
+      this.propagate(output);
+    }
+
+    @Override
+    public void downward(
+        Iterable<PropMessage<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      for (PropMessage<DoubleWritable, DoubleWritable> m : messages) {
+        // Calculates error gradient for each neuron
+        double gradient = this.getOutput() * (1 - this.getOutput())
+            * m.getMessage().get() * m.getWeight().get();
+
+        // Propagates to lower layer
+        this.propagate(gradient);
+
+        // Weight corrections
+        double weight = learningRate * this.getOutput() * m.getMessage().get();
+        this.push(weight);
+      }
+    }
+
+  }
+
+  public void testProp() throws IOException {
+    List<PropMessage<DoubleWritable, DoubleWritable>> x = new ArrayList<PropMessage<DoubleWritable, DoubleWritable>>();
+    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        1.0), new DoubleWritable(0.5)));
+    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        1.0), new DoubleWritable(0.4)));
+
+    MyNeuron n = new MyNeuron();
+    n.upward(x);
+    assertEquals(0.5249791874789399, n.getOutput(), 1e-9);
+
+    x.clear();
+    x.add(new PropMessage<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        -0.1274), new DoubleWritable(-1.2)));
+    n.downward(x);
+    assertEquals(-0.006688234848481696, n.getUpdate(), 1e-9);
+  }
+  
+}
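
For reference, the expected values in testProp() follow directly from the
constants above: the forward pass computes sigmoid(1.0*0.5 + 1.0*0.4 +
(-1)*0.8) = sigmoid(0.1) = 0.5249791874789399, and the backward pass computes
the weight correction learningRate * output * message = 0.1 *
0.5249791874789399 * (-0.1274) = -0.006688234848481696.
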
diff --git a/src/test/java/org/apache/horn/trainer/TestTrainer.java b/src/test/java/org/apache/horn/trainer/TestTrainer.java
new file mode 100644
index 0000000..295420e
--- /dev/null
+++ b/src/test/java/org/apache/horn/trainer/TestTrainer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.trainer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+
+public class TestTrainer extends HamaCluster {
+  protected HamaConfiguration configuration;
+
+  // These fields prevent the whole cluster from being rebooted for each test,
+  // since setUp and tearDown are called once per method.
+
+  public TestTrainer() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testOutputJob() throws Exception {
+    String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
+
+    Configuration conf = new Configuration();
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.setInt("horn.max.iteration", 100);
+    conf.setInt("horn.minibatch.size", 10);
+    conf.setBoolean("bsp.input.runtime.partitioning", true);
+
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test Replica Trainer");
+
+    bsp.setPartitioner(HashPartitioner.class);
+
+    bsp.setBspClass(Trainer.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+
+    bsp.setNumBspTask(2);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setInputPath(new Path(strTrainingDataPath));
+
+    bsp.waitForCompletion(true);
+  }
+
+}
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
new file mode 100644
index 0000000..f7bed47
--- /dev/null
+++ b/src/test/resources/log4j.properties
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.avro=ERROR
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#log4j.logger.org.apache.zookeeper=DEBUG