/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.mapred.wrapper;

import org.apache.flink.util.InstantiationUtil;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link HadoopInputSplit}. */
class HadoopInputSplitTest {

    private JobConf conf;

    @BeforeEach
    void before() {
        // Fill the configuration with 10000 entries so that accidentally serializing the whole
        // JobConf would show up clearly in the serialized sizes checked below.
        Configuration configuration = new Configuration();
        for (int i = 0; i < 10000; i++) {
            configuration.set("key-" + i, "value-" + i);
        }
        this.conf = new JobConf(configuration);
    }

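    /**
     * Wraps the given split in a {@link HadoopInputSplit}, serializes it, lets the caller check
     * the serialized size, deserializes it again, and passes the recovered Hadoop split to the
     * split checker.
     */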
    private void testInner(
            FileSplit fileSplit,
            Consumer<Integer> serializeSizeChecker,
            Consumer<InputSplit> splitChecker)
            throws IOException, ClassNotFoundException {
        HadoopInputSplit split = new HadoopInputSplit(5, fileSplit, conf);

        byte[] bytes = InstantiationUtil.serializeObject(split);
        serializeSizeChecker.accept(bytes.length);

        split = InstantiationUtil.deserializeObject(bytes, split.getClass().getClassLoader());

        assertThat(split.getSplitNumber()).isEqualTo(5);
        assertThat(split.getHostnames()).containsExactly("host0");
        splitChecker.accept(split.getHadoopInputSplit());
    }

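    /**
     * Verifies the round trip for a plain split. The size check guards against serializing the
     * JobConf along with the split: with the 10000-entry configuration from {@link #before()},
     * a payload under 10000 bytes is only plausible if the configuration is not written.
     */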
    @Test
    void testFileSplit() throws IOException, ClassNotFoundException {
        FileSplit fileSplit = new FileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
        testInner(
                fileSplit,
                i -> assertThat(i).isLessThan(10000),
                split -> assertThat(split).isEqualTo(fileSplit));
    }

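    /**
     * Verifies that a {@link Configurable} split has its {@link Configuration} re-injected via
     * {@code setConf(Configuration)} after deserialization. No size bound is checked here,
     * presumably because the configuration has to travel with a {@link Configurable} split.
     */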
    @Test
    void testConfigurable() throws IOException, ClassNotFoundException {
        ConfigurableFileSplit fileSplit =
                new ConfigurableFileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
        testInner(
                fileSplit,
                i -> {},
                inputSplit -> {
                    ConfigurableFileSplit split = (ConfigurableFileSplit) inputSplit;
                    assertThat(split.getConf()).isNotNull();
                    assertThat(split).isEqualTo(fileSplit);
                });
    }

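    /**
     * Verifies that a {@link JobConfigurable} split is reconfigured via
     * {@code configure(JobConf)} after deserialization.
     */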
    @Test
    void testJobConfigurable() throws IOException, ClassNotFoundException {
        JobConfigurableFileSplit fileSplit =
                new JobConfigurableFileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
        testInner(
                fileSplit,
                i -> {},
                inputSplit -> {
                    JobConfigurableFileSplit split = (JobConfigurableFileSplit) inputSplit;
                    assertThat(split.getConf()).isNotNull();
                    assertThat(split).isEqualTo(fileSplit);
                });
    }

    /**
     * Because of a test classpath conflict we cannot use Hadoop's own {@code FileSplit}, so this
     * minimal stand-in is defined for the tests instead.
     */
    private static class FileSplit implements InputSplit {

        private Path file;
        private long start;
        private long length;
        private String[] hosts;

        /** No-argument constructor required for deserialization via reflection. */
        public FileSplit() {}

        private FileSplit(Path file, long start, long length, String[] hosts) {
            this.file = file;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
        }

        @Override
        public long getLength() throws IOException {
            return length;
        }

        @Override
        public String[] getLocations() throws IOException {
            return hosts;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(file.toString());
            out.writeLong(start);
            out.writeLong(length);
            out.writeInt(hosts.length);
            for (String host : hosts) {
                out.writeUTF(host);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            file = new Path(in.readUTF());
            start = in.readLong();
            length = in.readLong();
            int size = in.readInt();
            hosts = new String[size];
            for (int i = 0; i < size; i++) {
                hosts[i] = in.readUTF();
            }
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FileSplit fileSplit = (FileSplit) o;
            return start == fileSplit.start
                    && length == fileSplit.length
                    && Objects.equals(file, fileSplit.file)
                    && Arrays.equals(hosts, fileSplit.hosts);
        }

        @Override
        public int hashCode() {
            // Keep hashCode() consistent with the equals() override above.
            int result = Objects.hash(file, start, length);
            result = 31 * result + Arrays.hashCode(hosts);
            return result;
        }
    }

    private static class ConfigurableFileSplit extends FileSplit implements Configurable {

        private Configuration conf;

        public ConfigurableFileSplit() {}

        private ConfigurableFileSplit(Path file, long start, long length, String[] hosts) {
            super(file, start, length, hosts);
        }

        @Override
        public void setConf(Configuration configuration) {
            this.conf = configuration;
        }

        @Override
        public Configuration getConf() {
            return conf;
        }
    }

    private static class JobConfigurableFileSplit extends FileSplit implements JobConfigurable {

        private JobConf jobConf;

        public JobConfigurableFileSplit() {}

        private JobConfigurableFileSplit(Path file, long start, long length, String[] hosts) {
            super(file, start, length, hosts);
        }

        @Override
        public void configure(JobConf jobConf) {
            this.jobConf = jobConf;
        }

        private JobConf getConf() {
            return jobConf;
        }
    }
}