Refactored RowHash and TeraSortIngest (#68)
diff --git a/bin/mapred b/bin/mapred
new file mode 100755
index 0000000..f943b45
--- /dev/null
+++ b/bin/mapred
@@ -0,0 +1,67 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
+
+function print_usage() {
+ cat <<EOF
+
+Usage: mapred <application> {-o test.<prop>=<value>}
+
+Available applications:
+
+ terasort Run Terasort
+ rowhash Run RowHash
+EOF
+}
+
+if [ -f "$at_home/conf/env.sh" ]; then
+ . "$at_home"/conf/env.sh
+else
+ . "$at_home"/conf/env.sh.example
+fi
+
+if [ -z "$1" ]; then
+ echo "ERROR: <application> needs to be set"
+ print_usage
+ exit 1
+fi
+
+mr_package="org.apache.accumulo.testing.mapreduce"
+case "$1" in
+ terasort)
+ mr_main="${mr_package}.TeraSortIngest"
+ ;;
+ rowhash)
+ mr_main="${mr_package}.RowHash"
+ ;;
+ *)
+ echo "Unknown application: $1"
+ print_usage
+ exit 1
+esac
+
+export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
+
+if [ ! -z $HADOOP_HOME ]; then
+ export HADOOP_USE_CLIENT_CLASSLOADER=true
+ "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$mr_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS" "${@:2}"
+else
+ echo "Hadoop must be installed and HADOOP_HOME must be set!"
+ exit 1
+fi
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 502bcde..9dbe4e0 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -119,3 +119,32 @@
# produce a bulk import file.
test.ci.bulk.reducers.max=1024
+#################
+# MapReduce Tests
+#################
+
+# RowHash test
+# ------------
+# Table containing input data
+test.rowhash.input.table = terasort
+# Table where data will be output to
+test.rowhash.output.table = rowhash
+# Column that is fetched in input table
+test.rowhash.column = c
+
+# TeraSort ingest
+# ---------------
+# Table to ingest into
+test.terasort.table = terasort
+# Number of rows to ingest
+test.terasort.num.rows = 10000
+# Minimum key size
+test.terasort.min.keysize = 10
+# Maximum key size
+test.terasort.max.keysize = 10
+# Minimum value size
+test.terasort.min.valuesize = 78
+# Maximum value size
+test.terasort.max.valuesize = 78
+# Number of table splits
+test.terasort.num.splits = 4
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 3f2ca15..49ea718 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -33,6 +33,8 @@
private static final String CI_SCANNER = CI + "scanner.";
private static final String CI_VERIFY = CI + "verify.";
private static final String CI_BULK = CI + "bulk.";
+ public static final String TERASORT = PREFIX + "terasort.";
+ public static final String ROWHASH = PREFIX + "rowhash.";
/** Common properties **/
// HDFS root path. Should match 'fs.defaultFS' property in Hadoop's core-site.xml
@@ -122,6 +124,20 @@
public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
+ /** TeraSort **/
+ public static final String TERASORT_TABLE = TERASORT + "table";
+ public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
+ public static final String TERASORT_MIN_KEYSIZE = TERASORT + "min.keysize";
+ public static final String TERASORT_MAX_KEYSIZE = TERASORT + "max.keysize";
+ public static final String TERASORT_MIN_VALUESIZE = TERASORT + "min.valuesize";
+ public static final String TERASORT_MAX_VALUESIZE = TERASORT + "max.valuesize";
+ public static final String TERASORT_NUM_SPLITS = TERASORT + "num.splits";
+
+ /** RowHash **/
+ public static final String ROWHASH_INPUT_TABLE = ROWHASH + "input.table";
+ public static final String ROWHASH_OUTPUT_TABLE = ROWHASH + "output.table";
+ public static final String ROWHASH_COLUMN = ROWHASH + "column";
+
public static Properties loadFromFile(String propsFilePath) {
try {
return loadFromStream(new FileInputStream(propsFilePath));
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
index dca7128..4722799 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
@@ -20,6 +20,7 @@
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
+import java.util.Properties;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.Key;
@@ -27,8 +28,8 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
@@ -37,8 +38,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.beust.jcommander.Parameter;
-
public class RowHash extends Configured implements Tool {
/**
* The Mapper class that given a row number, will generate the appropriate output line.
@@ -57,23 +56,16 @@
public void setup(Context job) {}
}
- private static class Opts extends ClientOpts {
- @Parameter(names = "--column", required = true)
- String column;
- @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
- String tableName;
- }
-
@Override
public int run(String[] args) throws Exception {
+ TestEnv env = new TestEnv(args);
Job job = Job.getInstance(getConf());
job.setJobName(this.getClass().getName());
job.setJarByClass(this.getClass());
- Opts opts = new Opts();
- opts.parseArgs(RowHash.class.getName(), args);
job.setInputFormatClass(AccumuloInputFormat.class);
- String col = opts.column;
+ Properties props = env.getTestProperties();
+ String col = props.getProperty(TestProps.ROWHASH_COLUMN);
int idx = col.indexOf(":");
Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
@@ -81,12 +73,15 @@
if (cf.getLength() > 0)
cols = Collections.singleton(new IteratorSetting.Column(cf, cq));
- AccumuloInputFormat.configure().clientProperties(opts.getClientProps()).table(opts.tableName)
- .auths(opts.auths).fetchColumns(cols).store(job);
+ String inputTable = props.getProperty(TestProps.ROWHASH_INPUT_TABLE);
+ String outputTable = props.getProperty(TestProps.ROWHASH_OUTPUT_TABLE);
- AccumuloOutputFormat.configure().clientProperties(opts.getClientProps())
- .defaultTable(opts.tableName).createTables(true).store(job);
+ AccumuloInputFormat.configure().clientProperties(env.getClientProps()).table(inputTable)
+ .fetchColumns(cols).store(job);
+ AccumuloOutputFormat.configure().clientProperties(env.getClientProps())
+ .defaultTable(outputTable).createTables(true).store(job);
+ job.getConfiguration().set("mapreduce.job.classloader", "true");
job.setMapperClass(HashDataMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
@@ -100,6 +95,7 @@
}
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new RowHash(), args);
+ TestEnv env = new TestEnv(args);
+ ToolRunner.run(env.getHadoopConfiguration(), new RowHash(), args);
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
index cd47e1e..ee2b5d0 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
@@ -23,12 +23,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
import java.util.Random;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
@@ -46,8 +49,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.beust.jcommander.Parameter;
-
/**
* Generate the *almost* official terasort input data set. (See below) The user specifies the number
* of rows and the output directory and this class runs a map/reduce program to generate the data.
@@ -79,6 +80,8 @@
long firstRow;
long rowCount;
+ RangeInputSplit() {}
+
public RangeInputSplit(long offset, long length) {
firstRow = offset;
rowCount = length;
@@ -165,8 +168,8 @@
*/
@Override
public List<InputSplit> getSplits(JobContext job) {
- long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
- int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
+ long totalRows = job.getConfiguration().getLong(TestProps.TERASORT_NUM_ROWS, 0);
+ int numSplits = job.getConfiguration().getInt(TestProps.TERASORT_NUM_SPLITS, 1);
long rowsPerSplit = totalRows / numSplits;
System.out.println(
"Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
@@ -183,12 +186,9 @@
}
- private static String NUMSPLITS = "terasort.overridesplits";
- private static String NUMROWS = "terasort.numrows";
-
static class RandomGenerator {
private long seed = 0;
- private static final long mask32 = (1l << 32) - 1;
+ private static final long mask32 = (1L << 32) - 1;
/**
* The number of iterations separating the precomputed seeds.
*/
@@ -343,65 +343,49 @@
@Override
public void setup(Context job) {
- minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
- maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
- minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
- maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
- tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
+ minkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_KEYSIZE, 0);
+ maxkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_KEYSIZE, 0);
+ minvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_VALUESIZE, 0);
+ maxvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_VALUESIZE, 0);
+ tableName = new Text(job.getConfiguration().get(TestProps.TERASORT_TABLE));
}
}
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
- }
-
- static class Opts extends ClientOpts {
- @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
- String tableName;
- @Parameter(names = "--count", description = "number of rows to ingest", required = true)
- long numRows;
- @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
- int minKeyLength;
- @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
- int maxKeyLength;
- @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
- int minValueLength;
- @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
- int maxValueLength;
- @Parameter(names = "--splits", description = "number of splits to create in the table")
- int splits = 0;
+ TestEnv env = new TestEnv(args);
+ ToolRunner.run(env.getHadoopConfiguration(), new TeraSortIngest(), args);
}
@Override
public int run(String[] args) throws Exception {
- Job job = Job.getInstance(getConf());
- job.setJobName("TeraSortCloud");
- job.setJarByClass(this.getClass());
- Opts opts = new Opts();
- opts.parseArgs(TeraSortIngest.class.getName(), args);
+ TestEnv env = new TestEnv(args);
+
+ Job job = Job.getInstance(getConf());
+ job.setJobName("TeraSortIngest");
+ job.setJarByClass(this.getClass());
job.setInputFormatClass(RangeInputFormat.class);
job.setMapperClass(SortGenMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
-
job.setNumReduceTasks(0);
-
job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.configure().clientProperties(opts.getClientProps()).createTables(true)
- .defaultTable(opts.tableName);
+ Properties testProps = env.getTestProperties();
+ String tableName = testProps.getProperty(TestProps.TERASORT_TABLE);
+ Objects.requireNonNull(tableName);
+
+ AccumuloOutputFormat.configure().clientProperties(env.getClientProps()).createTables(true)
+ .defaultTable(tableName).store(job);
Configuration conf = job.getConfiguration();
- conf.setLong(NUMROWS, opts.numRows);
- conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
- conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
- conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
- conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
- conf.set("cloudgen.tablename", opts.tableName);
-
- if (args.length > 10)
- conf.setInt(NUMSPLITS, opts.splits);
+ conf.set("mapreduce.job.classloader", "true");
+ for (Object keyObj : testProps.keySet()) {
+ String key = (String) keyObj;
+ if (key.startsWith(TestProps.TERASORT)) {
+ conf.set(key, testProps.getProperty(key));
+ }
+ }
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
diff --git a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
index 6271fd4..972206a 100644
--- a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
+++ b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
@@ -50,5 +50,4 @@
r.visit(null, env, null);
}
-
}