blob: 817b6f3222e4fb3be561fe0507cde53d397f5e7a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.examples;
import java.io.Serializable;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.SecondarySort;
import org.apache.crunch.io.To;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Splitter;
/**
 * Example Crunch pipeline demonstrating {@link SecondarySort}: input records are grouped by
 * their first field and, within each group, ordered by their second (numeric) field.
 *
 * <p>Usage: {@code SecondarySortExample [generic options] input output}
 */
@SuppressWarnings("serial")
public class SecondarySortExample extends Configured implements Tool, Serializable {

  /** Counters incremented for input lines that are rejected by the parsing step. */
  static enum COUNTERS {
    /** The second field could not be parsed as a {@code long}; the line is dropped. */
    CORRUPT_TIMESTAMP,
    /** The line did not yield exactly three non-empty fields; the line is dropped. */
    CORRUPT_LINE
  }

  // Input records are comma separated. The first field is the grouping key. The
  // second is the field to sort on (a long in this implementation). The rest of
  // the line is the payload that is carried along in sorted order.
  // For example:
  // one,1,1
  // one,2,-3
  // two,4,5
  // two,2,6
  // two,1,7,9
  // three,0,-1
  // one,-5,10
  // one,-10,garbage
  private static final char SPLIT_ON = ',';

  // limit(3) keeps any further commas inside the payload ("two,1,7,9" -> payload "7,9").
  private static final Splitter INPUT_SPLITTER =
      Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3);

  /**
   * Builds and runs the secondary-sort pipeline.
   *
   * @param args exactly two entries: the input path and the output path
   * @return 0 if the pipeline succeeded, 1 on bad usage or pipeline failure
   * @throws Exception propagated from pipeline construction or execution
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println();
      System.err.println("Usage: " + this.getClass().getName()
          + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(SecondarySortExample.class, getConf());
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // Parse each line into a pair of (key, (sortValue, payload)). Records are
    // grouped by the key and then sorted by sortValue; the payload is passed
    // along in an Iterable.
    PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
        new ExtractRecordFn(),
        Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings())));
    // The output of the above input will be (with one reducer):
    // one : [[-10,garbage],[-5,10],[1,1],[2,-3]]
    // three : [[0,-1]]
    // two : [[1,7,9],[2,6],[4,5]]
    SecondarySort.sortAndApply(pairs, new FormatGroupFn(), Writables.strings())
        .write(To.textFile(args[1]));
    // Execute the pipeline as a MapReduce.
    return pipeline.done().succeeded() ? 0 : 1;
  }

  /**
   * Parses a line of the form {@code key,sortValue,payload} into
   * {@code (key, (sortValue, payload))}.
   *
   * <p>A line that does not yield exactly three non-empty fields is counted under
   * {@link COUNTERS#CORRUPT_LINE}. A line whose second field is not a {@code long} is counted
   * under {@link COUNTERS#CORRUPT_TIMESTAMP}. Both kinds of line are dropped.
   */
  private static final class ExtractRecordFn
      extends DoFn<String, Pair<String, Pair<Long, String>>> {
    @Override
    public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) {
      String key = null;
      String sortField = null;
      String value = null;
      int fieldCount = 0;
      for (String element : INPUT_SPLITTER.split(line)) {
        switch (++fieldCount) {
          case 1:
            key = element;
            break;
          case 2:
            sortField = element;
            break;
          case 3:
            value = element;
            break;
          default:
            // Unreachable while INPUT_SPLITTER is limited to three fields.
            System.err.println("i = " + fieldCount + " should never happen!");
            break;
        }
      }
      if (fieldCount != 3) {
        increment(COUNTERS.CORRUPT_LINE);
        return;
      }
      long timestamp;
      try {
        timestamp = Long.parseLong(sortField);
      } catch (NumberFormatException e) {
        // Drop the line rather than emitting it with a made-up sort key, which
        // would silently place it in the wrong position within its group.
        System.err.println("Timestamp not in long format '" + line + "'");
        increment(COUNTERS.CORRUPT_TIMESTAMP);
        return;
      }
      emitter.emit(Pair.of(key, Pair.of(Long.valueOf(timestamp), value)));
    }
  }

  /**
   * Formats one sorted group as {@code key : [p1,p2,...]}. Each {@code p} is the string form
   * of a (sortValue, payload) pair, in the order the pairs are received.
   */
  private static final class FormatGroupFn
      extends DoFn<Pair<String, Iterable<Pair<Long, String>>>, String> {
    @Override
    public void process(Pair<String, Iterable<Pair<Long, String>>> input,
        Emitter<String> emitter) {
      // A fresh builder per call keeps the serialized function object free of mutable state.
      StringBuilder sb = new StringBuilder().append(input.first()).append(" : [");
      String separator = "";
      for (Pair<Long, String> pair : input.second()) {
        sb.append(separator).append(pair);
        separator = ",";
      }
      emitter.emit(sb.append(']').toString());
    }
  }

  /**
   * Command-line entry point. Runs the tool via {@link ToolRunner} and exits with its return
   * code, or with -1 if it threw.
   *
   * @param args generic Hadoop options followed by the input and output paths
   * @throws Exception declared for compatibility; failures are caught and reported
   */
  public static void main(String[] args) throws Exception {
    int exitCode = -1;
    try {
      exitCode = ToolRunner.run(new Configuration(), new SecondarySortExample(), args);
    } catch (Throwable e) {
      e.printStackTrace();
    }
    System.exit(exitCode);
  }
}