blob: 8e2329a1d8fe7882a9977cfee9d2d3cbab62a8bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.mapreduce;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for {@link HadoopOutputFormat}.
*/
public class HadoopOutputFormatTest {
private static final String MAPRED_OUTPUT_PATH = "an/ignored/file/";
private static final String MAPRED_OUTPUT_DIR_KEY = "mapred.output.dir";
@Test
public void testWriteRecord() throws Exception {
RecordWriter<String, Long> recordWriter = mock(DummyRecordWriter.class);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), recordWriter, null, new Configuration());
hadoopOutputFormat.writeRecord(new Tuple2<String, Long>());
verify(recordWriter, times(1)).write(anyString(), anyLong());
}
@Test
public void testOpen() throws Exception {
OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
OutputCommitter outputCommitter = setupOutputCommitter(true);
when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
hadoopOutputFormat.open(1, 4);
verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
}
@Test
public void testCloseWithNeedsTaskCommitTrue() throws Exception {
RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
OutputCommitter outputCommitter = setupOutputCommitter(true);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), recordWriter, outputCommitter, new Configuration());
hadoopOutputFormat.close();
verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
}
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {
RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
OutputCommitter outputCommitter = setupOutputCommitter(false);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), recordWriter, outputCommitter, new Configuration());
hadoopOutputFormat.close();
verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
}
@Test
public void testConfigure() throws Exception {
ConfigurableDummyOutputFormat outputFormat = mock(ConfigurableDummyOutputFormat.class);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(outputFormat, Job.getInstance(),
null, null, new Configuration());
hadoopOutputFormat.configure(new org.apache.flink.configuration.Configuration());
verify(outputFormat, times(1)).setConf(any(Configuration.class));
}
@Test
public void testFinalizedGlobal() throws Exception {
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), null, null, new Configuration());
hadoopOutputFormat.finalizeGlobal(1);
verify(hadoopOutputFormat.outputCommitter, times(1)).commitJob(any(JobContext.class));
}
private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));
return outputCommitter;
}
private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
OutputFormat<String, Long> outputFormat,
Job job,
RecordWriter<String, Long> recordWriter,
OutputCommitter outputCommitter,
Configuration configuration) {
HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
hadoopOutputFormat.recordWriter = recordWriter;
hadoopOutputFormat.outputCommitter = outputCommitter;
hadoopOutputFormat.configuration = configuration;
hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
return hadoopOutputFormat;
}
class DummyRecordWriter extends RecordWriter<String, Long> {
@Override
public void write(String key, Long value) throws IOException, InterruptedException {
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
}
}
class DummyOutputFormat extends OutputFormat<String, Long> {
@Override
public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return null;
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
doNothing().when(outputCommitter).setupJob(any(JobContext.class));
return outputCommitter;
}
}
class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
@Override
public void setConf(Configuration configuration) {}
@Override
public Configuration getConf() {
return null;
}
}
}