MAPREDUCE-5157. Bring back old sampler-related code so that we can support binary compatibility with the hadoop-1 sorter example. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480474 13f79535-47bb-0310-9956-ffa450edef68
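For context, a minimal sketch of the hadoop-1-style driver code whose binary compatibility this change preserves. The class name, paths, key/value types, and sampling parameters below are illustrative assumptions, not part of this patch; only the restored org.apache.hadoop.mapred.lib.InputSampler API is taken from it.

    // Hypothetical hadoop-1-style driver using the restored mapred.lib API.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.lib.InputSampler;

    public class SamplerDriverSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(SamplerDriverSketch.class);
        job.setInputFormat(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Keep keys with probability 0.1, up to 10000 samples, from at most 10 splits.
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        // Writes the partition file later consumed by a total-order partitioner.
        InputSampler.writePartitionFile(job, sampler);
      }
    }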
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 68b780f..cb74b98 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -224,6 +224,9 @@
MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
+ MAPREDUCE-5157. Bring back old sampler-related code so that we can support
+ binary compatibility with the hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
index b99b0c7..a55abe6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
@@ -19,10 +19,18 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
@InterfaceAudience.Public
@@ -30,6 +38,8 @@
public class InputSampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
+ private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
public InputSampler(JobConf conf) {
super(conf);
}
@@ -38,4 +48,219 @@ public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
writePartitionFile(new Job(job), sampler);
}
+ /**
+ * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+ */
+ public interface Sampler<K,V> extends
+ org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
+ /**
+ * For a given job, collect and return a subset of the keys from the
+ * input data.
+ */
+ K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+ }
+
+ /**
+ * Samples the first n records from s splits.
+ * Inexpensive way to sample random data.
+ */
+ public static class SplitSampler<K,V> extends
+ org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
+ implements Sampler<K,V> {
+
+ /**
+ * Create a SplitSampler sampling <em>all</em> splits.
+ * Takes the first numSamples / numSplits records from each split.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ */
+ public SplitSampler(int numSamples) {
+ this(numSamples, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new SplitSampler.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ */
+ public SplitSampler(int numSamples, int maxSplitsSampled) {
+ super(numSamples, maxSplitsSampled);
+ }
+
+ /**
+ * From each split sampled, take the first numSamples / numSplits records.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>(numSamples);
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+ int splitStep = splits.length / splitsToSample;
+ int samplesPerSplit = numSamples / splitsToSample;
+ long records = 0;
+ for (int i = 0; i < splitsToSample; ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+ job, Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ samples.add(key);
+ key = reader.createKey();
+ ++records;
+ if ((i+1) * samplesPerSplit <= records) {
+ break;
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
+ /**
+ * Sample from random points in the input.
+ * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+ * each split.
+ */
+ public static class RandomSampler<K,V> extends
+ org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
+ implements Sampler<K,V> {
+
+ /**
+ * Create a new RandomSampler sampling <em>all</em> splits.
+ * This will read every split at the client, which is very expensive.
+ * @param freq Probability with which a key will be chosen.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ */
+ public RandomSampler(double freq, int numSamples) {
+ this(freq, numSamples, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new RandomSampler.
+ * @param freq Probability with which a key will be chosen.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ */
+ public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+ super(freq, numSamples, maxSplitsSampled);
+ }
+
+ /**
+ * Randomize the split order, then take the specified number of keys from
+ * each split sampled, where each key is selected with the specified
+ * probability and possibly replaced by a subsequently selected key when
+ * the quota of keys from that split is satisfied.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>(numSamples);
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.debug("seed: " + seed);
+ // shuffle splits
+ for (int i = 0; i < splits.length; ++i) {
+ InputSplit tmp = splits[i];
+ int j = r.nextInt(splits.length);
+ splits[i] = splits[j];
+ splits[j] = tmp;
+ }
+ // our target rate is in terms of the maximum number of sample splits,
+ // but we accept the possibility of sampling additional splits to hit
+ // the target sample keyset
+ for (int i = 0; i < splitsToSample ||
+ (i < splits.length && samples.size() < numSamples); ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+ Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ if (r.nextDouble() <= freq) {
+ if (samples.size() < numSamples) {
+ samples.add(key);
+ } else {
+ // When exceeding the maximum number of samples, replace a
+ // random element with this one, then adjust the frequency
+ // to reflect the possibility of existing elements being
+ // pushed out
+ int ind = r.nextInt(numSamples);
+ if (ind != numSamples) {
+ samples.set(ind, key);
+ }
+ freq *= (numSamples - 1) / (double) numSamples;
+ }
+ key = reader.createKey();
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
+ /**
+ * Sample from s splits at regular intervals.
+ * Useful for sorted data.
+ */
+ public static class IntervalSampler<K,V> extends
+ org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
+ implements Sampler<K,V> {
+
+ /**
+ * Create a new IntervalSampler sampling <em>all</em> splits.
+ * @param freq The frequency with which records will be emitted.
+ */
+ public IntervalSampler(double freq) {
+ this(freq, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new IntervalSampler.
+ * @param freq The frequency with which records will be emitted.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ * @see #getSample
+ */
+ public IntervalSampler(double freq, int maxSplitsSampled) {
+ super(freq, maxSplitsSampled);
+ }
+
+ /**
+ * For each split sampled, emit when the ratio of the number of records
+ * retained to the total record count is less than the specified
+ * frequency.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>();
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+ int splitStep = splits.length / splitsToSample;
+ long records = 0;
+ long kept = 0;
+ for (int i = 0; i < splitsToSample; ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+ job, Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ ++records;
+ if ((double) kept / records < freq) {
+ ++kept;
+ samples.add(key);
+ key = reader.createKey();
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
}
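The restored RandomSampler above caps the kept keys at numSamples by replacing a random element once the buffer is full and shrinking the acceptance probability so that later records can still displace earlier ones. A self-contained sketch of that selection loop over plain integers (an illustration of the idea, not the patch's code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class ReservoirSketch {
      // Keeps at most numSamples elements, mirroring RandomSampler's accept/replace loop.
      public static List<Integer> sample(int[] input, double freq, int numSamples, long seed) {
        Random r = new Random(seed);
        List<Integer> samples = new ArrayList<Integer>(numSamples);
        for (int key : input) {
          if (r.nextDouble() > freq) {
            continue;                                        // key not selected this round
          }
          if (samples.size() < numSamples) {
            samples.add(key);                                // still filling the buffer
          } else {
            samples.set(r.nextInt(numSamples), key);         // replace a random element
            freq *= (numSamples - 1) / (double) numSamples;  // shrink the acceptance rate
          }
        }
        return samples;
      }

      public static void main(String[] args) {
        int[] data = new int[10000];
        for (int i = 0; i < data.length; i++) {
          data[i] = i;
        }
        System.out.println(sample(data, 0.1, 100, 42L).size()); // at most 100 kept
      }
    }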
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
index 72b47f2..7423168 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
@@ -96,8 +96,8 @@ public interface Sampler<K,V> {
*/
public static class SplitSampler<K,V> implements Sampler<K,V> {
- private final int numSamples;
- private final int maxSplitsSampled;
+ protected final int numSamples;
+ protected final int maxSplitsSampled;
/**
* Create a SplitSampler sampling <em>all</em> splits.
@@ -157,9 +157,9 @@ public SplitSampler(int numSamples, int maxSplitsSampled) {
* each split.
*/
public static class RandomSampler<K,V> implements Sampler<K,V> {
- private double freq;
- private final int numSamples;
- private final int maxSplitsSampled;
+ protected double freq;
+ protected final int numSamples;
+ protected final int maxSplitsSampled;
/**
* Create a new RandomSampler sampling <em>all</em> splits.
@@ -249,8 +249,8 @@ public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
* Useful for sorted data.
*/
public static class IntervalSampler<K,V> implements Sampler<K,V> {
- private final double freq;
- private final int maxSplitsSampled;
+ protected final double freq;
+ protected final int maxSplitsSampled;
/**
* Create a new IntervalSampler sampling <em>all</em> splits.
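The visibility change above (private to protected on freq, numSamples, and maxSplitsSampled) is what lets the restored mapred.lib wrappers reuse these fields from their getSample implementations. A hypothetical subclass, shown only to illustrate that the fields are now reachable from subclasses in other packages; the class name and logging are assumptions:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

    public class LoggingSplitSampler<K, V> extends InputSampler.SplitSampler<K, V> {
      public LoggingSplitSampler(int numSamples, int maxSplitsSampled) {
        super(numSamples, maxSplitsSampled);
      }

      @Override
      public K[] getSample(InputFormat<K, V> inf, Job job)
          throws IOException, InterruptedException {
        // Protected fields from the parent class, readable here after the change.
        System.out.println("sampling " + numSamples + " keys from at most "
            + maxSplitsSampled + " splits");
        return super.getSample(inf, job);
      }
    }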
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
index b35e843..d2bcd6f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
@@ -17,23 +17,26 @@
*/
package org.apache.hadoop.mapreduce.lib.partition;
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
public class TestInputSampler {
@@ -47,6 +50,24 @@ static class SequentialSplit extends InputSplit {
public int getInit() { return i; }
}
+ static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+ private int i;
+ MapredSequentialSplit(int i) {
+ this.i = i;
+ }
+ @Override
+ public long getLength() { return 0; }
+ @Override
+ public String[] getLocations() { return new String[0]; }
+ public int getInit() { return i; }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ }
+ }
+
static class TestInputSamplerIF
extends InputFormat<IntWritable,NullWritable> {
@@ -90,6 +111,71 @@ public void close() { }
}
+ static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+ org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+ TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+ super(maxDepth, numSplits, splitInit);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+ int numSplits) throws IOException {
+ List<InputSplit> splits = null;
+ try {
+ splits = getSplits(Job.getInstance(job));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ org.apache.hadoop.mapred.InputSplit[] retVals =
+ new org.apache.hadoop.mapred.InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); ++i) {
+ MapredSequentialSplit split = new MapredSequentialSplit(
+ ((SequentialSplit) splits.get(i)).getInit());
+ retVals[i] = split;
+ }
+ return retVals;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+ getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return new org.apache.hadoop.mapred.RecordReader
+ <IntWritable, NullWritable>() {
+ private final IntWritable i =
+ new IntWritable(((MapredSequentialSplit)split).getInit());
+ private int maxVal = i.get() + maxDepth + 1;
+
+ @Override
+ public boolean next(IntWritable key, NullWritable value)
+ throws IOException {
+ i.set(i.get() + 1);
+ return i.get() < maxVal;
+ }
+ @Override
+ public IntWritable createKey() {
+ return new IntWritable(i.get());
+ }
+ @Override
+ public NullWritable createValue() {
+ return NullWritable.get();
+ }
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ @Override
+ public float getProgress() throws IOException {
+ return 0;
+ }
+ };
+ }
+ }
+
/**
* Verify SplitSampler contract, that an equal number of records are taken
* from the first splits.
@@ -119,6 +205,36 @@ public void testSplitSampler() throws Exception {
}
/**
+ * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+ * back for binary compatibility with M/R 1.x.
+ */
+ @Test (timeout = 30000)
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testMapredSplitSampler() throws Exception {
+ final int TOT_SPLITS = 15;
+ final int NUM_SPLITS = 5;
+ final int STEP_SAMPLE = 5;
+ final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+ org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+ sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+ <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i * STEP_SAMPLE;
+ }
+ Object[] samples = sampler.getSample(
+ new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+ new JobConf());
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ // mapred.lib.InputSampler.SplitSampler has a sampling step
+ assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+ ((IntWritable)samples[i]).get());
+ }
+ }
+
+ /**
* Verify IntervalSampler contract, that samples are taken at regular
* intervals from the given splits.
*/
@@ -146,4 +262,33 @@ public void testIntervalSampler() throws Exception {
}
}
+ /**
+ * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+ * for binary compatibility with M/R 1.x.
+ */
+ @Test (timeout = 30000)
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testMapredIntervalSampler() throws Exception {
+ final int TOT_SPLITS = 16;
+ final int PER_SPLIT_SAMPLE = 4;
+ final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+ final double FREQ = 1.0 / TOT_SPLITS;
+ org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+ sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+ <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i;
+ }
+ Job ignored = Job.getInstance();
+ Object[] samples = sampler.getSample(new TestInputSamplerIF(
+ NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i,
+ ((IntWritable)samples[i]).get());
+ }
+ }
+
}
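A worked check of the expectation asserted in testMapredSplitSampler above: with TOT_SPLITS = 15, NUM_SPLITS = 5, and STEP_SAMPLE = 5, the mapred SplitSampler visits every third split (splitStep = 15 / 5 = 3), whose synthetic inits are 0, 15, 30, 45, and 60, and keeps five keys from each, so the sorted samples are 0..4, 15..19, 30..34, 45..49, and 60..64, which is exactly i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE). A throwaway snippet reproducing that arithmetic:

    public class ExpectedSamples {
      public static void main(String[] args) {
        final int TOT_SPLITS = 15, STEP_SAMPLE = 5, NUM_SAMPLES = 25;
        for (int i = 0; i < NUM_SAMPLES; ++i) {
          // Prints 0..4, 15..19, 30..34, 45..49, 60..64 in order.
          System.out.println(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE));
        }
      }
    }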