/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.manual;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.util.MutableObjectIterator;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.fail;

/** Manual test that runs the growing hash table over many combinations of record count and record width. */
public class HashTableRecordWidthCombinations {
public static void main(String[] args) throws Exception {
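// Type utilities: the build side is Tuple2<Long, byte[]> (a long key plus a
// variable-width payload), the probe side is a plain Long key.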
@SuppressWarnings("unchecked")
final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
new TupleSerializer<Tuple2<Long, byte[]>>(
(Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
new TypeSerializer<?>[] {
LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE
});
final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
final TypeComparator<Tuple2<Long, byte[]>> buildComparator =
new TupleComparator<Tuple2<Long, byte[]>>(
new int[] {0},
new TypeComparator<?>[] {new LongComparator(true)},
new TypeSerializer<?>[] {LongSerializer.INSTANCE});
final TypeComparator<Long> probeComparator = new LongComparator(true);
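// pair comparator matching probe-side Long keys against field 0 of the build-side tuples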
final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator =
new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
private long ref;
@Override
public void setReference(Long reference) {
ref = reference;
}
@Override
public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
//noinspection UnnecessaryUnboxing
return candidate.f0.longValue() == ref;
}
@Override
public int compareToReference(Tuple2<Long, byte[]> candidate) {
long x = ref;
long y = candidate.f0;
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
};
try (final IOManager ioMan = new IOManagerAsync()) {
final int pageSize = 32 * 1024;
final int numSegments = 34;
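// Sweep record counts and record widths: with 34 segments of 32 KiB the table
// has roughly 1 MiB of memory, so these combinations sit around the boundary
// at which the table has to grow and spill.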
for (int num = 3400; num < 3550; num++) {
final int numRecords = num;
for (int recordLen = 270; recordLen < 320; recordLen++) {
final byte[] payload = new byte[recordLen - 8 - 4];
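// size the payload so one serialized record is exactly recordLen bytes:
// 8-byte long key + 4-byte array length header + payload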
System.out.println("testing " + numRecords + " / " + recordLen);
List<MemorySegment> memory = getMemory(numSegments, pageSize);
// We create a hash table that thinks the records are super large. That makes it
// initially choose a lot of memory for the partition buffers and start with a
// smaller hash table, so that we trigger a hash table growth early.
MutableHashTable<Tuple2<Long, byte[]>, Long> table =
new MutableHashTable<>(
buildSerializer,
probeSerializer,
buildComparator,
probeComparator,
pairComparator,
memory,
ioMan,
16,
false);
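// build side: numRecords tuples that all carry the key 42L, so every
// record hashes into the same bucket chain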
final MutableObjectIterator<Tuple2<Long, byte[]>> buildInput =
new MutableObjectIterator<Tuple2<Long, byte[]>>() {
private int count = 0;
@Override
public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
return next();
}
@Override
public Tuple2<Long, byte[]> next() {
if (count++ < numRecords) {
return new Tuple2<>(42L, payload);
} else {
return null;
}
}
};
// probe side: 10000 distinct keys (0..9999); note that this numRecords shadows the build-side count
final MutableObjectIterator<Long> probeInput =
new MutableObjectIterator<Long>() {
private final long numRecords = 10000;
private long value = 0;
@Override
public Long next(Long aLong) {
return next();
}
@Override
public Long next() {
if (value < numRecords) {
return value++;
} else {
return null;
}
}
};
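// run the join; only probe key 42 finds matches, and it matches every build record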
table.open(buildInput, probeInput);
try {
while (table.nextRecord()) {
MutableObjectIterator<Tuple2<Long, byte[]>> matches =
table.getBuildSideIterator();
while (matches.next() != null) {}
}
} catch (RuntimeException e) {
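// Expected failure mode: with a single distinct build key, recursive
// repartitioning cannot shrink an overflowing partition, so the table
// may give up with this message; anything else is a real error.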
if (!e.getMessage().contains("exceeded maximum number of recursions")) {
throw e;
}
} finally {
table.close();
}
// make sure no temp files are left
checkNoTempFilesRemain(ioMan);
}
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
for (int i = 0; i < numSegments; i++) {
list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
}
return list;
}

private static void checkNoTempFilesRemain(IOManager ioManager) {
for (File dir : ioManager.getSpillingDirectories()) {
for (String file : dir.list()) {
if (file != null && !(file.equals(".") || file.equals(".."))) {
fail("hash table did not clean up temp files. remaining file: " + file);
}
}
}
}
}