/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch;

import com.google.common.collect.Lists;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import static org.apache.crunch.lib.Sort.ColumnOrder.by;
import static org.apache.crunch.lib.Sort.Order.ASCENDING;
import static org.apache.crunch.lib.Sort.Order.DESCENDING;
import static org.apache.crunch.test.StringWrapper.wrap;
import static org.junit.Assert.assertEquals;
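
/**
 * Integration tests for {@link Sort} running on a {@link SparkPipeline}: single-column,
 * multi-column, tuple, and table sorts, each exercised for both the Writable and Avro
 * type families.
 */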
public class SparkSortIT implements Serializable {

  @Rule
  public transient TemporaryPath tmpDir = new TemporaryPath();

  @Test
  public void testWritableSortAsc() throws Exception {
    runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.ASCENDING,
        "A\tand this text as well");
  }

  @Test
  public void testWritableSortDesc() throws Exception {
    runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.DESCENDING,
        "B\tthis doc has some text");
  }

  @Test
  public void testWritableSortAscDesc() throws Exception {
    runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
        "this doc has this text");
  }

  @Test
  public void testWritableSortSecondDescFirstAsc() throws Exception {
    runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
        "this doc has this text");
  }

  @Test
  public void testWritableSortTripleAscDescAsc() throws Exception {
    runTriple(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
        by(3, ASCENDING), "A", "this", "doc");
  }

  @Test
  public void testWritableSortQuadAscDescAscDesc() throws Exception {
    runQuad(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
  }

  @Test
  public void testWritableSortTupleNAscDesc() throws Exception {
    runTupleN(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(),
        new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
  }

  @Test
  public void testWritableSortTable() throws Exception {
    runTable(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), "A");
  }

  @Test
  public void testAvroSortAsc() throws Exception {
    runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.ASCENDING, "A\tand this text as well");
  }

  @Test
  public void testAvroSortDesc() throws Exception {
    runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.DESCENDING, "B\tthis doc has some text");
  }

  @Test
  public void testAvroSortPairAscDesc() throws Exception {
    runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
        "this doc has this text");
  }

  @Test
  public void testAvroSortPairSecondDescFirstAsc() throws Exception {
    runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
        "this doc has this text");
  }

  @Test
  public void testAvroSortTripleAscDescAsc() throws Exception {
    runTriple(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
        by(3, ASCENDING), "A", "this", "doc");
  }

  @Test
  public void testAvroSortQuadAscDescAscDesc() throws Exception {
    runQuad(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
  }

  @Test
  public void testAvroSortTupleNAscDesc() throws Exception {
    runTupleN(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(),
        new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
  }
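
  /** Verifies that pairs containing a reflection-based Avro type sort correctly. */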
  @Test
  public void testAvroReflectSortPair() throws IOException {
    Pipeline pipeline = new SparkPipeline("local", "sort");
    pipeline.enableDebug();
    String rsrc = tmpDir.copyResourceFileName("set2.txt");
    PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
        .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
          @Override
          public Pair<String, StringWrapper> map(String input) {
            return Pair.of(input, wrap(input));
          }
        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
    PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Sort.Order.ASCENDING);
    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
    expected.add(Pair.of("a", wrap("a")));
    expected.add(Pair.of("c", wrap("c")));
    expected.add(Pair.of("d", wrap("d")));
    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
    pipeline.done();
  }
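
  /** Verifies that a PTable with reflection-based Avro values sorts correctly by key. */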
  @Test
  public void testAvroReflectSortTable() throws IOException {
    Pipeline pipeline = new SparkPipeline("local", "sort");
    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")).parallelDo(
        new MapFn<String, Pair<String, StringWrapper>>() {
          @Override
          public Pair<String, StringWrapper> map(String input) {
            return Pair.of(input, wrap(input));
          }
        }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
    PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
    expected.add(Pair.of("a", wrap("a")));
    expected.add(Pair.of("c", wrap("c")));
    expected.add(Pair.of("d", wrap("d")));
    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
    pipeline.done();
  }

  @Test
  public void testAvroSortTable() throws Exception {
    runTable(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), "A");
  }
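
  /**
   * Sorts the lines of the docs.txt resource as plain strings in the given order and asserts
   * that the first line of the sorted output equals {@code firstLine}.
   */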
  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Sort.Order order, String firstLine) throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    // convert the input from Writables to the required type family
    PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input);
      }
    }, typeFamily.strings());
    PCollection<String> sorted = Sort.sort(input2, order);
    Iterable<String> lines = sorted.materialize();
    assertEquals(firstLine, lines.iterator().next());
    pipeline.done(); // TODO: finally
  }
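
  /**
   * Splits each line of docs.txt on tabs into a (key, value) pair, sorts by the two column
   * orders, and asserts on both fields of the first pair in the result.
   */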
  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
      String firstField, String secondField) throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
      @Override
      public void process(String input, Emitter<Pair<String, String>> emitter) {
        String[] split = input.split("[\t]+");
        emitter.emit(Pair.of(split[0], split[1]));
      }
    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
    PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
    List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
    Pair<String, String> l = lines.iterator().next();
    assertEquals(firstField, l.first());
    assertEquals(secondField, l.second());
    pipeline.done();
  }
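
  /**
   * Splits each line of docs.txt on tabs and spaces into a 3-tuple, sorts by the three column
   * orders, and asserts on all three fields of the first tuple in the result.
   */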
  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
      Sort.ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
        new DoFn<String, Tuple3<String, String, String>>() {
          @Override
          public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
            String[] split = input.split("[\t ]+");
            int len = split.length;
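            // wrap indexes with % len so lines with fewer than three fields still yield a full tuple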
            emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
          }
        }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
    PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
    List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
    Tuple3<String, String, String> l = lines.iterator().next();
    assertEquals(firstField, l.first());
    assertEquals(secondField, l.second());
    assertEquals(thirdField, l.third());
    pipeline.done();
  }
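
  /**
   * Splits each line of docs.txt on tabs and spaces into a 4-tuple, sorts by the four column
   * orders, and asserts on all four fields of the first tuple in the result.
   */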
  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
      Sort.ColumnOrder third, Sort.ColumnOrder fourth, String firstField, String secondField, String thirdField,
      String fourthField) throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
        new DoFn<String, Tuple4<String, String, String, String>>() {
          @Override
          public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
            String[] split = input.split("[\t ]+");
            int len = split.length;
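            // wrap indexes with % len so lines with fewer than four fields still yield a full tuple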
            emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
          }
        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
    Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
    Tuple4<String, String, String, String> l = lines.iterator().next();
    assertEquals(firstField, l.first());
    assertEquals(secondField, l.second());
    assertEquals(thirdField, l.third());
    assertEquals(fourthField, l.fourth());
    pipeline.done();
  }
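
  /**
   * Splits each line of docs.txt on tabs into a TupleN of strings, sorts by the given column
   * orders, and asserts that the leading fields of the first tuple match {@code fields}.
   */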
  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder[] orders, String[] fields)
      throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    PType[] types = new PType[orders.length];
    Arrays.fill(types, typeFamily.strings());
    PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
      @Override
      public void process(String input, Emitter<TupleN> emitter) {
        String[] split = input.split("[\t]+");
        emitter.emit(new TupleN(split));
      }
    }, typeFamily.tuples(types));
    PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
    Iterable<TupleN> lines = sorted.materialize();
    TupleN l = lines.iterator().next();
    int i = 0;
    for (String field : fields) {
      assertEquals(field, l.get(i++));
    }
    pipeline.done();
  }
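
  /**
   * Splits each line of docs.txt on tabs into a PTable, sorts it by key, and asserts that the
   * first key in the sorted result equals {@code firstKey}.
   */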
  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
    String inputPath = tmpDir.copyResourceFileName("docs.txt");
    PCollection<String> input = pipeline.readTextFile(inputPath);
    PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
      @Override
      public void process(String input, Emitter<Pair<String, String>> emitter) {
        String[] split = input.split("[\t]+");
        emitter.emit(Pair.of(split[0], split[1]));
      }
    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
    PTable<String, String> sorted = Sort.sort(table);
    Iterable<Pair<String, String>> lines = sorted.materialize();
    Pair<String, String> l = lines.iterator().next();
    assertEquals(firstKey, l.first());
    pipeline.done();
  }
}