// blob: ffc09c3729558d7477cbf076e28e3b00bfdde0b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.crunch.PipelineResult.StageResult;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.To;
import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.crunch.types.writable.Writables;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
public class MultipleOutputIT {
@Rule
public TemporaryPath tmpDir = TemporaryPaths.create();
/**
 * Filters {@code words} down to the strings whose length is an even number.
 *
 * @param words the collection to filter
 * @param typeFamily supplies the string type used for the filtered output
 * @return the result of the {@code parallelDo} named "even" that applies the filter
 */
public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
  FilterFn<String> evenLengthOnly = new FilterFn<String>() {
    @Override
    public boolean accept(String word) {
      final int length = word.length();
      return length % 2 == 0;
    }
  };
  return words.parallelDo("even", evenLengthOnly, typeFamily.strings());
}
/**
 * Filters {@code words} down to the strings whose length is an odd number.
 *
 * @param words the collection to filter
 * @param typeFamily supplies the string type used for the filtered output
 * @return the result of the {@code parallelDo} named "odd" that applies the filter
 */
public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
  FilterFn<String> oddLengthOnly = new FilterFn<String>() {
    @Override
    public boolean accept(String word) {
      final int length = word.length();
      return length % 2 != 0;
    }
  };
  return words.parallelDo("odd", oddLengthOnly, typeFamily.strings());
}
/**
 * Re-keys a table by the leading one-character prefix of each key.
 *
 * <p>Entries whose key is the empty string are dropped. Every other entry is emitted with its
 * key replaced by {@code key.substring(0, 1)} and its value left unchanged.
 *
 * @param ptable the table to re-key
 * @return the result of the {@code parallelDo}, typed with {@code ptable}'s own table type
 */
public static PTable<String, Long> substr(PTable<String, Long> ptable) {
  return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
    @Override
    public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
      String key = input.first();
      if (key.isEmpty()) {
        // Nothing to take a prefix of; skip the entry instead of emitting it.
        return;
      }
      emitter.emit(Pair.of(key.substring(0, 1), input.second()));
    }
  }, ptable.getPTableType());
}
/** Runs the shared pipeline (see {@code run}) using the Writable type family. */
@Test
public void testWritables() throws IOException {
  Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
  run(pipeline, WritableTypeFamily.getInstance());
}
/** Runs the shared pipeline (see {@code run}) using the Avro type family. */
@Test
public void testAvro() throws IOException {
  Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
  run(pipeline, AvroTypeFamily.getInstance());
}
/**
 * Runs the shared pipeline with the Writable type family and checks that it reports exactly
 * one stage result.
 */
@Test
public void testParallelDosFused() throws IOException {
  Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
  PipelineResult result = run(pipeline, WritableTypeFamily.getInstance());

  // A single stage result means the multiple outputs were fused into one job.
  int stageCount = result.getStageResults().size();
  assertEquals("parallel Dos not fused into a single job", 1, stageCount);
}
/**
 * Runs the shared pipeline with the Writable type family and checks the counters reported
 * for its single stage under the group named after the {@code CrunchOutputs} class:
 * three counter names, with values 1, 1 and 0 for "out1", "out2" and "out3".
 */
@Test
public void testCountersEnabled() throws IOException {
  PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
      WritableTypeFamily.getInstance());

  assertEquals(1, result.getStageResults().size());
  StageResult stageResult = result.getStageResults().get(0);

  String counterGroup = CrunchOutputs.class.getName();
  // Fail with a readable message, not a NullPointerException, when the group is missing.
  assertTrue("no counters reported for group " + counterGroup,
      stageResult.getCounterNames().containsKey(counterGroup));
  assertEquals(3, stageResult.getCounterNames().get(counterGroup).size());
  assertEquals(1L, stageResult.getCounterValue(counterGroup, "out1"));
  assertEquals(1L, stageResult.getCounterValue(counterGroup, "out2"));
  assertEquals(0L, stageResult.getCounterValue(counterGroup, "out3"));
}
/**
 * Runs the shared pipeline with {@code CrunchOutputs.CRUNCH_DISABLE_OUTPUT_COUNTERS} set to
 * {@code true} and checks that the single stage reports no output counters.
 */
@Test
public void testCountersDisabled() throws IOException {
  Configuration configuration = tmpDir.getDefaultConfiguration();
  configuration.setBoolean(CrunchOutputs.CRUNCH_DISABLE_OUTPUT_COUNTERS, true);

  PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, configuration),
      WritableTypeFamily.getInstance());

  assertEquals(1, result.getStageResults().size());
  StageResult stageResult = result.getStageResults().get(0);

  // Original check, kept as-is.
  assertFalse(stageResult.getCounterNames().containsKey(CrunchOutputs.CRUNCH_OUTPUTS));
  // testCountersEnabled reads the output counters from the group named after the
  // CrunchOutputs class, so that is the group that must be absent when counters are disabled.
  // NOTE(review): confirm against CrunchOutputs that no counter is created in this group
  // when the disable flag is set.
  String counterGroup = CrunchOutputs.class.getName();
  assertFalse("output counters were reported although they are disabled",
      stageResult.getCounterNames().containsKey(counterGroup));
}
/**
 * Builds and executes the pipeline shared by the tests in this class, then verifies its output.
 *
 * <p>The pipeline reads the "letters.txt" resource, writes the even-length and odd-length
 * words to the "even" and "odd" text outputs, and additionally keys the even-length words by
 * {@link FirstLetterFn}, groups them, combines the values with {@code FIRST_N(10)} and writes
 * that to the "reduce" text output. After {@code pipeline.done()} the "even" output must contain
 * exactly "bb", the "odd" output exactly "a", and the "reduce" output must be non-empty.
 *
 * @param pipeline the pipeline to build on and run
 * @param typeFamily the type family used for the string types in the pipeline
 * @return the result returned by {@code pipeline.done()}
 * @throws IOException if preparing the input or reading the outputs fails
 */
public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
  String lettersFile = tmpDir.copyResourceFileName("letters.txt");
  String evenDir = tmpDir.getFileName("even");
  String oddDir = tmpDir.getFileName("odd");
  String reduceDir = tmpDir.getFileName("reduce");

  PCollection<String> words = pipeline.read(At.textFile(lettersFile, typeFamily.strings()));
  PCollection<String> evenWords = evenCountLetters(words, typeFamily);
  PCollection<String> oddWords = oddCountLetters(words, typeFamily);

  // Two map-side outputs ...
  pipeline.writeTextFile(evenWords, evenDir);
  pipeline.writeTextFile(oddWords, oddDir);

  // ... plus one output that goes through a group-by on the first letter.
  PTable<String, String> evenByFirstLetter = evenWords.by(new FirstLetterFn(), typeFamily.strings());
  evenByFirstLetter.groupByKey()
      .combineValues(Aggregators.<String>FIRST_N(10))
      .write(To.textFile(reduceDir));

  PipelineResult outcome = pipeline.done();

  checkFileContents(evenDir, Arrays.asList("bb"));
  checkFileContents(oddDir, Arrays.asList("a"));
  checkNotEmpty(reduceDir);

  return outcome;
}
/** Maps a string to its leading one-character prefix, i.e. {@code substring(0, 1)}. */
static class FirstLetterFn extends MapFn<String, String> {
  @Override
  public String map(String word) {
    final String firstLetter = word.substring(0, 1);
    return firstLetter;
  }
}
/**
 * Mutates the state of an input and then emits the mutated object.
 *
 * <p>The suffix given at construction time is appended to the wrapper's current value in
 * place, and that same (now modified) instance is what gets emitted.
 */
static class AppendFn extends DoFn<StringWrapper, StringWrapper> {

  /** Text appended to every input value; fixed at construction time. */
  private final String value;

  /**
   * @param value the text to append to each input's value
   */
  public AppendFn(String value) {
    this.value = value;
  }

  @Override
  public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
    // Deliberately modifies the incoming object rather than emitting a copy.
    input.setValue(input.getValue() + value);
    emitter.emit(input);
  }
}
/**
 * Guards against object-reuse bugs when several streams of a pipeline are fused.
 *
 * <p>The same collection of {@code StringWrapper}s feeds two branches, each of which mutates
 * its inputs through {@link AppendFn} (appending "A" and "B" respectively). The test checks
 * that the pipeline ran as a single stage and that neither branch's output shows the other
 * branch's mutation.
 */
@Test
public void testFusedMappersObjectReuseBug() throws IOException {
  Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());

  String set2File = tmpDir.copyResourceFileName("set2.txt");
  PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(set2File)
      .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));

  // Branch A: append "A" to every wrapper, then unwrap to plain strings.
  PCollection<StringWrapper> wrappedA =
      stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType());
  PCollection<String> stringsA =
      wrappedA.parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());

  // Branch B: same source collection, but appending "B".
  PCollection<StringWrapper> wrappedB =
      stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType());
  PCollection<String> stringsB =
      wrappedB.parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());

  String outputA = tmpDir.getFileName("stringsA");
  String outputB = tmpDir.getFileName("stringsB");
  pipeline.writeTextFile(stringsA, outputA);
  pipeline.writeTextFile(stringsB, outputB);

  PipelineResult pipelineResult = pipeline.done();

  // Make sure fusing did actually occur
  int stageCount = pipelineResult.getStageResults().size();
  assertEquals(1, stageCount);

  List<String> expectedA = Lists.newArrayList("cA", "dA", "aA");
  List<String> expectedB = Lists.newArrayList("cB", "dB", "aB");
  checkFileContents(outputA, expectedA);
  checkFileContents(outputB, expectedB);
}
/**
 * Asserts that the directory at {@code filePath} holds at least one file whose name starts
 * with "part", and that the first such file listed contains at least one line.
 *
 * @param filePath path of the output directory to inspect
 * @throws IOException if the part file cannot be read
 */
private void checkNotEmpty(String filePath) throws IOException {
  File outputDir = new File(filePath);
  File[] partFiles = outputDir.listFiles(new FilenameFilter() {
    @Override
    public boolean accept(File parent, String name) {
      return name.startsWith("part");
    }
  });
  // File.listFiles returns null when the path is not a directory or cannot be listed;
  // report that as a test failure instead of a NullPointerException.
  assertTrue("not a listable directory: " + filePath, partFiles != null);
  assertTrue("no part files found in " + filePath, partFiles.length > 0);
  // Decode with a fixed charset so the check does not depend on the platform default.
  List<String> lines = Files.readLines(partFiles[0], Charset.forName("UTF-8"));
  assertTrue("part file has no lines: " + partFiles[0], lines.size() > 0);
}
/**
 * Asserts that the file "part-m-00000" inside the directory {@code filePath} consists of
 * exactly the lines in {@code expected}, in the same order.
 *
 * @param filePath path of the output directory containing the part file
 * @param expected the lines the part file must contain
 * @throws IOException if the part file cannot be read
 */
private void checkFileContents(String filePath, List<String> expected) throws IOException {
  File outputFile = new File(filePath, "part-m-00000");
  // Decode with a fixed charset rather than the platform default, so the comparison gives the
  // same result on every machine. NOTE(review): UTF-8 is assumed to match what the text
  // output writes - confirm against the output format in use.
  List<String> lines = Files.readLines(outputFile, Charset.forName("UTF-8"));
  assertEquals(expected, lines);
}
}