HORN-32: Support RNN
diff --git a/semi.seq b/semi.seq
new file mode 100644
index 0000000..64e5871
--- /dev/null
+++ b/semi.seq
Binary files differ
diff --git a/src/main/java/org/apache/horn/examples/DropoutNeuron.java b/src/main/java/org/apache/horn/core/DropoutNeuron.java
similarity index 95%
rename from src/main/java/org/apache/horn/examples/DropoutNeuron.java
rename to src/main/java/org/apache/horn/core/DropoutNeuron.java
index ec02570..c9ebe9f 100644
--- a/src/main/java/org/apache/horn/examples/DropoutNeuron.java
+++ b/src/main/java/org/apache/horn/core/DropoutNeuron.java
@@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.horn.examples;
+package org.apache.horn.core;
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.horn.core.Neuron;
-import org.apache.horn.core.Synapse;
import org.apache.horn.utils.MathUtils;
public class DropoutNeuron extends
diff --git a/src/main/java/org/apache/horn/core/HornJob.java b/src/main/java/org/apache/horn/core/HornJob.java
index 82343fe..ea622ff 100644
--- a/src/main/java/org/apache/horn/core/HornJob.java
+++ b/src/main/java/org/apache/horn/core/HornJob.java
@@ -62,6 +62,11 @@
neuralNetwork.setDropRateOfInputLayer(dropRate);
}
+ public void inputLayer(int featureDimension, float dropRate, Class<? extends Neuron<?>> neuronClass) {
+ addLayer(featureDimension, null, neuronClass);
+ neuralNetwork.setDropRateOfInputLayer(dropRate);
+ }
+
public void addLayer(int featureDimension, Class<? extends Function> func,
Class<? extends Neuron<?>> neuronClass) {
neuralNetwork.addLayer(
@@ -71,12 +76,37 @@
.getSimpleName()) : null, neuronClass);
}
+  /**
+   * Adds a hidden layer that can optionally be recurrent.
+   *
+   * @param featureDimension the number of neurons in the layer
+   * @param func the squashing function applied to the layer's output
+   * @param neuronClass the neuron implementation to instantiate
+   * @param isRecurrent whether the layer receives recurrent connections
+   */
+ public void addLayer(int featureDimension, Class<? extends Function> func,
+ Class<? extends Neuron<?>> neuronClass, boolean isRecurrent) {
+ if (neuralNetwork instanceof RecurrentLayeredNeuralNetwork) {
+ ((RecurrentLayeredNeuralNetwork)neuralNetwork).addLayer(
+ featureDimension,
+ false,
+ (func != null) ? FunctionFactory.createFloatFunction(func
+ .getSimpleName()) : null, neuronClass, null, isRecurrent);
+ } else {
+ this.addLayer(featureDimension, func, neuronClass);
+ }
+ }
+
public void outputLayer(int labels, Class<? extends Function> func,
Class<? extends Neuron<?>> neuronClass) {
neuralNetwork.addLayer(labels, true,
FunctionFactory.createFloatFunction(func.getSimpleName()), neuronClass);
}
+ public void outputLayer(int labels, Class<? extends Function> func,
+ Class<? extends Neuron<?>> neuronClass, int numOutCells) {
+ ((RecurrentLayeredNeuralNetwork)neuralNetwork).addLayer(labels, true,
+ FunctionFactory.createFloatFunction(func.getSimpleName()), neuronClass, numOutCells);
+ }
+
public void setCostFunction(Class<? extends Function> func) {
neuralNetwork.setCostFunction(FunctionFactory.createFloatFloatFunction(func
.getSimpleName()));
@@ -94,6 +124,11 @@
this.conf.setInt("training.batch.size", batchSize);
}
+ public void setRecurrentStepSize(int stepSize) {
+ ((RecurrentLayeredNeuralNetwork) neuralNetwork).setRecurrentStepSize(stepSize);
+ this.conf.setInt("training.recurrent.step.size", stepSize);
+ }
+
public void setTrainingMethod(TrainingMethod method) {
this.neuralNetwork.setTrainingMethod(method);
}
diff --git a/src/main/java/org/apache/horn/examples/DropoutNeuron.java b/src/main/java/org/apache/horn/core/RecurrentDropoutNeuron.java
similarity index 70%
copy from src/main/java/org/apache/horn/examples/DropoutNeuron.java
copy to src/main/java/org/apache/horn/core/RecurrentDropoutNeuron.java
index ec02570..da4846b 100644
--- a/src/main/java/org/apache/horn/examples/DropoutNeuron.java
+++ b/src/main/java/org/apache/horn/core/RecurrentDropoutNeuron.java
@@ -15,24 +15,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.horn.examples;
+package org.apache.horn.core;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.FloatWritable;
-import org.apache.horn.core.Neuron;
-import org.apache.horn.core.Synapse;
import org.apache.horn.utils.MathUtils;
-public class DropoutNeuron extends
+public class RecurrentDropoutNeuron extends
Neuron<Synapse<FloatWritable, FloatWritable>> {
private float m2;
+ private float recurrentDelta = 0;
+ private double dropRate = 0;
+
+ public double getDropRate() {
+ return dropRate;
+ }
+
+ public void setDropRate(double dropRate) {
+ this.dropRate = dropRate;
+ }
@Override
public void forward(Iterable<Synapse<FloatWritable, FloatWritable>> messages)
throws IOException {
- m2 = (isTraining()) ? MathUtils.getBinomial(1, 0.5) : 0.5f;
+    m2 = (isTraining()) ? MathUtils.getBinomial(1, dropRate) : 1.0f;
if (m2 > 0) {
float sum = 0;
@@ -48,6 +58,8 @@
}
}
+ private static final Log LOG = LogFactory.getLog(RecurrentDropoutNeuron.class);
+
@Override
public void backward(Iterable<Synapse<FloatWritable, FloatWritable>> messages)
throws IOException {
@@ -63,11 +75,18 @@
* this.getOutput() + this.getMomentumWeight() * m.getPrevWeight();
this.push(weight);
}
-
- this.backpropagate(delta * squashingFunction.applyDerivative(getOutput()));
+    // TODO: set the squashing function of recurrent neurons to identity
+ this.backpropagate(recurrentDelta + delta * squashingFunction.applyDerivative(getOutput()));
} else {
this.backpropagate(0);
}
}
+ public float getRecurrentDelta() {
+ return recurrentDelta;
+ }
+
+ public void setRecurrentDelta(float recurrentDelta) {
+ this.recurrentDelta = recurrentDelta;
+ }
}
diff --git a/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetwork.java
new file mode 100644
index 0000000..d65e4a6
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetwork.java
@@ -0,0 +1,1032 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.io.FloatMatrixWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseFloatMatrix;
+import org.apache.hama.commons.math.DenseFloatVector;
+import org.apache.hama.commons.math.FloatFunction;
+import org.apache.hama.commons.math.FloatMatrix;
+import org.apache.hama.commons.math.FloatVector;
+import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
+import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
+import org.apache.horn.funcs.FunctionFactory;
+import org.apache.horn.funcs.IdentityFunction;
+import org.apache.horn.funcs.SoftMax;
+import org.apache.horn.utils.MathUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * RecurrentLayeredNeuralNetwork defines the general operations for layered
+ * models with recurrent connections. As with the other layered models, the
+ * training can be conducted in parallel, but the parameters of the model are
+ * assumed to be stored in a single machine.
+ *
+ * In general, these models consist of neurons which are aligned in layers.
+ * Between any two adjacent layers, the neurons are connected to form a
+ * bipartite weighted graph. In addition, a recurrent layer receives the
+ * outputs of its own neurons from the previous recurrent step.
+ *
+ */
+public class RecurrentLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
+
+ private static final Log LOG = LogFactory.getLog(RecurrentLayeredNeuralNetwork.class);
+
+ /* Weights between neurons at adjacent layers */
+ protected List<FloatMatrix> weightMatrixList;
+  /* Weights between neurons at adjacent layers, one list per recurrent step */
+ protected List<List<FloatMatrix>> weightMatrixLists;
+ /* Previous weight updates between neurons at adjacent layers */
+ protected List<FloatMatrix> prevWeightUpdatesList;
+ protected List<List<FloatMatrix>> prevWeightUpdatesLists;
+ /* Different layers can have different squashing function */
+ protected List<FloatFunction> squashingFunctionList;
+ protected List<Class<? extends Neuron<?>>> neuronClassList;
+ /* Record the recurrent layer */
+ protected List<Boolean> recurrentLayerList;
+ /* Recurrent step size */
+ protected int recurrentStepSize;
+ protected int finalLayerIdx;
+ private List<Neuron<?>[]> neurons;
+ private List<List<Neuron<?>[]>> neuronLists;
+ private float dropRate;
+ private long iterations;
+
+ private int numOutCells;
+
+ public RecurrentLayeredNeuralNetwork() {
+ this.layerSizeList = Lists.newArrayList();
+ this.weightMatrixList = Lists.newArrayList();
+ this.prevWeightUpdatesList = Lists.newArrayList();
+ this.squashingFunctionList = Lists.newArrayList();
+ this.neuronClassList = Lists.newArrayList();
+ this.weightMatrixLists = Lists.newArrayList();
+ this.prevWeightUpdatesLists = Lists.newArrayList();
+ this.neuronLists = Lists.newArrayList();
+ this.recurrentLayerList = Lists.newArrayList();
+ }
+
+ public RecurrentLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
+ super(conf, modelPath);
+ initializeNeurons(false);
+ initializeWeightMatrixLists();
+ }
+
+ public RecurrentLayeredNeuralNetwork(HamaConfiguration conf, String modelPath,
+ boolean isTraining) {
+ super(conf, modelPath);
+ initializeNeurons(isTraining);
+ initializeWeightMatrixLists();
+ }
+
+  /**
+   * Initialize the neuron objects for every recurrent step.
+   *
+   * @param isTraining whether the network is being trained
+   */
+ private void initializeNeurons(boolean isTraining) {
+ this.neuronLists = Lists.newArrayListWithExpectedSize(recurrentStepSize);
+ for (int stepIdx = 0; stepIdx < this.recurrentStepSize; stepIdx++) {
+ neurons = new ArrayList<Neuron<?>[]>();
+
+ int expectedNeuronsSize = this.layerSizeList.size();
+ if (stepIdx < this.recurrentStepSize - this.numOutCells) {
+ expectedNeuronsSize--;
+ }
+ for (int neuronLayerIdx = 0; neuronLayerIdx < expectedNeuronsSize; neuronLayerIdx++) {
+ int numOfNeurons = layerSizeList.get(neuronLayerIdx);
+ // if not final layer and next layer is recurrent
+ if (stepIdx > 0 && neuronLayerIdx < layerSizeList.size() - 1
+ && this.recurrentLayerList.get(neuronLayerIdx+1)) {
+ numOfNeurons = numOfNeurons + layerSizeList.get(neuronLayerIdx+1) - 1;
+ }
+ Class<? extends Neuron<?>> neuronClass;
+ if (neuronLayerIdx == 0)
+        neuronClass = StandardNeuron.class; // not actually used for the input layer
+ else
+ neuronClass = neuronClassList.get(neuronLayerIdx - 1);
+
+ Neuron<?>[] tmp = new Neuron[numOfNeurons];
+ for (int neuronIdx = 0; neuronIdx < numOfNeurons; neuronIdx++) {
+ Neuron<?> n = newNeuronInstance(neuronClass);
+ if (n instanceof RecurrentDropoutNeuron)
+ ((RecurrentDropoutNeuron) n).setDropRate(dropRate);
+ if (neuronLayerIdx > 0 && neuronIdx < layerSizeList.get(neuronLayerIdx))
+ n.setSquashingFunction(squashingFunctionList.get(neuronLayerIdx - 1));
+ else
+ n.setSquashingFunction(new IdentityFunction());
+ n.setLayerIndex(neuronLayerIdx);
+ n.setNeuronID(neuronIdx);
+ n.setLearningRate(this.learningRate);
+ n.setMomentumWeight(this.momentumWeight);
+ n.setTraining(isTraining);
+ tmp[neuronIdx] = n;
+ }
+ neurons.add(tmp);
+ }
+ this.neuronLists.add(neurons);
+ }
+ }
+
+ /**
+   * Initialize the per-step weight matrix lists by unrolling the base weight matrices.
+ */
+ public void initializeWeightMatrixLists() {
+    this.numOutCells = (numOutCells == 0 ? this.recurrentStepSize : numOutCells);
+ this.weightMatrixLists.clear();
+ this.weightMatrixLists = Lists.newArrayListWithExpectedSize(this.recurrentStepSize);
+ this.prevWeightUpdatesLists.clear();
+ this.prevWeightUpdatesLists = Lists.newArrayListWithExpectedSize(this.recurrentStepSize);
+
+ for (int stepIdx = 0; stepIdx < recurrentStepSize - 1; stepIdx++) {
+ int expectedMatrixListSize = this.layerSizeList.size() - 1;
+ if (stepIdx < this.recurrentStepSize - this.numOutCells) {
+ expectedMatrixListSize--;
+ }
+ List<FloatMatrix> aWeightMatrixList = Lists.newArrayListWithExpectedSize(
+ expectedMatrixListSize);
+ List<FloatMatrix> aPrevWeightUpdatesList = Lists.newArrayListWithExpectedSize(
+ expectedMatrixListSize);
+ for (int matrixIdx = 0; matrixIdx < expectedMatrixListSize; matrixIdx++) {
+ int rows = this.weightMatrixList.get(matrixIdx).getRowCount();
+ int cols = this.weightMatrixList.get(matrixIdx).getColumnCount();
+        if (stepIdx == 0)
+ cols = this.layerSizeList.get(matrixIdx);
+ FloatMatrix weightMatrix = new DenseFloatMatrix(rows, cols);
+ weightMatrix.applyToElements(new FloatFunction() {
+ @Override
+ public float apply(float value) {
+ return RandomUtils.nextFloat() - 0.5f;
+ }
+ @Override
+ public float applyDerivative(float value) {
+ throw new UnsupportedOperationException("");
+ }
+ });
+ aWeightMatrixList.add(weightMatrix);
+ aPrevWeightUpdatesList.add(
+ new DenseFloatMatrix(
+ this.prevWeightUpdatesList.get(matrixIdx).getRowCount(),
+ this.prevWeightUpdatesList.get(matrixIdx).getColumnCount()));
+ }
+ this.weightMatrixLists.add(aWeightMatrixList);
+ this.prevWeightUpdatesLists.add(aPrevWeightUpdatesList);
+ }
+ // add matrix of last step
+ this.weightMatrixLists.add(this.weightMatrixList);
+ this.prevWeightUpdatesLists.add(this.prevWeightUpdatesList);
+ this.weightMatrixList = Lists.newArrayList();
+ this.prevWeightUpdatesList = Lists.newArrayList();
+ }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+ public int addLayer(int size, boolean isFinalLayer,
+ FloatFunction squashingFunction, Class<? extends Neuron<?>> neuronClass) {
+ return addLayer(size, isFinalLayer, squashingFunction, neuronClass, null, true);
+ }
+
+ public int addLayer(int size, boolean isFinalLayer,
+ FloatFunction squashingFunction, Class<? extends Neuron<?>> neuronClass, int numOutCells) {
+ if (isFinalLayer)
+      this.numOutCells = (numOutCells == 0 ? this.recurrentStepSize : numOutCells);
+ return addLayer(size, isFinalLayer, squashingFunction, neuronClass, null, false);
+ }
+
+ public int addLayer(int size, boolean isFinalLayer,
+ FloatFunction squashingFunction, Class<? extends Neuron<?>> neuronClass,
+ Class<? extends IntermediateOutput> interlayer, boolean isRecurrent) {
+ Preconditions.checkArgument(size > 0,
+ "Size of layer must be larger than 0.");
+ if (!isFinalLayer) {
+ if (this.layerSizeList.size() == 0) {
+ this.recurrentLayerList.add(false);
+ LOG.info("add input layer: " + size + " neurons");
+ } else {
+ this.recurrentLayerList.add(isRecurrent);
+ LOG.info("add hidden layer: " + size + " neurons");
+ }
+ size += 1;
+ } else {
+ this.recurrentLayerList.add(false);
+ }
+
+ this.layerSizeList.add(size);
+ int layerIdx = this.layerSizeList.size() - 1;
+ if (isFinalLayer) {
+ this.finalLayerIdx = layerIdx;
+ LOG.info("add output layer: " + size + " neurons");
+ }
+
+ // add weights between current layer and previous layer, and input layer has
+ // no squashing function
+ if (layerIdx > 0) {
+ int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
+ // row count equals to size of current size and column count equals to
+ // size of previous layer
+ int row = isFinalLayer ? size : size - 1;
+ // expand matrix for recurrent layer
+ int col = !(this.recurrentLayerList.get(layerIdx)) ?
+ sizePrevLayer : sizePrevLayer + this.layerSizeList.get(layerIdx) - 1;
+
+ FloatMatrix weightMatrix = new DenseFloatMatrix(row, col);
+ // initialize weights
+ weightMatrix.applyToElements(new FloatFunction() {
+ @Override
+ public float apply(float value) {
+ return RandomUtils.nextFloat() - 0.5f;
+ }
+
+ @Override
+ public float applyDerivative(float value) {
+ throw new UnsupportedOperationException("");
+ }
+ });
+ this.weightMatrixList.add(weightMatrix);
+ this.prevWeightUpdatesList.add(new DenseFloatMatrix(row, col));
+ this.squashingFunctionList.add(squashingFunction);
+
+ this.neuronClassList.add(neuronClass);
+ }
+ return layerIdx;
+ }
+
+ /**
+ * Update the weight matrices with given matrices.
+ *
+ * @param matrices
+ */
+ public void updateWeightMatrices(FloatMatrix[] matrices) {
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.weightMatrixLists) {
+ for (int weightMatrixIdx = 0; weightMatrixIdx < aWeightMatrixList.size(); weightMatrixIdx++) {
+ FloatMatrix matrix = aWeightMatrixList.get(weightMatrixIdx);
+ aWeightMatrixList.set(weightMatrixIdx, matrix.add(matrices[matrixIdx++]));
+ }
+ }
+ }
+
+ /**
+   * Set the previous weight update matrices.
+ *
+ * @param prevUpdates
+ */
+ void setPrevWeightMatrices(FloatMatrix[] prevUpdates) {
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightUpdateMatrixList: this.prevWeightUpdatesLists) {
+ for (int weightMatrixIdx = 0; weightMatrixIdx < aWeightUpdateMatrixList.size();
+ weightMatrixIdx++) {
+ aWeightUpdateMatrixList.set(weightMatrixIdx, prevUpdates[matrixIdx++]);
+ }
+ }
+ }
+
+ /**
+ * Add a batch of matrices onto the given destination matrices.
+ *
+ * @param destMatrices
+ * @param sourceMatrices
+ */
+ static void matricesAdd(FloatMatrix[] destMatrices,
+ FloatMatrix[] sourceMatrices) {
+ for (int i = 0; i < destMatrices.length; ++i) {
+ destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
+ }
+ }
+
+ /**
+ * Get all the weight matrices.
+ *
+ * @return The matrices in form of matrix array.
+ */
+ FloatMatrix[] getWeightMatrices() {
+ FloatMatrix[] matrices = new FloatMatrix[this.getSizeOfWeightmatrix()];
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.weightMatrixLists) {
+ for (FloatMatrix aWeightMatrix : aWeightMatrixList) {
+ matrices[matrixIdx++] = aWeightMatrix;
+ }
+ }
+ return matrices;
+ }
+
+ /**
+ * Set the weight matrices.
+ *
+ * @param matrices
+ */
+ public void setWeightMatrices(FloatMatrix[] matrices) {
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.weightMatrixLists) {
+ for (int weightMatrixIdx = 0; weightMatrixIdx < aWeightMatrixList.size(); weightMatrixIdx++) {
+ aWeightMatrixList.set(weightMatrixIdx, matrices[matrixIdx++]);
+ }
+ }
+ }
+
+ /**
+   * Get the previous matrix updates in form of an array.
+ *
+ * @return The matrices in form of matrix array.
+ */
+ public FloatMatrix[] getPrevMatricesUpdates() {
+ FloatMatrix[] matrices = new FloatMatrix[this.getSizeOfWeightmatrix()];
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.prevWeightUpdatesLists) {
+ for (FloatMatrix aWeightMatrix : aWeightMatrixList) {
+ matrices[matrixIdx++] = aWeightMatrix;
+ }
+ }
+ return matrices;
+ }
+
+ public void setWeightMatrix(int index, FloatMatrix matrix) {
+ Preconditions.checkArgument(
+ 0 <= index && index < this.weightMatrixList.size(), String.format(
+ "index [%d] should be in range[%d, %d].", index, 0,
+ this.weightMatrixList.size()));
+ this.weightMatrixList.set(index, matrix);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+
+ this.finalLayerIdx = input.readInt();
+ this.dropRate = input.readFloat();
+
+ // read neuron classes
+ int neuronClasses = input.readInt();
+ this.neuronClassList = Lists.newArrayList();
+ for (int i = 0; i < neuronClasses; ++i) {
+ try {
+ Class<? extends Neuron<?>> clazz = (Class<? extends Neuron<?>>) Class
+ .forName(input.readUTF());
+ neuronClassList.add(clazz);
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ // read squash functions
+ int squashingFunctionSize = input.readInt();
+ this.squashingFunctionList = Lists.newArrayList();
+ for (int i = 0; i < squashingFunctionSize; ++i) {
+ this.squashingFunctionList.add(FunctionFactory
+ .createFloatFunction(WritableUtils.readString(input)));
+ }
+
+ this.recurrentStepSize = input.readInt();
+ this.numOutCells = input.readInt();
+ int recurrentLayerListSize = input.readInt();
+ this.recurrentLayerList = Lists.newArrayList();
+ for (int i = 0; i < recurrentLayerListSize; i++) {
+ this.recurrentLayerList.add(input.readBoolean());
+ }
+
+ // read weights and construct matrices of previous updates
+ int numOfMatrices = input.readInt();
+ this.weightMatrixLists = Lists.newArrayListWithExpectedSize(this.recurrentStepSize);
+ this.prevWeightUpdatesLists = Lists.newArrayList();
+
+ for (int step = 0; step < this.recurrentStepSize; step++) {
+ this.weightMatrixList = Lists.newArrayList();
+ this.prevWeightUpdatesList = Lists.newArrayList();
+
+ for (int j = 0; j < this.layerSizeList.size() - 2; j++) {
+ FloatMatrix matrix = FloatMatrixWritable.read(input);
+ this.weightMatrixList.add(matrix);
+ this.prevWeightUpdatesList.add(new DenseFloatMatrix(matrix.getRowCount(),
+ matrix.getColumnCount()));
+ }
+ // if the cell has output layer, read from input
+ if (step >= this.recurrentStepSize - this.numOutCells) {
+ FloatMatrix matrix = FloatMatrixWritable.read(input);
+ this.weightMatrixList.add(matrix);
+ this.prevWeightUpdatesList.add(new DenseFloatMatrix(matrix.getRowCount(),
+ matrix.getColumnCount()));
+ }
+ this.weightMatrixLists.add(this.weightMatrixList);
+ this.prevWeightUpdatesLists.add(this.prevWeightUpdatesList);
+ }
+ }
+
+ protected int getSizeOfWeightmatrix() {
+ return this.recurrentStepSize * (this.layerSizeList.size() - 2) + this.numOutCells;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+ output.writeInt(finalLayerIdx);
+ output.writeFloat(dropRate);
+
+ // write neuron classes
+ output.writeInt(this.neuronClassList.size());
+ for (Class<? extends Neuron<?>> clazz : this.neuronClassList) {
+ output.writeUTF(clazz.getName());
+ }
+
+ // write squashing functions
+ output.writeInt(this.squashingFunctionList.size());
+ for (FloatFunction aSquashingFunctionList : this.squashingFunctionList) {
+ WritableUtils.writeString(output,
+ aSquashingFunctionList.getFunctionName());
+ }
+
+ // write recurrent step size
+ output.writeInt(this.recurrentStepSize);
+
+    // write number of output cells
+ output.writeInt(this.numOutCells);
+
+ // write recurrent layer list
+ output.writeInt(this.recurrentLayerList.size());
+    for (Boolean isRecurrentLayer: recurrentLayerList) {
+      output.writeBoolean(isRecurrentLayer);
+ }
+
+ // write weight matrices
+ output.writeInt(this.getSizeOfWeightmatrix());
+ for (List<FloatMatrix> aWeightMatrixLists : this.weightMatrixLists) {
+ for (FloatMatrix aWeightMatrixList : aWeightMatrixLists) {
+ FloatMatrixWritable.write(aWeightMatrixList, output);
+ }
+ }
+
+ // DO NOT WRITE WEIGHT UPDATE
+ }
+
+ @Override
+ public FloatMatrix getWeightsByLayer(int layerIdx) {
+ return this.weightMatrixList.get(layerIdx);
+ }
+
+ public FloatMatrix getWeightsByLayer(int stepIdx, int layerIdx) {
+ return this.weightMatrixLists.get(stepIdx).get(layerIdx);
+ }
+
+ /**
+ * Get the output of the model according to given feature instance.
+ */
+ @Override
+ public FloatVector getOutput(FloatVector instance) {
+    Preconditions.checkArgument((this.layerSizeList.get(0) - 1) * this.recurrentStepSize
+        == instance.getDimension(), String.format(
+        "The dimension of input instance should be %d.",
+        (this.layerSizeList.get(0) - 1) * this.recurrentStepSize));
+ // transform the features to another space
+ FloatVector transformedInstance = this.featureTransformer
+ .transform(instance);
+ // add bias feature
+ FloatVector instanceWithBias = new DenseFloatVector(
+ transformedInstance.getDimension() + 1);
+    instanceWithBias.set(0, 0.99999f); // set bias to be a little bit less than 1.0
+ for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
+ instanceWithBias.set(i, transformedInstance.get(i - 1));
+ }
+ // return the output of the last layer
+ return getOutputInternal(instanceWithBias);
+ }
+
+ public void setDropRateOfInputLayer(float dropRate) {
+ this.dropRate = dropRate;
+ }
+
+  /**
+   * Calculate the output internally; the intermediate output of each layer is
+   * stored.
+   *
+   * @param instanceWithBias The instance containing the features.
+   * @return The concatenated output of the output-bearing recurrent steps.
+   */
+ public FloatVector getOutputInternal(FloatVector instanceWithBias) {
+ // sets the output of input layer
+ Neuron<?>[] inputLayer;
+ for (int stepIdx = 0; stepIdx < this.weightMatrixLists.size(); stepIdx++) {
+ inputLayer = neuronLists.get(stepIdx).get(0);
+ for (int inputNeuronIdx = 0; inputNeuronIdx < this.layerSizeList.get(0); inputNeuronIdx++) {
+      float m2 = MathUtils.getBinomial(1, dropRate);
+      inputLayer[inputNeuronIdx].setDrop(m2 == 0);
+ inputLayer[inputNeuronIdx].setOutput(
+ instanceWithBias.get(stepIdx * this.layerSizeList.get(0) + inputNeuronIdx) * m2);
+ }
+ // loop forward as much as recurrent step size
+ this.weightMatrixList = this.weightMatrixLists.get(stepIdx);
+ for (int layerIdx = 0; layerIdx < weightMatrixList.size(); ++layerIdx) {
+ forward(stepIdx, layerIdx);
+ }
+ }
+
+ // output for each recurrent step
+ int singleOutputLength =
+ neuronLists.get(this.recurrentStepSize-1).get(this.finalLayerIdx).length;
+ FloatVector output = new DenseFloatVector(singleOutputLength * this.numOutCells);
+ int outputNeuronIdx = 0;
+ for (int step = this.recurrentStepSize - this.numOutCells;
+ step < this.recurrentStepSize; step++) {
+ neurons = neuronLists.get(step);
+ for (int neuronIdx = 0; neuronIdx < singleOutputLength; neuronIdx++) {
+ output.set(outputNeuronIdx, neurons.get(this.finalLayerIdx)[neuronIdx].getOutput());
+ outputNeuronIdx++;
+ }
+ }
+
+ return output;
+ }
+
+ /**
+ * @param neuronClass
+ * @return a new neuron instance
+ */
+ @SuppressWarnings({ "rawtypes" })
+ public static Neuron newNeuronInstance(Class<? extends Neuron> neuronClass) {
+ return (Neuron) ReflectionUtils.newInstance(neuronClass);
+ }
+
+ public class InputMessageIterable implements
+ Iterable<Synapse<FloatWritable, FloatWritable>> {
+ private int currNeuronID;
+ private int prevNeuronID;
+ private int end;
+ private FloatMatrix weightMat;
+ private Neuron<?>[] layer;
+
+ public InputMessageIterable(int fromLayer, int row) {
+ this.currNeuronID = row;
+ this.prevNeuronID = -1;
+ this.end = weightMatrixList.get(fromLayer).getColumnCount() - 1;
+ this.weightMat = weightMatrixList.get(fromLayer);
+ this.layer = neurons.get(fromLayer);
+ }
+
+ @Override
+ public Iterator<Synapse<FloatWritable, FloatWritable>> iterator() {
+ return new MessageIterator();
+ }
+
+ private class MessageIterator implements
+ Iterator<Synapse<FloatWritable, FloatWritable>> {
+
+ @Override
+      public boolean hasNext() {
+        return prevNeuronID < end;
+      }
+
+ private FloatWritable i = new FloatWritable();
+ private FloatWritable w = new FloatWritable();
+ private Synapse<FloatWritable, FloatWritable> msg = new Synapse<FloatWritable, FloatWritable>();
+
+ @Override
+ public Synapse<FloatWritable, FloatWritable> next() {
+ prevNeuronID++;
+ i.set(layer[prevNeuronID].getOutput());
+ w.set(weightMat.get(currNeuronID, prevNeuronID));
+        msg.set(prevNeuronID, i, w);
+        return msg;
+ }
+
+ @Override
+ public void remove() {
+ }
+ }
+ }
+
+  /**
+   * Forward the calculation for one layer.
+   *
+   * @param stepIdx The index of the current recurrent step.
+   * @param fromLayerIdx The index of the previous layer.
+   */
+ protected void forward(int stepIdx, int fromLayerIdx) {
+ neurons = this.neuronLists.get(stepIdx);
+ int curLayerIdx = fromLayerIdx + 1;
+ // weight matrix for current layer
+ FloatMatrix weightMatrix = this.weightMatrixList.get(fromLayerIdx);
+ FloatFunction squashingFunction = getSquashingFunction(fromLayerIdx);
+ FloatVector vec = new DenseFloatVector(weightMatrix.getRowCount());
+
+ for (int row = 0; row < weightMatrix.getRowCount(); row++) {
+ Neuron<?> n;
+ if (curLayerIdx == finalLayerIdx)
+ n = neurons.get(curLayerIdx)[row];
+ else
+ n = neurons.get(curLayerIdx)[row + 1];
+
+ try {
+ Iterable msgs = new InputMessageIterable(fromLayerIdx, row);
+ ((RecurrentDropoutNeuron) n).setRecurrentDelta(0);
+ n.setIterationNumber(iterations);
+ n.forward(msgs);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ vec.set(row, n.getOutput());
+ }
+
+ if (squashingFunction.getFunctionName().equalsIgnoreCase(
+ SoftMax.class.getSimpleName())) {
+ IntermediateOutput interlayer = (IntermediateOutput) ReflectionUtils
+ .newInstance(SoftMax.SoftMaxOutputComputer.class);
+ try {
+ vec = interlayer.interlayer(vec);
+
+ for (int i = 0; i < vec.getDimension(); i++) {
+ neurons.get(curLayerIdx)[i].setOutput(vec.get(i));
+ }
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ // add bias
+ if (curLayerIdx != finalLayerIdx)
+ neurons.get(curLayerIdx)[0].setOutput(1);
+
+ // copy output to next recurrent layer
+ if (this.recurrentLayerList.get(curLayerIdx) && stepIdx < this.recurrentStepSize - 1) {
+ for (int i = 0; i < vec.getDimension(); i++) {
+ this.neuronLists.get(stepIdx+1).get(
+ fromLayerIdx)[this.layerSizeList.get(fromLayerIdx)+i].setOutput(vec.get(i));
+ }
+ }
+ }
+
+ /**
+ * Train the model online.
+ *
+ * @param trainingInstance
+ */
+ public void trainOnline(FloatVector trainingInstance) {
+ FloatMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
+ this.updateWeightMatrices(updateMatrices);
+ }
+
+ @Override
+ public FloatMatrix[] trainByInstance(FloatVector trainingInstance) {
+ int inputDimension = (this.layerSizeList.get(0) - 1) * this.recurrentStepSize;
+ FloatVector transformedVector = this.featureTransformer.transform(
+ trainingInstance.sliceUnsafe(inputDimension));
+ int outputDimension;
+ FloatVector inputInstance = null;
+ FloatVector labels = null;
+ if (this.learningStyle == LearningStyle.SUPERVISED) {
+ outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
+ // validate training instance
+ Preconditions.checkArgument(
+ (inputDimension + outputDimension == trainingInstance.getDimension()
+ || inputDimension + outputDimension * recurrentStepSize == trainingInstance.getDimension()),
+ String
+ .format(
+ "The dimension of training instance is %d, but requires %d.",
+ trainingInstance.getDimension(), inputDimension + outputDimension));
+
+ inputInstance = new DenseFloatVector(this.layerSizeList.get(0) * this.recurrentStepSize);
+ // get the features from the transformed vector
+ int vecIdx = 0;
+ for (int i = 0; i < inputInstance.getLength(); ++i) {
+ if (i % this.layerSizeList.get(0) == 0) {
+ inputInstance.set(i, 1); // add bias
+ } else {
+ inputInstance.set(i, transformedVector.get(vecIdx));
+ vecIdx++;
+ }
+ }
+ // get the labels from the original training instance
+ labels = trainingInstance.sliceUnsafe(transformedVector.getDimension(),
+ trainingInstance.getDimension() - 1);
+ } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
+ // labels are identical to input features
+ outputDimension = inputDimension;
+ // validate training instance
+ Preconditions.checkArgument(inputDimension == trainingInstance
+ .getDimension(), String.format(
+ "The dimension of training instance is %d, but requires %d.",
+ trainingInstance.getDimension(), inputDimension));
+
+ inputInstance = new DenseFloatVector(this.layerSizeList.get(0) * this.recurrentStepSize);
+ // get the features from the transformed vector
+ int vecIdx = 0;
+ for (int i = 0; i < inputInstance.getLength(); ++i) {
+ if (i % this.layerSizeList.get(0) == 0) {
+ inputInstance.set(i, 1); // add bias
+ } else {
+ inputInstance.set(i, transformedVector.get(vecIdx));
+ vecIdx++;
+ }
+ }
+ // get the labels by copying the transformed vector
+ labels = transformedVector.deepCopy();
+ }
+ FloatVector output = this.getOutputInternal(inputInstance);
+ // get the training error
+ calculateTrainingError(labels, output);
+ if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
+ FloatMatrix[] updates = this.trainByInstanceGradientDescent(labels);
+ return updates;
+ } else {
+      throw new IllegalArgumentException("Training method is not supported.");
+ }
+ }
+
+  /**
+   * Train by gradient descent. Get the updated weights using one training
+   * instance.
+   *
+   * @param labels The label vector of the training instance.
+   * @return The weight update matrices.
+   */
+ private FloatMatrix[] trainByInstanceGradientDescent(FloatVector labels) {
+
+ // initialize weight update matrices
+ DenseFloatMatrix[] weightUpdateMatrices = new DenseFloatMatrix[this.getSizeOfWeightmatrix()];
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.weightMatrixLists) {
+ for (FloatMatrix aWeightMatrix : aWeightMatrixList) {
+ weightUpdateMatrices[matrixIdx++] =
+ new DenseFloatMatrix(aWeightMatrix.getRowCount(), aWeightMatrix.getColumnCount());
+ }
+ }
+ FloatVector deltaVec = new DenseFloatVector(
+ this.layerSizeList.get(layerSizeList.size() - 1) * this.numOutCells);
+
+ FloatFunction squashingFunction = this.squashingFunctionList
+ .get(this.squashingFunctionList.size() - 1);
+
+    int labelIdx = 0;
+    // compute output-layer deltas for each recurrent step that owns an output cell
+    for (int step = this.recurrentStepSize - this.numOutCells; step < this.recurrentStepSize; step++) {
+
+ FloatMatrix lastWeightMatrix = this.weightMatrixLists.get(step)
+ .get(this.weightMatrixLists.get(step).size() - 1);
+ int neuronIdx = 0;
+ for (Neuron<?> aNeurons : this.neuronLists.get(step).get(this.finalLayerIdx)) {
+ float finalOut = aNeurons.getOutput();
+ float costFuncDerivative = this.costFunction.applyDerivative(
+ labels.get(labelIdx), finalOut);
+ // add regularization
+ costFuncDerivative += this.regularizationWeight
+ * lastWeightMatrix.getRowVector(neuronIdx).sum();
+
+ if (!squashingFunction.getFunctionName().equalsIgnoreCase(
+ SoftMax.class.getSimpleName())) {
+ costFuncDerivative *= squashingFunction.applyDerivative(finalOut);
+ }
+ aNeurons.backpropagate(costFuncDerivative);
+ deltaVec.set(labelIdx, costFuncDerivative);
+ neuronIdx++;
+ labelIdx++;
+ }
+ }
+
+ // start from last recurrent step to first recurrent step
+ boolean skipLastLayer = false;
+ int weightMatrixIdx = weightUpdateMatrices.length - 1;
+ for (int step = this.recurrentStepSize - 1 ; step >= 0; --step) {
+ this.weightMatrixList = this.weightMatrixLists.get(step);
+ this.prevWeightUpdatesList = this.prevWeightUpdatesLists.get(step);
+ this.neurons = this.neuronLists.get(step);
+ if (step < this.recurrentStepSize - this.numOutCells)
+ skipLastLayer = true;
+ // start from previous layer of output layer
+ for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
+ if (skipLastLayer) {
+          skipLastLayer = false;
+          continue;
+ }
+ backpropagate(step, layer, weightUpdateMatrices[weightMatrixIdx--]);
+ }
+ }
+ // TODO eliminate non-output cells from weightUpdateLists
+ this.setPrevWeightMatrices(weightUpdateMatrices);
+ return weightUpdateMatrices;
+ }
+
+ public class ErrorMessageIterable implements
+ Iterable<Synapse<FloatWritable, FloatWritable>> {
+ private int row;
+ private int neuronID;
+ private int end;
+ private FloatMatrix weightMat;
+ private FloatMatrix prevWeightMat;
+
+ private float[] nextLayerDelta;
+
+ public ErrorMessageIterable(int recurrentStepIdx, int curLayerIdx, int row) {
+ this.row = row;
+ this.neuronID = -1;
+ this.weightMat = weightMatrixLists.get(recurrentStepIdx).get(curLayerIdx);
+ this.end = weightMat.getRowCount() - 1;
+ this.prevWeightMat = prevWeightUpdatesLists.get(recurrentStepIdx).get(curLayerIdx);
+
+ Neuron<?>[] nextLayer = neuronLists.get(recurrentStepIdx).get(curLayerIdx + 1);
+ nextLayerDelta = new float[weightMat.getRowCount()];
+
+ for(int i = 0; i <= end; ++i) {
+ if (curLayerIdx + 1 == finalLayerIdx) {
+ nextLayerDelta[i] = nextLayer[i].getDelta();
+ } else {
+ nextLayerDelta[i] = nextLayer[i + 1].getDelta();
+ }
+ }
+ }
+
+ @Override
+ public Iterator<Synapse<FloatWritable, FloatWritable>> iterator() {
+ return new MessageIterator();
+ }
+
+ private class MessageIterator implements
+ Iterator<Synapse<FloatWritable, FloatWritable>> {
+
+ @Override
+      public boolean hasNext() {
+        return neuronID < end;
+      }
+
+ private FloatWritable d = new FloatWritable();
+ private FloatWritable w = new FloatWritable();
+ private FloatWritable p = new FloatWritable();
+ private Synapse<FloatWritable, FloatWritable> msg = new Synapse<FloatWritable, FloatWritable>();
+
+ @Override
+ public Synapse<FloatWritable, FloatWritable> next() {
+ neuronID++;
+
+ d.set(nextLayerDelta[neuronID]);
+ w.set(weightMat.get(neuronID, row));
+ p.set(prevWeightMat.get(neuronID, row));
+ msg.set(neuronID, d, w, p);
+ return msg;
+ }
+
+ @Override
+ public void remove() {
+ }
+
+ }
+ }
+
+  /**
+   * Back-propagate the errors from the next layer to the current layer. The
+   * weight update information is stored in weightUpdateMatrix, and the deltas
+   * of recurrent neurons are pushed back to the previous recurrent step.
+   *
+   * @param recurrentStepIdx Index of the current recurrent step.
+   * @param curLayerIdx Index of the current layer.
+   * @param weightUpdateMatrix The matrix that accumulates the weight updates.
+   */
+ private void backpropagate(int recurrentStepIdx, int curLayerIdx,
+ DenseFloatMatrix weightUpdateMatrix) {
+
+ // get layer related information
+ int x = this.weightMatrixList.get(curLayerIdx).getColumnCount();
+ int y = this.weightMatrixList.get(curLayerIdx).getRowCount();
+
+ FloatVector deltaVector = new DenseFloatVector(x);
+ Neuron<?>[] ns = this.neuronLists.get(recurrentStepIdx).get(curLayerIdx);
+
+ for (int row = 0; row < x; ++row) {
+ Neuron<?> n = ns[row];
+ n.setWeightVector(y);
+
+ try {
+ Iterable msgs = new ErrorMessageIterable(recurrentStepIdx, curLayerIdx, row);
+ n.backward(msgs);
+ if (row >= layerSizeList.get(curLayerIdx) && recurrentStepIdx > 0
+ && recurrentLayerList.get(curLayerIdx+1)) {
+ Neuron<?> recurrentNeuron = neuronLists.get(recurrentStepIdx-1).get(curLayerIdx+1)
+ [row-layerSizeList.get(curLayerIdx)+1];
+ recurrentNeuron.backpropagate(n.getDelta());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ // update weights
+ weightUpdateMatrix.setColumn(row, n.getWeights());
+ deltaVector.set(row, n.getDelta());
+ }
+ }
+
+ @Override
+ protected BSPJob trainInternal(HamaConfiguration conf) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ this.conf = conf;
+ this.fs = FileSystem.get(conf);
+
+ String modelPath = conf.get("model.path");
+ if (modelPath != null) {
+ this.modelPath = modelPath;
+ }
+ // modelPath must be set before training
+ if (this.modelPath == null) {
+ throw new IllegalArgumentException(
+ "Please specify the modelPath for model, "
+ + "either through setModelPath() or add 'modelPath' to the training parameters.");
+ }
+ this.setRecurrentStepSize(conf.getInt("training.recurrent.step.size", 1));
+ this.initializeWeightMatrixLists();
+ this.writeModelToFile();
+
+ // create job
+ BSPJob job = new BSPJob(conf, RecurrentLayeredNeuralNetworkTrainer.class);
+ job.setJobName("Neural Network training");
+ job.setJarByClass(RecurrentLayeredNeuralNetworkTrainer.class);
+ job.setBspClass(RecurrentLayeredNeuralNetworkTrainer.class);
+
+ job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+
+ job.setBoolean("training.mode", true);
+ job.setInputPath(new Path(conf.get("training.input.path")));
+ job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(VectorWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+ return job;
+ }
+
+ @Override
+ protected void calculateTrainingError(FloatVector labels, FloatVector output) {
+ FloatVector errors = labels.deepCopy().applyToElements(output,
+ this.costFunction);
+ this.trainingError = errors.sum();
+ }
+
+ /**
+ * Get the squashing function of a specified layer.
+ *
+   * @param idx The layer index.
+   * @return The squashing function of the specified layer.
+ */
+ public FloatFunction getSquashingFunction(int idx) {
+ return this.squashingFunctionList.get(idx);
+ }
+
+ public void setIterationNumber(long iterations) {
+ this.iterations = iterations;
+ }
+
+ public void setRecurrentStepSize(int recurrentStepSize) {
+ this.recurrentStepSize = recurrentStepSize;
+ }
+
+}
diff --git a/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetworkTrainer.java
new file mode 100644
index 0000000..adf8b8c
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/RecurrentLayeredNeuralNetworkTrainer.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.FloatVectorWritable;
+import org.apache.hama.commons.math.DenseFloatMatrix;
+import org.apache.hama.commons.math.FloatMatrix;
+import org.apache.hama.commons.math.FloatVector;
+
+/**
+ * The trainer that trains the {@link RecurrentLayeredNeuralNetwork} based on
+ * the BSP framework.
+ *
+ */
+public final class RecurrentLayeredNeuralNetworkTrainer
+ extends
+ BSP<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> {
+
+ private static final Log LOG = LogFactory
+ .getLog(RecurrentLayeredNeuralNetworkTrainer.class);
+
+ private RecurrentLayeredNeuralNetwork inMemoryModel;
+ private HamaConfiguration conf;
+ /* Default batch size */
+ private int batchSize;
+
+  /* average training error of the previous and current check intervals */
+ private double prevAvgTrainingError;
+ private double curAvgTrainingError;
+ private long convergenceCheckInterval;
+ private long iterations;
+ private long maxIterations;
+ private boolean isConverge;
+
+ private String modelPath;
+
+  /**
+   * If the model path is specified, load the existing model from the storage
+   * location.
+   */
+  @Override
+ public void setup(
+ BSPPeer<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+ if (peer.getPeerIndex() == 0) {
+ LOG.info("Begin to train");
+ }
+ this.isConverge = false;
+ this.conf = peer.getConfiguration();
+ this.iterations = 0;
+ this.modelPath = conf.get("model.path");
+ this.maxIterations = conf.getLong("training.max.iterations", Long.MAX_VALUE);
+ this.convergenceCheckInterval = conf.getLong("convergence.check.interval",
+ 100);
+ this.inMemoryModel = new RecurrentLayeredNeuralNetwork(conf, modelPath, true);
+ this.prevAvgTrainingError = Integer.MAX_VALUE;
+ this.batchSize = conf.getInt("training.batch.size", 5);
+ }
+
+  /**
+   * Write the trained model back to the storage location.
+   */
+  @Override
+ public void cleanup(
+ BSPPeer<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+ // write model to modelPath
+ if (peer.getPeerIndex() == peer.getNumPeers() - 1) {
+ try {
+ LOG.info(String.format("End of training, number of iterations: %d.",
+ this.iterations));
+ LOG.info(String.format("Write model back to %s",
+ inMemoryModel.getModelPath()));
+ this.inMemoryModel.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private List<FloatVector> trainingSet = new ArrayList<FloatVector>();
+ private Random r = new Random();
+
+ @Override
+ public void bsp(
+ BSPPeer<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+ throws IOException, SyncException, InterruptedException {
+ // load local data into memory
+ LongWritable key = new LongWritable();
+ FloatVectorWritable value = new FloatVectorWritable();
+ while (peer.readNext(key, value)) {
+ FloatVector v = value.getVector();
+ trainingSet.add(v);
+ }
+
+ if (peer.getPeerIndex() != peer.getNumPeers() - 1) {
+ LOG.debug(peer.getPeerName() + ": " + trainingSet.size() + " training instances loaded.");
+ }
+
+ while (this.iterations++ < maxIterations) {
+ this.inMemoryModel.setIterationNumber(iterations);
+
+ // each groom calculate the matrices updates according to local data
+ if (peer.getPeerIndex() != peer.getNumPeers() - 1) {
+ calculateUpdates(peer);
+ } else {
+ // doing summation received updates
+ if (peer.getSuperstepCount() > 0) {
+ // and broadcasts previous updated weights
+ mergeUpdates(peer);
+ }
+ }
+
+ peer.sync();
+
+ if (maxIterations == Long.MAX_VALUE && isConverge) {
+ if (peer.getPeerIndex() == peer.getNumPeers() - 1)
+ peer.sync();
+ break;
+ }
+ }
+
+ peer.sync();
+ if (peer.getPeerIndex() == peer.getNumPeers() - 1)
+ mergeUpdates(peer); // merge last updates
+ }
+
+ private FloatVector getRandomInstance() {
+ return trainingSet.get(r.nextInt(trainingSet.size()));
+ }
+
+ /**
+   * Calculate the matrix updates according to the local partition of data.
+ *
+ * @param peer
+ * @throws IOException
+ */
+ private void calculateUpdates(
+ BSPPeer<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+ throws IOException {
+ // receive update information from master
+ if (peer.getNumCurrentMessages() != 0) {
+ ParameterMessage inMessage = peer.getCurrentMessage();
+ FloatMatrix[] newWeights = inMessage.getCurMatrices();
+ FloatMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
+ this.inMemoryModel.setWeightMatrices(newWeights);
+ this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
+ this.isConverge = inMessage.isConverge();
+ // check converge
+ if (isConverge) {
+ return;
+ }
+ }
+
+ FloatMatrix[] weightUpdates = new FloatMatrix[this.inMemoryModel.getSizeOfWeightmatrix()];
+ int matrixIdx = 0;
+ for (List<FloatMatrix> aWeightMatrixList: this.inMemoryModel.weightMatrixLists) {
+ for (FloatMatrix aWeightMatrix : aWeightMatrixList) {
+ weightUpdates[matrixIdx++] = new DenseFloatMatrix(
+ aWeightMatrix.getRowCount(), aWeightMatrix.getColumnCount());
+ }
+ }
+
+ // continue to train
+ float avgTrainingError = 0.0f;
+ for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
+ FloatVector trainingInstance = getRandomInstance();
+ RecurrentLayeredNeuralNetwork.matricesAdd(
+ weightUpdates, this.inMemoryModel.trainByInstance(trainingInstance));
+ avgTrainingError += this.inMemoryModel.trainingError;
+ }
+ avgTrainingError /= batchSize;
+
+ // calculate the average of updates
+ for (int i = 0; i < weightUpdates.length; ++i) {
+ weightUpdates[i] = weightUpdates[i].divide(batchSize);
+ }
+
+ FloatMatrix[] prevWeightUpdates = this.inMemoryModel
+ .getPrevMatricesUpdates();
+ ParameterMessage outMessage = new ParameterMessage(avgTrainingError, false,
+ weightUpdates, prevWeightUpdates);
+
+ peer.send(peer.getPeerName(peer.getNumPeers() - 1), outMessage);
+ }
+
+ /**
+   * Merge the updates received from the grooms.
+ *
+ * @param peer
+ * @throws IOException
+ */
+ private void mergeUpdates(
+ BSPPeer<LongWritable, FloatVectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+ throws IOException {
+ int numMessages = peer.getNumCurrentMessages();
+ boolean converge = false;
+    if (numMessages == 0) { // no updates received indicates convergence
+      this.isConverge = true;
+      return;
+    }
+
+ double avgTrainingError = 0;
+ FloatMatrix[] matricesUpdates = null;
+ FloatMatrix[] prevMatricesUpdates = null;
+
+ while (peer.getNumCurrentMessages() > 0) {
+ ParameterMessage message = peer.getCurrentMessage();
+ if (matricesUpdates == null) {
+ matricesUpdates = message.getCurMatrices();
+ prevMatricesUpdates = message.getPrevMatrices();
+ } else {
+ RecurrentLayeredNeuralNetwork.matricesAdd(matricesUpdates,
+ message.getCurMatrices());
+ RecurrentLayeredNeuralNetwork.matricesAdd(prevMatricesUpdates,
+ message.getPrevMatrices());
+ }
+
+ avgTrainingError += message.getTrainingError();
+ }
+
+ if (numMessages > 1) {
+ avgTrainingError /= numMessages;
+ for (int i = 0; i < matricesUpdates.length; ++i) {
+ matricesUpdates[i] = matricesUpdates[i].divide(numMessages);
+ prevMatricesUpdates[i] = prevMatricesUpdates[i].divide(numMessages);
+ }
+ }
+
+ this.inMemoryModel.updateWeightMatrices(matricesUpdates);
+ this.inMemoryModel.setPrevWeightMatrices(prevMatricesUpdates);
+
+ // check convergence
+ if (peer.getSuperstepCount() > 0
+ && iterations % convergenceCheckInterval == 0) {
+ if (prevAvgTrainingError < curAvgTrainingError) {
+ // error cannot decrease any more
+ converge = true;
+ }
+ // update
+ prevAvgTrainingError = curAvgTrainingError;
+ LOG.info("Training error: " + curAvgTrainingError + " at " + (iterations)
+ + " iteration.");
+ curAvgTrainingError = 0;
+ }
+ curAvgTrainingError += avgTrainingError / convergenceCheckInterval;
+ this.isConverge = converge;
+
+ if (iterations < maxIterations) {
+ // broadcast updated weight matrices
+ for (String peerName : peer.getAllPeerNames()) {
+ ParameterMessage msg = new ParameterMessage(0, converge,
+ this.inMemoryModel.getWeightMatrices(),
+ this.inMemoryModel.getPrevMatricesUpdates());
+ if (!peer.getPeerName().equals(peerName))
+ peer.send(peerName, msg);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/horn/examples/ExampleDriver.java b/src/main/java/org/apache/horn/examples/ExampleDriver.java
index d9f63a3..b4c1964 100644
--- a/src/main/java/org/apache/horn/examples/ExampleDriver.java
+++ b/src/main/java/org/apache/horn/examples/ExampleDriver.java
@@ -18,6 +18,7 @@
package org.apache.horn.examples;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.horn.utils.ExclusiveOrConverter;
import org.apache.horn.utils.MNISTConverter;
import org.apache.horn.utils.MNISTEvaluator;
@@ -32,12 +33,26 @@
MNISTConverter.class,
"A utility program that converts MNIST training and label datasets "
+ "into HDFS sequence file.");
- pgd.addClass("MNISTEvaluator", MNISTEvaluator.class,
+ pgd.addClass("MNISTEvaluator",
+ MNISTEvaluator.class,
"A utility program that evaluates trained model for the MNIST dataset");
pgd.addClass(
"MultiLayerPerceptron",
MultiLayerPerceptron.class,
"An example program that trains a multilayer perceptron model from HDFS sequence file.");
+ pgd.addClass("ExclusiveOrConverter",
+ ExclusiveOrConverter.class,
+ "A utility program that converts ExclusiveOR training and label datasets ");
+ pgd.addClass(
+ "ExclusiveOrRecurrentMultiLayerPerceptron",
+ ExclusiveOrRecurrentMultiLayerPerceptron.class,
+ "An example program that trains a recurrent multilayer perceptron model with exclusive or"
+ + " from HDFS sequence file.");
+ pgd.addClass(
+ "MnistRecurrentMultiLayerPerceptron",
+ MnistRecurrentMultiLayerPerceptron.class,
+ "An example program that trains a recurrent multilayer perceptron model with MNIST"
+ + " from HDFS sequence file.");
exitCode = pgd.run(args);
} catch (Throwable e) {
e.printStackTrace();
diff --git a/src/main/java/org/apache/horn/examples/ExclusiveOrRecurrentMultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/ExclusiveOrRecurrentMultiLayerPerceptron.java
new file mode 100644
index 0000000..6e88217
--- /dev/null
+++ b/src/main/java/org/apache/horn/examples/ExclusiveOrRecurrentMultiLayerPerceptron.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Constants.TrainingMethod;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.RecurrentDropoutNeuron;
+import org.apache.horn.core.RecurrentLayeredNeuralNetwork;
+import org.apache.horn.core.Synapse;
+import org.apache.horn.funcs.SoftMax;
+import org.apache.horn.funcs.SquaredError;
+import org.apache.horn.funcs.Tanh;
+
+public class ExclusiveOrRecurrentMultiLayerPerceptron {
+
+ public static class StandardNeuron extends
+ Neuron<Synapse<FloatWritable, FloatWritable>> {
+
+ @Override
+ public void forward(Iterable<Synapse<FloatWritable, FloatWritable>> messages)
+ throws IOException {
+ float sum = 0;
+ for (Synapse<FloatWritable, FloatWritable> m : messages) {
+ sum += m.getInput() * m.getWeight();
+ }
+ this.feedforward(squashingFunction.apply(sum));
+ }
+
+ @Override
+ public void backward(
+ Iterable<Synapse<FloatWritable, FloatWritable>> messages)
+ throws IOException {
+ float delta = 0;
+
+ if (!this.isDropped()) {
+ for (Synapse<FloatWritable, FloatWritable> m : messages) {
+ // Calculates error gradient for each neuron
+ delta += (m.getDelta() * m.getWeight());
+
+ // Weight corrections
+ float weight = -this.getLearningRate() * m.getDelta()
+ * this.getOutput() + this.getMomentumWeight() * m.getPrevWeight();
+ this.push(weight);
+ }
+ }
+
+ this.backpropagate(delta * squashingFunction.applyDerivative(getOutput()));
+ }
+ }
+
+ public static HornJob createJob(HamaConfiguration conf, String modelPath,
+      String inputPath, float learningRate, float momentumWeight,
+ float regularizationWeight, int features, int hu, int labels,
+ int stepSize, int numOutCells, int miniBatch, int maxIteration)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+    boolean isRecurrent = (stepSize != 1);
+
+ HornJob job = new HornJob(
+ conf, RecurrentLayeredNeuralNetwork.class, ExclusiveOrRecurrentMultiLayerPerceptron.class);
+ job.setTrainingSetPath(inputPath);
+ job.setModelPath(modelPath);
+
+ job.setMaxIteration(maxIteration);
+ job.setLearningRate(learningRate);
+    job.setMomentumWeight(momentumWeight);
+ job.setRegularizationWeight(regularizationWeight);
+
+ job.setConvergenceCheckInterval(10);
+ job.setBatchSize(miniBatch);
+ job.setRecurrentStepSize(stepSize);
+
+ job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
+
+    job.inputLayer(features, 1.0f); // drop rate of 1.0f keeps all inputs
+ job.addLayer(hu, Tanh.class, RecurrentDropoutNeuron.class, isRecurrent);
+ job.addLayer(hu, Tanh.class, RecurrentDropoutNeuron.class, isRecurrent);
+ job.outputLayer(labels, SoftMax.class, RecurrentDropoutNeuron.class, numOutCells);
+
+ job.setCostFunction(SquaredError.class);
+
+ return job;
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException, ClassNotFoundException,
+ NumberFormatException, InstantiationException, IllegalAccessException {
+ if (args.length < 12) {
+ System.out.println("Usage: <MODEL_PATH> <INPUT_PATH> "
+ + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
+ + "<FEATURE_DIMENSION> <HIDDEN_UNITS> <LABEL_DIMENSION> "
+ + "<STEP_SIZE> <NUM_OUTPUTCELLS> <BATCH_SIZE> <MAX_ITERATION>");
+ System.out.println("E.g. MnistRecurrentMultiLayerPerceptron"
+ + " ./model_semi_rnn semi.seq 0.01 0.9 0.0005 1 2 2 2 1 10 10000");
+ System.exit(-1);
+ }
+
+ HornJob rnn = createJob(new HamaConfiguration(), args[0], args[1],
+ Float.parseFloat(args[2]), Float.parseFloat(args[3]),
+ Float.parseFloat(args[4]), Integer.parseInt(args[5]),
+ Integer.parseInt(args[6]), Integer.parseInt(args[7]),
+ Integer.parseInt(args[8]), Integer.parseInt(args[9]),
+ Integer.parseInt(args[10]), Integer.parseInt(args[11]));
+
+ long startTime = System.currentTimeMillis();
+ if (rnn.waitForCompletion(true)) {
+ System.out.println("Optimization Finished! "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+}
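
For reference, the command-line example above corresponds to the following programmatic invocation. This is a sketch, not part of the patch; the wrapper class name is hypothetical and assumed to live in the same package, and the argument values simply mirror the E.g. line (note that with a feature dimension of 1 and a step size of 2, the two XOR inputs are presented one per time step):

    import org.apache.hama.HamaConfiguration;
    import org.apache.horn.core.HornJob;

    public class XorRnnDriver { // hypothetical, for illustration only
      public static void main(String[] args) throws Exception {
        // Mirrors: ./model_semi_rnn semi.seq 0.01 0.9 0.0005 1 2 2 2 1 10 10000
        HornJob job = ExclusiveOrRecurrentMultiLayerPerceptron.createJob(
            new HamaConfiguration(), "./model_semi_rnn", "semi.seq",
            0.01f, 0.9f, 0.0005f, // learning rate, momentum, regularization
            1, 2, 2,              // features per step, hidden units, labels
            2, 1,                 // recurrent step size, output cells
            10, 10000);           // mini-batch size, max iterations
        job.waitForCompletion(true);
      }
    }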
diff --git a/src/main/java/org/apache/horn/examples/MnistRecurrentMultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MnistRecurrentMultiLayerPerceptron.java
new file mode 100644
index 0000000..81e17c5
--- /dev/null
+++ b/src/main/java/org/apache/horn/examples/MnistRecurrentMultiLayerPerceptron.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Constants.TrainingMethod;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.RecurrentDropoutNeuron;
+import org.apache.horn.core.RecurrentLayeredNeuralNetwork;
+import org.apache.horn.core.Synapse;
+import org.apache.horn.funcs.CrossEntropy;
+import org.apache.horn.funcs.SoftMax;
+import org.apache.horn.funcs.Tanh;
+
+public class MnistRecurrentMultiLayerPerceptron {
+
+ // Reference neuron implementation; the job below wires RecurrentDropoutNeuron instead.
+ public static class StandardNeuron extends
+ Neuron<Synapse<FloatWritable, FloatWritable>> {
+
+ @Override
+ public void forward(Iterable<Synapse<FloatWritable, FloatWritable>> messages)
+ throws IOException {
+ float sum = 0;
+ for (Synapse<FloatWritable, FloatWritable> m : messages) {
+ sum += m.getInput() * m.getWeight();
+ }
+ this.feedforward(squashingFunction.apply(sum));
+ }
+
+ @Override
+ public void backward(
+ Iterable<Synapse<FloatWritable, FloatWritable>> messages)
+ throws IOException {
+ float delta = 0;
+
+ if (!this.isDropped()) {
+ for (Synapse<FloatWritable, FloatWritable> m : messages) {
+ // Calculates error gradient for each neuron
+ delta += (m.getDelta() * m.getWeight());
+
+ // Weight corrections
+ float weight = -this.getLearningRate() * m.getDelta()
+ * this.getOutput() + this.getMomentumWeight() * m.getPrevWeight();
+ this.push(weight);
+ }
+ }
+ this.backpropagate(delta * squashingFunction.applyDerivative(getOutput()));
+ }
+ }
+
+ public static HornJob createJob(HamaConfiguration conf, String modelPath,
+ String inputPath, float learningRate, float momentumWeight,
+ float regularizationWeight, int features, int hu, int labels,
+ int stepSize, int numOutCells, int miniBatch, int maxIteration)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ // A step size of 1 degenerates to a plain feed-forward layer.
+ boolean isRecurrent = (stepSize != 1);
+
+ HornJob job = new HornJob(
+ conf, RecurrentLayeredNeuralNetwork.class, MnistRecurrentMultiLayerPerceptron.class);
+ job.setTrainingSetPath(inputPath);
+ job.setModelPath(modelPath);
+
+ job.setMaxIteration(maxIteration);
+ job.setLearningRate(learningRate);
+ job.setMomentumWeight(momentumWeight);
+ job.setRegularizationWeight(regularizationWeight);
+
+ job.setConvergenceCheckInterval(10);
+ job.setBatchSize(miniBatch);
+ job.setRecurrentStepSize(stepSize);
+
+ job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
+
+ job.inputLayer(features, 1.0f); // drop rate 1.0f keeps all input units (no dropout)
+ job.addLayer(hu, Tanh.class, RecurrentDropoutNeuron.class, isRecurrent);
+ job.outputLayer(labels, SoftMax.class, RecurrentDropoutNeuron.class, numOutCells);
+
+ job.setCostFunction(CrossEntropy.class);
+
+ return job;
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException, ClassNotFoundException,
+ NumberFormatException, InstantiationException, IllegalAccessException {
+
+ if (args.length < 12) {
+ System.out.println("Usage: <MODEL_PATH> <INPUT_PATH> "
+ + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
+ + "<FEATURE_DIMENSION> <HIDDEN_UNITS> <LABEL_DIMENSION> "
+ + "<STEP_SIZE> <NUM_OUTPUTCELLS> <BATCH_SIZE> <MAX_ITERATION>");
+ System.out.println("E.g. MnistRecurrentMultiLayerPerceptron"
+ + " ./model_rnn mnist.seq 0.01 0.9 0.0005 196 200 10 4 1 10 12000");
+ System.exit(-1);
+ }
+
+ HornJob rnn = createJob(new HamaConfiguration(), args[0], args[1],
+ Float.parseFloat(args[2]), Float.parseFloat(args[3]),
+ Float.parseFloat(args[4]), Integer.parseInt(args[5]),
+ Integer.parseInt(args[6]), Integer.parseInt(args[7]),
+ Integer.parseInt(args[8]), Integer.parseInt(args[9]),
+ Integer.parseInt(args[10]), Integer.parseInt(args[11]));
+
+ long startTime = System.currentTimeMillis();
+ if (rnn.waitForCompletion(true)) {
+ System.out.println("Optimization Finished! "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+}
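
Likewise for the MNIST example, a sketch mirroring the E.g. line above; with FEATURE_DIMENSION 196 and STEP_SIZE 4, each 784-pixel MNIST image is presumably presented as 4 time steps of 196 features, assuming the input converter lays the data out that way:

    HornJob job = MnistRecurrentMultiLayerPerceptron.createJob(
        new HamaConfiguration(), "./model_rnn", "mnist.seq",
        0.01f, 0.9f, 0.0005f, // learning rate, momentum, regularization
        196, 200, 10,         // features per step (196 * 4 = 784 pixels), hidden units, labels
        4, 1,                 // recurrent step size, output cells
        10, 12000);           // mini-batch size, max iterations
    job.waitForCompletion(true);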
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index 2c45673..e7f2683 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.io.FloatWritable;
import org.apache.hama.HamaConfiguration;
import org.apache.horn.core.Constants.TrainingMethod;
+import org.apache.horn.core.DropoutNeuron;
import org.apache.horn.core.HornJob;
import org.apache.horn.core.LayeredNeuralNetwork;
import org.apache.horn.core.Neuron;
diff --git a/src/main/java/org/apache/horn/utils/ExclusiveOrConverter.java b/src/main/java/org/apache/horn/utils/ExclusiveOrConverter.java
new file mode 100644
index 0000000..9305875
--- /dev/null
+++ b/src/main/java/org/apache/horn/utils/ExclusiveOrConverter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.utils;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.FloatVectorWritable;
+import org.apache.hama.commons.math.DenseFloatVector;
+
+public class ExclusiveOrConverter {
+
+ private static final int STEPS = 2; // two XOR inputs per example (currently unused)
+ private static final int LABELS = 2; // one-hot label width (currently unused)
+
+ public static void main(String[] args) throws Exception {
+
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.set("dfs.block.size", "11554432");
+ FileSystem fs = FileSystem.get(conf);
+ String output = "semi.seq";
+
+ @SuppressWarnings("deprecation")
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
+ output), LongWritable.class, FloatVectorWritable.class);
+
+ // Each row: two XOR inputs followed by a one-hot label ({1,0} = false, {0,1} = true)
+ float[][] vals = {{0,0,1,0},{0,1,0,1},{1,0,0,1},{1,1,1,0}};
+
+ for (int i = 0; i < vals.length; i++) {
+ // The key is left at its default (0); rows are distinguished by position only.
+ writer.append(new LongWritable(), new FloatVectorWritable(
+ new DenseFloatVector(vals[i])));
+ }
+ writer.close();
+ }
+
+}
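
To sanity-check the generated sequence file, it can be read back with the matching deprecated reader constructor. A minimal sketch, assuming FloatVectorWritable exposes its vector via getVector() as the other hama-commons writables do:

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(output), conf);
    LongWritable key = new LongWritable();
    FloatVectorWritable value = new FloatVectorWritable();
    while (reader.next(key, value)) {
      // Each value is a 4-element row: two inputs followed by the one-hot label
      System.out.println(value.getVector());
    }
    reader.close();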