blob: e54b0acb833ec3dbd44dba5587afdcb0f46be118 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link HadoopTupleUnwrappingIterator}. */
class HadoopTupleUnwrappingIteratorTest {
@Test
void testValueIterator() {
HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(
new WritableSerializer<IntWritable>(IntWritable.class));
// many values
ArrayList<Tuple2<IntWritable, IntWritable>> tList =
new ArrayList<Tuple2<IntWritable, IntWritable>>();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(2)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(3)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(4)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(5)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(6)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(7)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(8)));
int expectedKey = 1;
int[] expectedValues = new int[] {1, 2, 3, 4, 5, 6, 7, 8};
valIt.set(tList.iterator());
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
for (int expectedValue : expectedValues) {
assertThat(valIt.hasNext()).isTrue();
assertThat(valIt.next().get()).isEqualTo(expectedValue);
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
}
assertThat(valIt.hasNext()).isFalse();
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
// one value
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(10)));
expectedKey = 2;
expectedValues = new int[] {10};
valIt.set(tList.iterator());
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
for (int expectedValue : expectedValues) {
assertThat(valIt.hasNext()).isTrue();
assertThat(valIt.next().get()).isEqualTo(expectedValue);
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
}
assertThat(valIt.hasNext()).isFalse();
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
// more values
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(10)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(4)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(7)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(9)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(21)));
expectedKey = 3;
expectedValues = new int[] {10, 4, 7, 9, 21};
valIt.set(tList.iterator());
assertThat(valIt.hasNext()).isTrue();
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
for (int expectedValue : expectedValues) {
assertThat(valIt.hasNext()).isTrue();
assertThat(valIt.next().get()).isEqualTo(expectedValue);
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
}
assertThat(valIt.hasNext()).isFalse();
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
// no has next calls
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(5)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(8)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(42)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(-1)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(0)));
expectedKey = 4;
expectedValues = new int[] {5, 8, 42, -1, 0};
valIt.set(tList.iterator());
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
for (int expectedValue : expectedValues) {
assertThat(valIt.next().get()).isEqualTo(expectedValue);
}
assertThatThrownBy(() -> valIt.next()).isInstanceOf(NoSuchElementException.class);
assertThat(valIt.hasNext()).isFalse();
assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
}
}