// blob: 582c6a71661b50764fd11c09c00454bbd75a5545 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.pipes;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileInputFormat;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.KeyValueTextInputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.commons.io.PipesKeyValueWritable;
import org.apache.hama.commons.io.PipesVectorWritable;
import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;
/**
* Test case for {@link PipesBSP}
*
*/
public class TestPipes extends HamaCluster {
/** Logger used by the test and its static helpers. */
private static final Log LOG = LogFactory.getLog(TestPipes.class);
/** Name of the system property that {@link #testPipes()} reads to find the examples install directory. */
public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install";
/** Location of the summation executable, appended to the examples install path. */
public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
/** Location of the piestimator executable, appended to the examples install path. */
public static final String EXAMPLE_PIESTIMATOR_EXEC = "/examples/piestimator";
/** Location of the matrixmultiplication executable, appended to the examples install path. */
public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
/** Root folder for the inputs, outputs and the copied executable of the example runs. */
public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
/** Folder used as "bsp.local.dir" and as prefix of the runtime partitioning directory. */
public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/";
/** Folder removed by the constructor; not referenced anywhere else in this file. */
public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
/**
 * Number of decimal places kept when generated values are truncated; its
 * negative is also used as the exponent of the comparison tolerance (10^-6).
 */
public static final int DOUBLE_PRECISION = 6;
/** Configuration created in the constructor and handed to every job. */
private HamaConfiguration configuration;
/** File system handle; assigned in the constructor and shared by the static helpers. */
private static FileSystem fs = null;
/** Value of the {@link #EXAMPLES_INSTALL_PROPERTY} system property; set in {@link #testPipes()}. */
private String examplesInstallPath;
/**
 * Creates the test configuration and removes leftovers of previous runs.
 *
 * <p>The shared file system handle is obtained from the fresh configuration
 * and the temporary folders {@link #HAMA_TMP_OUTPUT},
 * {@link #HAMA_TMP_DISK_QUEUE_OUTPUT} and {@link #EXAMPLE_TMP_OUTPUT} are
 * deleted. This cleanup is best effort: an {@link IOException} is logged and
 * construction continues. Afterwards the master address, local directory,
 * ZooKeeper quorum/port and the sync client class are configured.
 */
public TestPipes() {
  configuration = new HamaConfiguration();
  try {
    // Cleanup temp Hama locations
    fs = FileSystem.get(configuration);
    cleanup(fs, new Path(HAMA_TMP_OUTPUT));
    cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
    // Remove local temp folder
    cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
  } catch (IOException e) {
    // Best effort only: report through the logger (with stack trace) instead
    // of printing to stderr, and keep going with the configuration setup.
    LOG.error("Could not clean up temporary folders before running TestPipes", e);
  }
  configuration.set("bsp.master.address", "localhost");
  configuration.set("hama.child.redirect.log.console", "true");
  assertEquals("Make sure master addr is set to localhost:", "localhost",
      configuration.get("bsp.master.address"));
  configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
  configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
  configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
  configuration.set("hama.sync.client.class",
      org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
          .getCanonicalName());
}
/**
 * Prepares the fixture by delegating to the superclass; this test adds no
 * setup steps of its own.
 */
@Override
public void setUp() throws Exception {
super.setUp();
}
/**
 * Releases the fixture by delegating to the superclass; this test adds no
 * teardown steps of its own.
 */
@Override
public void tearDown() throws Exception {
super.tearDown();
}
/**
 * Runs the three pipes examples (summation, pi estimation and matrix
 * multiplication) one after the other and removes the example temp folder
 * at the end.
 *
 * <p>The system property {@link #EXAMPLES_INSTALL_PROPERTY} must be defined.
 * If it is defined but empty, an error is logged and the test returns
 * without running any example.
 */
public void testPipes() throws Exception {
  final String installPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
  assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
      + " is not defined!", installPath);

  if (installPath.isEmpty()) {
    LOG.error("System property " + EXAMPLES_INSTALL_PROPERTY
        + " is empty! Skipping TestPipes!");
    return;
  }
  this.examplesInstallPath = installPath;

  // The examples run in a fixed order: summation, pi estimation, matrices.
  summation();
  piestimation();
  matrixMult();

  // Drop everything the examples left below the temp folder.
  cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
}
/**
 * Summation example: writes a random input file, runs the summation
 * executable with a single BSP task and compares the job output with the
 * sum computed while generating the input (tolerance 10^-DOUBLE_PRECISION).
 * Input and output folders are removed afterwards.
 */
private void summation() throws Exception {
  final String workDir = EXAMPLE_TMP_OUTPUT + "summation/";
  final Path executable = new Path(this.examplesInstallPath
      + EXAMPLE_SUMMATION_EXEC);
  final Path in = new Path(workDir + "in");
  final Path out = new Path(workDir + "out");

  // The generator returns the exact sum of the values it wrote.
  final BigDecimal expectedSum = writeSummationInputFile(fs, in);

  runProgram(getSummationJob(configuration), executable, in, out, 1,
      this.numOfGroom);

  final double tolerance = Math.pow(10, -DOUBLE_PRECISION);
  verifyOutput(configuration, out, expectedSum.doubleValue(), tolerance);

  cleanup(fs, in);
  cleanup(fs, out);
}
/**
 * PiEstimator example: runs the piestimator executable with three BSP tasks
 * and checks that the job output equals {@link Math#PI} within 10^-2.
 * No input is generated here; the input path is only handed to the job and
 * removed again together with the output folder.
 */
private void piestimation() throws Exception {
  final String workDir = EXAMPLE_TMP_OUTPUT + "piestimator/";
  final Path executable = new Path(this.examplesInstallPath
      + EXAMPLE_PIESTIMATOR_EXEC);
  final Path in = new Path(workDir + "in");
  final Path out = new Path(workDir + "out");

  runProgram(getPiestimatorJob(configuration), executable, in, out, 3,
      this.numOfGroom);

  // The estimate only has to match the first two decimals.
  final double tolerance = Math.pow(10, -2);
  verifyOutput(configuration, out, Math.PI, tolerance);

  cleanup(fs, in);
  cleanup(fs, out);
}
/**
 * MatrixMultiplication example: generates two random matrices A (rows x
 * cols) and B (cols x rows), stores A as-is and B transposed, runs the
 * matrixmultiplication executable with two BSP tasks and compares the job
 * output against a locally computed product. Input and output folders are
 * removed afterwards.
 */
private void matrixMult() throws Exception {
  final String workDir = EXAMPLE_TMP_OUTPUT + "matmult/";
  final Path executable = new Path(this.examplesInstallPath
      + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
  final Path in = new Path(workDir + "in");
  final Path out = new Path(workDir + "out");

  // Both dimensions are drawn from 11..30 (nextInt(20) yields 0..19).
  final Random random = new Random();
  final int rowCount = random.nextInt(20) + 11;
  final int colCount = random.nextInt(20) + 11;

  final double[][] a = createRandomMatrix(rowCount, colCount, random);
  final double[][] b = createRandomMatrix(colCount, rowCount, random);

  final Path aFile = writeMatrix(configuration, a,
      new Path(in, "matrixA.seq"), false);
  final Path bTransposedFile = writeMatrix(configuration, b,
      new Path(in, "transposedMatrixB.seq"), true);

  // A is the regular job input; B reaches the job through its configuration.
  runProgram(getMatrixMultiplicationJob(configuration, bTransposedFile),
      executable, aFile, out, 2, this.numOfGroom);

  final double[][] expected = multiplyMatrix(a, b);
  verifyMatrixMultiplicationOutput(configuration, out, expected);

  cleanup(fs, in);
  cleanup(fs, out);
}
/**
 * Creates the job description for the summation example.
 *
 * @param conf configuration the job is created from
 * @return a job with key/value text input ({@code Text}/{@code Text}),
 *         sequence file output ({@code NullWritable}/{@code DoubleWritable})
 *         and {@code DoubleWritable} messages
 * @throws IOException if the job cannot be created
 */
static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
  final BSPJob job = new BSPJob(conf);

  // input side
  job.setInputFormat(KeyValueTextInputFormat.class);
  job.setInputKeyClass(Text.class);
  job.setInputValueClass(Text.class);

  // output side
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(DoubleWritable.class);

  // messages exchanged between tasks
  job.setMessageClass(DoubleWritable.class);
  return job;
}
/**
 * Creates the job description for the pi estimator example.
 *
 * @param conf configuration the job is created from
 * @return a job with {@code NullInputFormat} input, sequence file output
 *         ({@code NullWritable}/{@code DoubleWritable}) and
 *         {@code IntWritable} messages
 * @throws IOException if the job cannot be created
 */
static BSPJob getPiestimatorJob(HamaConfiguration conf) throws IOException {
  final BSPJob job = new BSPJob(conf);

  // input side
  job.setInputFormat(NullInputFormat.class);

  // output side
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(DoubleWritable.class);

  // messages exchanged between tasks
  job.setMessageClass(IntWritable.class);
  return job;
}
/**
 * Creates the job description for the matrix multiplication example.
 *
 * <p>Input and output are sequence files of {@code IntWritable} keys and
 * {@code PipesVectorWritable} values; messages are
 * {@code PipesKeyValueWritable}. Runtime partitioning is enabled with
 * {@code PipesPartitioner}, and the receive queue class is set to the sorted
 * memory queue.
 *
 * @param conf configuration the job is created from
 * @param transposedMatrixB path of the transposed matrix B, stored in the
 *        job under the key "hama.mat.mult.B.path"
 * @return the configured job
 * @throws IOException if the job cannot be created
 */
static BSPJob getMatrixMultiplicationJob(HamaConfiguration conf,
    Path transposedMatrixB) throws IOException {
  final BSPJob job = new BSPJob(conf);

  // input side
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setInputKeyClass(IntWritable.class);
  job.setInputValueClass(PipesVectorWritable.class);

  // output side
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(PipesVectorWritable.class);

  // messages exchanged between tasks
  job.setMessageClass(PipesKeyValueWritable.class);

  // runtime partitioning
  job.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
  job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
  job.setPartitioner(PipesPartitioner.class);

  // use the sorted queue implementation for received messages
  job.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
      "org.apache.hama.bsp.message.queue.SortedMemoryQueue");

  // location of the second operand
  job.set("hama.mat.mult.B.path", transposedMatrixB.toString());
  return job;
}
/**
 * Writes the input file "part0" for the summation example into {@code dir}.
 *
 * <p>The file holds between 50 and 149 lines of the form
 * {@code key<N>\t<value>\n}. Every value is a random number in [0, 100)
 * truncated to {@link #DOUBLE_PRECISION} decimal places.
 *
 * @param fs file system the file is created on
 * @param dir folder that receives the file "part0"
 * @return the exact sum of the truncated values that were written
 * @throws IOException if the file cannot be created or written
 */
static BigDecimal writeSummationInputFile(FileSystem fs, Path dir)
    throws IOException {
  Random rand = new Random();
  double rangeMin = 0;
  double rangeMax = 100;
  // Draw the number of lines exactly once. Re-evaluating the random bound in
  // the loop condition would compare against a new number on every pass and
  // skew the line count instead of picking it uniformly from 50..149.
  int lineCount = rand.nextInt(100) + 50;
  BigDecimal sum = BigDecimal.ZERO;

  DataOutputStream out = fs.create(new Path(dir, "part0"));
  try {
    for (int i = 0; i < lineCount; i++) {
      // generate key value pair inputs
      double randomValue = rangeMin + (rangeMax - rangeMin) * rand.nextDouble();
      String truncatedValue = new BigDecimal(randomValue).setScale(
          DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).toString();
      String line = "key" + (i + 1) + "\t" + truncatedValue + "\n";
      out.writeBytes(line);
      // Sum up the textual value so the result matches the file content.
      sum = sum.add(new BigDecimal(truncatedValue));
    }
  } finally {
    // Release the stream even when a write fails.
    out.close();
  }
  return sum;
}
/**
 * Creates a {@code rows x columns} matrix of random values in [0, 100),
 * each truncated to {@link #DOUBLE_PRECISION} decimal places. Cells are
 * filled row by row, so the generator is consumed in row-major order.
 *
 * @param rows number of rows
 * @param columns number of columns
 * @param rand source of randomness
 * @return the filled matrix
 */
static double[][] createRandomMatrix(int rows, int columns, Random rand) {
  final double lowerBound = 0;
  final double upperBound = 100;
  final double[][] result = new double[rows][columns];

  for (double[] row : result) {
    for (int c = 0; c < row.length; c++) {
      final double raw = lowerBound + (upperBound - lowerBound)
          * rand.nextDouble();
      final BigDecimal truncated = new BigDecimal(raw).setScale(
          DOUBLE_PRECISION, BigDecimal.ROUND_DOWN);
      row[c] = truncated.doubleValue();
    }
  }
  return result;
}
/**
 * Writes a matrix as a sequence file of (row index, row vector) records.
 *
 * <p>The file is written through the shared static file system handle of
 * this class. Writing is best effort: an {@link IOException} is logged and
 * {@code path} is returned regardless.
 *
 * @param conf configuration used to create the writer
 * @param matrix matrix to store, one record per row
 * @param path destination file
 * @param saveTransposed if {@code true}, the transpose of {@code matrix} is
 *        stored instead of the matrix itself
 * @return {@code path}
 */
static Path writeMatrix(Configuration conf, double[][] matrix, Path path,
    boolean saveTransposed) {
  // Write matrix to DFS
  SequenceFile.Writer writer = null;
  try {
    writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
        PipesVectorWritable.class);
    // Transpose Matrix before saving
    double[][] rowsToWrite = saveTransposed ? transpose(matrix) : matrix;
    LOG.info("writeRandomDistributedRowMatrix path: " + path
        + " saveTransposed: " + saveTransposed);
    for (int i = 0; i < rowsToWrite.length; i++) {
      DenseDoubleVector rowVector = new DenseDoubleVector(rowsToWrite[i]);
      writer.append(new IntWritable(i), new PipesVectorWritable(rowVector));
      LOG.info("IntWritable: " + i + " PipesVectorWritable: "
          + rowVector.toString());
    }
  } catch (IOException e) {
    // Keep the best-effort contract, but report through the logger.
    LOG.error("Could not write matrix to " + path, e);
  } finally {
    if (writer != null) {
      try {
        writer.close();
      } catch (IOException e) {
        LOG.error("Could not close writer for " + path, e);
      }
    }
  }
  return path;
}

/** Returns the transpose of a rectangular matrix; an empty matrix yields an empty result. */
private static double[][] transpose(double[][] matrix) {
  int rows = matrix.length;
  // Guard the column lookup so a matrix without rows does not fail.
  int columns = rows == 0 ? 0 : matrix[0].length;
  double[][] transposed = new double[columns][rows];
  for (int i = 0; i < rows; i++) {
    for (int j = 0; j < columns; j++) {
      transposed[j][i] = matrix[i][j];
    }
  }
  return transposed;
}
/**
 * Computes the matrix product {@code matrixA x matrixB}.
 *
 * <p>Both arguments are expected to be rectangular. For every result cell
 * the products are accumulated in ascending order of the inner index.
 *
 * @param matrixA left operand with m rows and n columns
 * @param matrixB right operand with n rows and p columns
 * @return the m x p product; a matrix without rows (or without columns)
 *         yields a result with zero rows (or zero columns)
 * @throws IllegalArgumentException if the column count of {@code matrixA}
 *         differs from the row count of {@code matrixB}
 */
static double[][] multiplyMatrix(double[][] matrixA, double[][] matrixB) {
  final int m = matrixA.length;
  // Guard the row-0 lookups so empty operands do not fail with an index error.
  final int n = m == 0 ? matrixB.length : matrixA[0].length;
  final int p = matrixB.length == 0 ? 0 : matrixB[0].length;
  if (n != matrixB.length) {
    // Without this check a too-short B fails deep inside the loop and a
    // too-long B is silently truncated.
    throw new IllegalArgumentException("Cannot multiply: matrixA has " + n
        + " columns but matrixB has " + matrixB.length + " rows");
  }
  final double[][] matrixC = new double[m][p];
  for (int i = 0; i < m; i++) {
    for (int k = 0; k < n; k++) {
      final double aik = matrixA[i][k];
      for (int j = 0; j < p; j++) {
        matrixC[i][j] = matrixC[i][j] + aik * matrixB[k][j];
      }
    }
  }
  return matrixC;
}
/**
 * Compares the text output of a job line by line with the expected lines.
 *
 * <p>Every non-empty regular file directly below {@code outputPath} is read
 * as UTF-8 text; line {@code i} of each file must equal
 * {@code expectedResults[i]}. The line counter starts at zero for every file.
 *
 * @param conf not used by this method
 * @param outputPath folder holding the job output files
 * @param expectedResults expected lines, in file order
 * @throws IOException if the output cannot be listed or read
 */
static void verifyOutput(HamaConfiguration conf, Path outputPath,
    String[] expectedResults) throws IOException {
  FileStatus[] listStatus = fs.listStatus(outputPath);
  for (FileStatus status : listStatus) {
    if (status.isDir() || status.getLen() <= 0) {
      continue;
    }
    LOG.info("Output File: " + status.getPath());
    // Decode with an explicit charset so the comparison does not depend on
    // the platform default encoding.
    BufferedReader br = new BufferedReader(new InputStreamReader(
        fs.open(status.getPath()), "UTF-8"));
    try {
      String line;
      int i = 0;
      while ((line = br.readLine()) != null) {
        LOG.info("output[" + i + "]: '" + line + "'");
        if (i >= expectedResults.length) {
          // Report surplus output clearly instead of failing with a bare
          // index error on the expected array.
          throw new AssertionError("Unexpected output line " + i + " in "
              + status.getPath() + ": '" + line + "' (only "
              + expectedResults.length + " lines expected)");
        }
        LOG.info("expected[" + i + "]: '" + expectedResults[i] + "'");
        assertEquals("'" + expectedResults[i] + "' != '" + line + "'",
            expectedResults[i], line);
        i++;
      }
    } finally {
      br.close();
    }
  }
}
/**
 * Checks the numeric result of a job.
 *
 * <p>Every regular file directly below {@code outputPath} is opened as a
 * sequence file; if it contains a first record, that record's value must
 * equal {@code expectedResult} within {@code delta}. Files without any
 * record are skipped, and records after the first are not inspected.
 *
 * @param conf configuration used to open the sequence files
 * @param outputPath folder holding the job output files
 * @param expectedResult value the job is expected to produce
 * @param delta maximum accepted absolute difference
 * @throws IOException if the output cannot be listed or read
 */
static void verifyOutput(HamaConfiguration conf, Path outputPath,
    double expectedResult, double delta) throws IOException {
  FileStatus[] listStatus = fs.listStatus(outputPath);
  for (FileStatus status : listStatus) {
    if (status.isDir()) {
      continue;
    }
    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
        status.getPath(), conf);
    try {
      NullWritable key = NullWritable.get();
      DoubleWritable value = new DoubleWritable();
      if (reader.next(key, value)) {
        LOG.info("Output File: " + status.getPath());
        LOG.info("key: '" + key + "' value: '" + value + "' expected: '"
            + expectedResult + "'");
        assertEquals("Expected value: '" + expectedResult + "' != '" + value
            + "'", expectedResult, value.get(), delta);
      }
    } finally {
      // Close the reader even when the assertion or the read fails.
      reader.close();
    }
  }
}
/**
 * Compares the output of the matrix multiplication job with the expected
 * product.
 *
 * <p>Every regular file directly below {@code outputPath} is read as a
 * sequence file of (row index, row vector) records. Within a file the keys
 * must count up from zero, and every vector entry must equal the matching
 * entry of {@code matrix} within 10^-DOUBLE_PRECISION.
 *
 * @param conf configuration used to open the sequence files
 * @param outputPath folder holding the job output files
 * @param matrix expected result matrix
 * @throws IOException if the output cannot be listed or read
 */
static void verifyMatrixMultiplicationOutput(HamaConfiguration conf,
    Path outputPath, double[][] matrix) throws IOException {
  // The tolerance does not change between cells, so compute it once.
  final double delta = Math.pow(10, (DOUBLE_PRECISION * -1));
  FileStatus[] listStatus = fs.listStatus(outputPath);
  for (FileStatus status : listStatus) {
    if (status.isDir()) {
      continue;
    }
    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
        status.getPath(), conf);
    try {
      IntWritable key = new IntWritable();
      PipesVectorWritable value = new PipesVectorWritable();
      int rowIdx = 0;
      while (reader.next(key, value)) {
        assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get()
            + "'", rowIdx, key.get());
        DoubleVector rowVector = value.getVector();
        for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) {
          double colValue = rowVector.get(colIdx);
          assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx]
              + "' != '" + colValue + "' in row: " + rowIdx + " values: "
              + rowVector.toString(), matrix[rowIdx][colIdx], colValue,
              delta);
        }
        rowIdx++;
      }
    } finally {
      // Close the reader even when an assertion or a read fails.
      reader.close();
    }
  }
}
/**
 * Recursively deletes {@code p} and asserts that it no longer exists.
 *
 * @param fs file system to operate on
 * @param p path to remove
 * @throws IOException if the delete or the existence check fails
 */
static void cleanup(FileSystem fs, Path p) throws IOException {
  final boolean recursive = true;
  fs.delete(p, recursive);

  final boolean stillPresent = fs.exists(p);
  assertFalse(p.getName() + " not cleaned up", stillPresent);
}
/**
 * Runs a pipes executable as a BSP job and checks the number of output files.
 *
 * <p>The job gets its name, {@code PipesBSP} as BSP class, the input/output
 * paths and the task count. The executable is copied to
 * {@code EXAMPLE_TMP_OUTPUT + "testing/bin/application"} (the folder is
 * deleted first), registered through {@code Submitter.setExecutable} and the
 * job is run. Afterwards {@code outputPath} must contain exactly
 * {@code numBspTasks} entries.
 *
 * @param bsp job to complete and run
 * @param program local path of the executable
 * @param inputPath input path of the job
 * @param outputPath output path of the job
 * @param numBspTasks number of BSP tasks to run
 * @param numOfGroom number of groom servers the cluster must report
 */
static void runProgram(BSPJob bsp, Path program, Path inputPath,
    Path outputPath, int numBspTasks, int numOfGroom) throws IOException,
    ClassNotFoundException, InterruptedException {
  HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration();
  bsp.setJobName("Test Hama Pipes " + program.getName());
  bsp.setBspClass(PipesBSP.class);
  FileInputFormat.setInputPaths(bsp, inputPath);
  FileOutputFormat.setOutputPath(bsp, outputPath);
  BSPJobClient jobClient = new BSPJobClient(conf);
  // Make sure the cluster reports the expected number of grooms
  ClusterStatus cluster = jobClient.getClusterStatus(false);
  assertEquals(numOfGroom, cluster.getGroomServers());
  // Set bspTaskNum
  bsp.setNumBspTask(numBspTasks);
  // Copy binary to DFS
  Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application");
  fs.delete(testExec.getParent(), true);
  fs.copyFromLocalFile(program, testExec);
  // Set Executable
  Submitter.setExecutable(conf, fs.makeQualified(testExec).toString());
  // Run bspJob
  Submitter.runJob(bsp);
  LOG.info("Client finishes execution job");
  // check output: expected value first, so a failure message reads correctly
  FileStatus[] listStatus = fs.listStatus(outputPath);
  assertEquals(numBspTasks, listStatus.length);
}
}