blob: a220a2fa194e768ce63dfdd349a5e4ce79fe1553 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.manual;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
/** Test {@link ExternalSorter} on a large set of {@link StringValue}. */
public class MassiveStringValueSorting {
private static final long SEED = 347569784659278346L;
/**
 * Sorts a generated file of random strings twice -- once with the Unix {@code sort} utility
 * (run under {@code LC_ALL=C} through {@code /bin/bash}) and once with Flink's {@link
 * ExternalSorter} -- and asserts that both produce the same records in the same order.
 *
 * <p>In addition to the record-by-record comparison, the test checks that exactly the generated
 * number of records was compared and that the Flink sorter has no records left afterwards, so
 * that neither a truncated reference file nor surplus sorter output can pass unnoticed.
 *
 * <p>Any exception is printed and reported as a test failure. Both temporary files are deleted
 * (best effort) before the method returns.
 */
public void testStringValueSorting() {
    final int numStrings = 300000;

    File input = null;
    File sorted = null;

    try {
        // the source file
        input =
                generateFileWithStrings(
                        numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all");

        // the sorted file
        sorted = File.createTempFile("sorted_strings", "txt");

        String[] command = {
            "/bin/bash",
            "-c",
            "export LC_ALL=\"C\" && cat \""
                    + input.getAbsolutePath()
                    + "\" | sort > \""
                    + sorted.getAbsolutePath()
                    + "\""
        };

        Process p = null;
        try {
            p = Runtime.getRuntime().exec(command);
            int retCode = p.waitFor();
            if (retCode != 0) {
                throw new Exception("Command failed with return code " + retCode);
            }
            // the process terminated normally, so there is nothing left to destroy
            p = null;
        } finally {
            if (p != null) {
                p.destroy();
            }
        }

        // sort the data
        Sorter<StringValue> sorter = null;
        BufferedReader reader = null;
        BufferedReader verifyReader = null;
        MemoryManager mm = null;

        try (IOManager ioMan = new IOManagerAsync()) {
            mm = MemoryManagerBuilder.newBuilder().setMemorySize(1024 * 1024).build();

            TypeSerializer<StringValue> serializer =
                    new CopyableValueSerializer<StringValue>(StringValue.class);
            TypeComparator<StringValue> comparator =
                    new CopyableValueComparator<StringValue>(true, StringValue.class);

            reader = new BufferedReader(new FileReader(input));
            MutableObjectIterator<StringValue> inputIterator =
                    new StringValueReaderMutableObjectIterator(reader);

            sorter =
                    ExternalSorter.newBuilder(mm, new DummyInvokable(), serializer, comparator)
                            .maxNumFileHandles(128)
                            .enableSpilling(ioMan, 0.8f)
                            .memoryFraction(1.0)
                            .objectReuse(true)
                            .largeRecords(true)
                            .build(inputIterator);

            MutableObjectIterator<StringValue> sortedData = sorter.getIterator();

            reader.close();

            // verify
            verifyReader = new BufferedReader(new FileReader(sorted));
            String nextVerify;
            StringValue nextFromFlinkSort = new StringValue();

            int num = 0;
            while ((nextVerify = verifyReader.readLine()) != null) {
                num++;
                nextFromFlinkSort = sortedData.next(nextFromFlinkSort);

                Assert.assertNotNull(nextFromFlinkSort);
                Assert.assertEquals(nextVerify, nextFromFlinkSort.getValue());
            }

            // the sorter must not hold more records than the reference file ...
            Assert.assertNull(sortedData.next(nextFromFlinkSort));
            // ... and the reference file must contain every generated record
            Assert.assertEquals(numStrings, num);
        } finally {
            if (reader != null) {
                reader.close();
            }
            if (verifyReader != null) {
                verifyReader.close();
            }
            if (sorter != null) {
                sorter.close();
            }
            if (mm != null) {
                mm.shutdown();
            }
        }
    } catch (Exception e) {
        System.err.println(e.getMessage());
        e.printStackTrace();
        Assert.fail(e.getMessage());
    } finally {
        if (input != null) {
            //noinspection ResultOfMethodCallIgnored
            input.delete();
        }
        if (sorted != null) {
            //noinspection ResultOfMethodCallIgnored
            sorted.delete();
        }
    }
}
/**
 * Cross-checks Flink's {@link ExternalSorter} against the Unix {@code sort} utility on a
 * generated file of string tuples.
 *
 * <p>Every line of the generated file is parsed into a tuple whose key (field 0) is the first
 * space-separated token and whose value (field 1) holds all tokens of the line. The sorter
 * orders the tuples by the key in ascending order; the result is compared record by record with
 * the output of {@code sort} run under {@code LC_ALL=C}. Finally the test asserts that the
 * sorter is exhausted and that exactly {@code numStrings} records were compared.
 *
 * <p>Any exception is printed and reported as a test failure. Both temporary files are deleted
 * (best effort) before the method returns.
 */
@SuppressWarnings("unchecked")
public void testStringValueTuplesSorting() {
    final int numStrings = 300000;

    File sourceFile = null;
    File referenceFile = null;

    try {
        // generate the unsorted input
        sourceFile =
                generateFileWithStringTuples(
                        numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all");

        // let the shell produce the reference ordering
        referenceFile = File.createTempFile("sorted_strings", "txt");

        final String shellLine =
                "export LC_ALL=\"C\" && cat \""
                        + sourceFile.getAbsolutePath()
                        + "\" | sort > \""
                        + referenceFile.getAbsolutePath()
                        + "\"";
        final String[] command = {"/bin/bash", "-c", shellLine};

        Process sortProcess = null;
        try {
            sortProcess = Runtime.getRuntime().exec(command);
            final int exitCode = sortProcess.waitFor();
            if (exitCode != 0) {
                throw new Exception("Command failed with return code " + exitCode);
            }
            // finished normally: clear the reference so the finally block skips destroy()
            sortProcess = null;
        } finally {
            if (sortProcess != null) {
                sortProcess.destroy();
            }
        }

        // run the same data through the Flink sorter
        Sorter<Tuple2<StringValue, StringValue[]>> flinkSorter = null;
        BufferedReader sourceReader = null;
        BufferedReader referenceReader = null;
        MemoryManager memoryManager = null;

        try (IOManager ioManager = new IOManagerAsync()) {
            memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(1024 * 1024).build();

            final TupleTypeInfo<Tuple2<StringValue, StringValue[]>> tupleType =
                    (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
                            new TypeHint<Tuple2<StringValue, StringValue[]>>() {}.getTypeInfo();

            final TypeSerializer<Tuple2<StringValue, StringValue[]>> tupleSerializer =
                    tupleType.createSerializer(new ExecutionConfig());
            // compare on field 0 only, ascending
            final TypeComparator<Tuple2<StringValue, StringValue[]>> keyComparator =
                    tupleType.createComparator(
                            new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig());

            sourceReader = new BufferedReader(new FileReader(sourceFile));
            final MutableObjectIterator<Tuple2<StringValue, StringValue[]>> unsortedData =
                    new StringValueTupleReaderMutableObjectIterator(sourceReader);

            flinkSorter =
                    ExternalSorter.newBuilder(
                                    memoryManager,
                                    new DummyInvokable(),
                                    tupleSerializer,
                                    keyComparator)
                            .maxNumFileHandles(4)
                            .enableSpilling(ioManager, 0.8f)
                            .memoryFraction(1.0)
                            .objectReuse(false)
                            .largeRecords(true)
                            .build(unsortedData);

            // Use this part to verify that all is good when sorting in memory. Note that the
            // sketch uses String-based tuple types and has to be adapted to StringValue before
            // it can be enabled.
            //
            // List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(),
            //         mm.computeNumberOfPages(1024*1024*1024));
            // NormalizedKeySorter<Tuple2<String, String[]>> nks = new
            //         NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory);
            //
            // {
            //     Tuple2<String, String[]> wi = new Tuple2<String, String[]>("", new String[0]);
            //     while ((wi = inputIterator.next(wi)) != null) {
            //         Assert.assertTrue(nks.write(wi));
            //     }
            //
            //     new QuickSort().sort(nks);
            // }
            //
            // MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator();

            final MutableObjectIterator<Tuple2<StringValue, StringValue[]>> sortedData =
                    flinkSorter.getIterator();
            sourceReader.close();

            // compare the sorter output against the reference file
            referenceReader = new BufferedReader(new FileReader(referenceFile));
            final MutableObjectIterator<Tuple2<StringValue, StringValue[]>> referenceData =
                    new StringValueTupleReaderMutableObjectIterator(referenceReader);

            Tuple2<StringValue, StringValue[]> expected =
                    new Tuple2<>(new StringValue(), new StringValue[0]);
            Tuple2<StringValue, StringValue[]> actual =
                    new Tuple2<>(new StringValue(), new StringValue[0]);

            int compared = 0;
            for (expected = referenceData.next(expected);
                    expected != null;
                    expected = referenceData.next(expected)) {
                compared++;

                actual = sortedData.next(actual);
                Assert.assertNotNull(actual);

                Assert.assertEquals(expected.f0, actual.f0);
                Assert.assertArrayEquals(expected.f1, actual.f1);
            }

            Assert.assertNull(sortedData.next(actual));
            Assert.assertEquals(numStrings, compared);
        } finally {
            if (sourceReader != null) {
                sourceReader.close();
            }
            if (referenceReader != null) {
                referenceReader.close();
            }
            if (flinkSorter != null) {
                flinkSorter.close();
            }
            if (memoryManager != null) {
                memoryManager.shutdown();
            }
        }
    } catch (Exception e) {
        System.err.println(e.getMessage());
        e.printStackTrace();
        Assert.fail(e.getMessage());
    } finally {
        if (sourceFile != null) {
            //noinspection ResultOfMethodCallIgnored
            sourceFile.delete();
        }
        if (referenceFile != null) {
            //noinspection ResultOfMethodCallIgnored
            referenceFile.delete();
        }
    }
}
// --------------------------------------------------------------------------------------------
/**
 * A {@link MutableObjectIterator} that turns each line read from a {@link BufferedReader} into
 * one {@link StringValue}. The iterator does not close the reader.
 */
private static final class StringValueReaderMutableObjectIterator
        implements MutableObjectIterator<StringValue> {

    private final BufferedReader lineSource;

    public StringValueReaderMutableObjectIterator(BufferedReader reader) {
        this.lineSource = reader;
    }

    /**
     * Reads the next line into {@code reuse}.
     *
     * @param reuse the object that receives the line's content
     * @return {@code reuse}, or {@code null} once the reader has no more lines
     * @throws IOException if reading from the underlying reader fails
     */
    @Override
    public StringValue next(StringValue reuse) throws IOException {
        final String text = lineSource.readLine();
        if (text != null) {
            reuse.setValue(text);
            return reuse;
        }
        return null;
    }

    /**
     * Reads the next line into a newly created {@link StringValue}.
     *
     * @return the new value, or {@code null} once the reader has no more lines
     * @throws IOException if reading from the underlying reader fails
     */
    @Override
    public StringValue next() throws IOException {
        final StringValue fresh = new StringValue();
        return next(fresh);
    }
}
/**
 * A {@link MutableObjectIterator} that parses each line read from a {@link BufferedReader} into
 * a tuple: field 0 is the first space-separated token of the line, field 1 is an array holding
 * all tokens of the line (including the first one). The iterator does not close the reader.
 */
private static final class StringValueTupleReaderMutableObjectIterator
        implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> {

    private final BufferedReader lineSource;

    public StringValueTupleReaderMutableObjectIterator(BufferedReader reader) {
        this.lineSource = reader;
    }

    /**
     * Parses the next line into {@code reuse}. The key object in {@code reuse.f0} is updated in
     * place, while {@code reuse.f1} is replaced by a newly allocated array.
     *
     * @param reuse the tuple that receives the parsed line; its {@code f0} must not be null
     * @return {@code reuse}, or {@code null} once the reader has no more lines
     * @throws IOException if reading from the underlying reader fails
     */
    @Override
    public Tuple2<StringValue, StringValue[]> next(Tuple2<StringValue, StringValue[]> reuse)
            throws IOException {
        final String row = lineSource.readLine();
        if (row == null) {
            return null;
        }

        final String[] tokens = row.split(" ");
        reuse.f0.setValue(tokens[0]);

        final StringValue[] values = new StringValue[tokens.length];
        int slot = 0;
        for (String token : tokens) {
            values[slot++] = new StringValue(token);
        }
        reuse.f1 = values;
        return reuse;
    }

    /**
     * Parses the next line into a newly created tuple.
     *
     * @return the new tuple, or {@code null} once the reader has no more lines
     * @throws IOException if reading from the underlying reader fails
     */
    @Override
    public Tuple2<StringValue, StringValue[]> next() throws IOException {
        final Tuple2<StringValue, StringValue[]> fresh =
                new Tuple2<>(new StringValue(), new StringValue[0]);
        return next(fresh);
    }
}
// --------------------------------------------------------------------------------------------
/**
 * Writes {@code numStrings} pseudo-random lines to a new temporary file.
 *
 * <p>Each line consists of {@code prefix} followed by 300 to 319 characters drawn from the
 * character codes 40 to 119. The random generator is seeded with {@link #SEED}, so repeated
 * calls produce the same content.
 *
 * <p>If writing fails, the partially written temporary file is deleted before the exception is
 * propagated, because the caller never obtains a reference it could use for cleanup.
 *
 * @param numStrings the number of lines to write
 * @param prefix the text every line starts with
 * @return the temporary file holding the generated lines; the caller is responsible for
 *     deleting it
 * @throws IOException if the file cannot be created or written
 */
private File generateFileWithStrings(int numStrings, String prefix) throws IOException {
    final Random rnd = new Random(SEED);

    final StringBuilder bld = new StringBuilder();
    final int resetValue = prefix.length();
    bld.append(prefix);

    final File f = File.createTempFile("strings", "txt");

    try (BufferedWriter wrt = new BufferedWriter(new FileWriter(f))) {
        for (int i = 0; i < numStrings; i++) {
            // drop the random tail of the previous line, keep the shared prefix
            bld.setLength(resetValue);

            int len = rnd.nextInt(20) + 300;
            for (int k = 0; k < len; k++) {
                char c = (char) (rnd.nextInt(80) + 40);
                bld.append(c);
            }

            wrt.write(bld.toString());
            wrt.newLine();
        }
    } catch (IOException | RuntimeException e) {
        // the writer is already closed at this point; do not leave the broken file behind
        //noinspection ResultOfMethodCallIgnored
        f.delete();
        throw e;
    }

    return f;
}
/**
 * Writes {@code numStrings} pseudo-random lines of space-separated components to a new
 * temporary file.
 *
 * <p>Each line holds one to five components separated by a single space. Every component
 * consists of {@code prefix} followed by 10 to 29 characters drawn from the character codes 40
 * to 119. The random generator is seeded with {@link #SEED}, so repeated calls produce the same
 * content.
 *
 * <p>If writing fails, the partially written temporary file is deleted before the exception is
 * propagated, because the caller never obtains a reference it could use for cleanup.
 *
 * @param numStrings the number of lines to write
 * @param prefix the text every component starts with
 * @return the temporary file holding the generated lines; the caller is responsible for
 *     deleting it
 * @throws IOException if the file cannot be created or written
 */
private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException {
    final Random rnd = new Random(SEED);
    final StringBuilder bld = new StringBuilder();

    final File f = File.createTempFile("strings", "txt");

    try (BufferedWriter wrt = new BufferedWriter(new FileWriter(f))) {
        for (int i = 0; i < numStrings; i++) {
            bld.setLength(0);

            int numComps = rnd.nextInt(5) + 1;
            for (int z = 0; z < numComps; z++) {
                if (z > 0) {
                    bld.append(' ');
                }
                bld.append(prefix);

                int len = rnd.nextInt(20) + 10;
                for (int k = 0; k < len; k++) {
                    char c = (char) (rnd.nextInt(80) + 40);
                    bld.append(c);
                }
            }

            wrt.write(bld.toString());
            wrt.newLine();
        }
    } catch (IOException | RuntimeException e) {
        // the writer is already closed at this point; do not leave the broken file behind
        //noinspection ResultOfMethodCallIgnored
        f.delete();
        throw e;
    }

    return f;
}
// --------------------------------------------------------------------------------------------
/**
 * Runs the plain string test followed by the string tuple test, each on its own instance of
 * this class.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final MassiveStringValueSorting plainStrings = new MassiveStringValueSorting();
    plainStrings.testStringValueSorting();

    final MassiveStringValueSorting stringTuples = new MassiveStringValueSorting();
    stringTuples.testStringValueTuplesSorting();
}
}