HORN-13: Convert MNIST dataset into a sequence file
diff --git a/README.md b/README.md
index 060dc8c..651cfa2 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,60 @@
 # Apache Horn
 
-The Apache Horn is an Apache Incubating project, a neuron-centric programming model and Sync and Async hybrid distributed training framework, supports both data and model parallelism for training large models with massive datasets. Unlike most systems having matrix approach to neural network training, Horn adopted the the neuron-centric model which enables training large-scale deep learning on highly scalable CPU cluster. In the future, we plan also to support GPU accelerations for heterogeneous devices.
+The Apache Horn is an Apache Incubating project, a neuron-centric programming model and Sync and Async hybrid distributed training framework, supports both data and model parallelism for training large models with massive datasets on top of Apache Hadoop and Hama.
+
+## Programming Model
+
+Apache Horn provides a neuron-centric programming model for implementing the neural network based algorithms. The user defines the computation that takes place at each neuron in each layer of the model, and the messages that should be passed during the forward and backward phases of computation. For example, we apply a set of weights to the input data and calculate an output in forward() method like below:
+```Java
+    @Override
+    public void forward(
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      double sum = 0;
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+        sum += m.getInput() * m.getWeight();
+      }
+      this.feedforward(this.squashingFunction.apply(sum));
+    }
+```
+Then, we measure the margin of error of the output and adjust the weights accordingly to decrease the error in backward() method:
+```Java
+    @Override
+    public void backward(
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+        // Calculates error gradient for each neuron
+        double gradient = this.squashingFunction.applyDerivative(this
+            .getOutput()) * (m.getDelta() * m.getWeight());
+        this.backpropagate(gradient);
+
+        // Weight corrections
+        double weight = -learningRate * this.getOutput() * m.getDelta()
+            + momentum * m.getPrevWeight();
+        this.push(weight);
+      }
+    }
+  }
+```
+The advantages of this programming model are:
+
+ * Easy and intuitive to use
+ * Flexible to make your own CUDA kernels
+ * Allows multithreading to be used internally
+
+Also, Apache Horn provides a simplified and intuitive configuration interface. To create neural network job and submit it to existing Hadoop or Hama cluster, we just add the layer with its properties such as squashing function and neuron class. The below example configures the create 4-layer neural network with 500 neurons in hidden layers for train MNIST dataset:
+```Java
+  HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
+  job.setLearningRate(learningRate);
+  ..
+
+  job.inputLayer(784, Sigmoid.class, StandardNeuron.class);
+  job.addLayer(500, Sigmoid.class, StandardNeuron.class);
+  job.addLayer(500, Sigmoid.class, StandardNeuron.class);
+  job.outputLayer(10, Sigmoid.class, StandardNeuron.class);
+  job.setCostFunction(CrossEntropy.class);
+```
 
 ## High Scalability
 
diff --git a/pom.xml b/pom.xml
index 9480f9e..8e258d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
-    <version>8</version>
+    <version>10</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
@@ -104,7 +104,12 @@
       <version>${log4j.version}</version>
     </dependency>
     
-  
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.5.3</version>
+    </dependency>
+ 
     <dependency>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-commons</artifactId>
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
index 9f27889..a9c7cc1 100644
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ b/src/main/java/org/apache/horn/bsp/HornJob.java
@@ -36,12 +36,16 @@
     neuralNetwork = new SmallLayeredNeuralNetwork();
   }
 
+  public void inputLayer(int featureDimension, Class<? extends Function> func) {
+    addLayer(featureDimension, func);
+  }
+  
   public void addLayer(int featureDimension, Class<? extends Function> func) {
     neuralNetwork.addLayer(featureDimension, false,
         FunctionFactory.createDoubleFunction(func.getSimpleName()));
   }
 
-  public void finalLayer(int labels, Class<? extends Function> func) {
+  public void outputLayer(int labels, Class<? extends Function> func) {
     neuralNetwork.addLayer(labels, true,
         FunctionFactory.createDoubleFunction(func.getSimpleName()));
   }
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index 08703cd..f66344c 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -88,9 +88,9 @@
     job.setConvergenceCheckInterval(1000);
     job.setBatchSize(300);
 
+    job.inputLayer(features, Sigmoid.class);
     job.addLayer(features, Sigmoid.class);
-    job.addLayer(features, Sigmoid.class);
-    job.finalLayer(labels, Sigmoid.class);
+    job.outputLayer(labels, Sigmoid.class);
 
     job.setCostFunction(CrossEntropy.class);
 
diff --git a/src/main/java/org/apache/horn/utils/MNISTConverter.java b/src/main/java/org/apache/horn/utils/MNISTConverter.java
new file mode 100644
index 0000000..99742d6
--- /dev/null
+++ b/src/main/java/org/apache/horn/utils/MNISTConverter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.utils;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+
+public class MNISTConverter {
+
+  private static int PIXELS = 28 * 28;
+
+  public static void main(String[] args) throws Exception {
+    if(args.length < 3) {
+      System.out.println("Usage: TRAINING_DATA LABELS_DATA OUTPUT_PATH");
+      System.out.println("ex) train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq");
+      System.exit(1);
+    }
+    
+    String training_data = args[0];
+    String labels_data = args[1];
+    String output = args[2];
+
+    DataInputStream imagesIn = new DataInputStream(new FileInputStream(
+        new File(training_data)));
+    DataInputStream labelsIn = new DataInputStream(new FileInputStream(
+        new File(labels_data)));
+
+    imagesIn.readInt(); // Magic number
+    int count = imagesIn.readInt();
+    labelsIn.readInt(); // Magic number
+    labelsIn.readInt(); // Count
+    imagesIn.readInt(); // Rows
+    imagesIn.readInt(); // Cols
+
+    System.out.println("Writing " + count + " samples on " + output);
+
+    byte[][] images = new byte[count][PIXELS];
+    byte[] labels = new byte[count];
+    for (int n = 0; n < count; n++) {
+      imagesIn.readFully(images[n]);
+      labels[n] = labelsIn.readByte();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
+        output), LongWritable.class, VectorWritable.class);
+
+    for (int i = 0; i < count; i++) {
+      double[] vals = new double[PIXELS + 1];
+      for (int j = 0; j < PIXELS; j++) {
+        vals[j] = (images[i][j] & 0xff);
+      }
+      vals[PIXELS] = (labels[i] & 0xff);
+      writer.append(new LongWritable(), new VectorWritable(
+          new DenseDoubleVector(vals)));
+    }
+    
+    writer.close();
+  }
+}