// blob: 6032b008e22d46536f7da0f67cf33a0b457b2864 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sorter;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.values.KV;
/**
* {@link Sorter} that sorts in memory until the values no longer fit into the memory buffer, and
* then falls back to external sorting.
*/
public class BufferedExternalSorter implements Sorter {
  /** Temporary location used by {@link #options()} when none is configured. */
  private static final String DEFAULT_TEMP_LOCATION = "/tmp";

  /** Memory buffer size, in megabytes, used by {@link #options()} when none is configured. */
  private static final int DEFAULT_MEMORY_MB = 100;

  /**
   * Returns the default {@link Options}: a temporary location of {@code /tmp} and a 100 MB memory
   * buffer. Use the {@code with*} methods on the result to customize it.
   */
  public static Options options() {
    return new Options(DEFAULT_TEMP_LOCATION, DEFAULT_MEMORY_MB);
  }

  /**
   * Immutable configuration for a {@link BufferedExternalSorter}. Each {@code with*} method
   * validates its argument and returns a new instance rather than modifying this one.
   */
  public static class Options implements Serializable {
    private final String tempLocation;
    private final int memoryMB;

    private Options(String tempLocation, int memoryMB) {
      this.tempLocation = tempLocation;
      this.memoryMB = memoryMB;
    }

    /**
     * Returns a copy of these options that uses the given path as the temporary location where
     * the sorter writes intermediate files.
     *
     * @param tempLocation the temporary location; must not start with {@code gs://}
     * @throws IllegalArgumentException if {@code tempLocation} starts with {@code gs://}
     */
    public Options withTempLocation(String tempLocation) {
      boolean isGcsLocation = tempLocation.startsWith("gs://");
      checkArgument(
          !isGcsLocation, "BufferedExternalSorter does not support GCS temporary location");
      return new Options(tempLocation, this.memoryMB);
    }

    /** Returns the temporary location these options were configured with. */
    public String getTempLocation() {
      return this.tempLocation;
    }

    /**
     * Returns a copy of these options with the given memory buffer size, in megabytes. The same
     * size is applied both to the buffer for the initial in-memory sorting and to the buffer used
     * when sorting externally.
     *
     * @param memoryMB the buffer size in megabytes; must be greater than zero and less than 2048
     * @throws IllegalArgumentException if {@code memoryMB} is outside that range
     */
    public Options withMemoryMB(int memoryMB) {
      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
      // The upper bound exists because Hadoop's external sort keeps the number of available
      // memory bytes in an int; staying below 2048 MB prevents that value from overflowing.
      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
      return new Options(this.tempLocation, memoryMB);
    }

    /** Returns the memory buffer size, in megabytes, these options were configured with. */
    public int getMemoryMB() {
      return this.memoryMB;
    }
  }

  private final ExternalSorter externalSorter;

  // Set to null once its contents have been handed over to the external sorter.
  private InMemorySorter inMemorySorter;

  // True once the in-memory sorter has rejected a record; from then on only the external sorter
  // is used.
  boolean inMemorySorterFull;

  BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) {
    this.externalSorter = externalSorter;
    this.inMemorySorter = inMemorySorter;
  }

  /**
   * Creates a {@link BufferedExternalSorter} from the given options. The external sorter is
   * configured with the memory size and temporary location from {@code options}; the in-memory
   * sorter is configured with the same memory size.
   */
  public static BufferedExternalSorter create(Options options) {
    ExternalSorter.Options externalOptions = new ExternalSorter.Options();
    externalOptions.setMemoryMB(options.getMemoryMB());
    externalOptions.setTempLocation(options.getTempLocation());

    InMemorySorter.Options inMemoryOptions = new InMemorySorter.Options();
    inMemoryOptions.setMemoryMB(options.getMemoryMB());

    ExternalSorter external = ExternalSorter.create(externalOptions);
    InMemorySorter inMemory = InMemorySorter.create(inMemoryOptions);
    return new BufferedExternalSorter(external, inMemory);
  }

  /**
   * Adds a record to be sorted. Records go to the in-memory sorter for as long as it accepts
   * them; the first time it reports no room, everything buffered so far is moved to the external
   * sorter, and this record and all later ones are added to the external sorter directly.
   */
  @Override
  public void add(KV<byte[], byte[]> record) throws IOException {
    if (inMemorySorterFull) {
      // Already switched over: the external sorter takes every record from now on.
      externalSorter.add(record);
      return;
    }

    if (inMemorySorter.addIfRoom(record)) {
      return;
    }

    // The in-memory sorter has no room for this record. Flush what it holds to the external
    // sorter so that the external sorter can be relied on exclusively from here on.
    inMemorySorterFull = true;
    transferToExternalSorter();
    externalSorter.add(record);
  }

  /**
   * Moves every record currently held by the in-memory sorter into the external sorter, then
   * drops the reference to the in-memory sorter.
   */
  private void transferToExternalSorter() throws IOException {
    Iterable<KV<byte[], byte[]>> buffered = inMemorySorter.sort();
    for (KV<byte[], byte[]> entry : buffered) {
      externalSorter.add(entry);
    }
    // Release the in-memory sorter and its contents so they can be garbage collected.
    inMemorySorter = null;
  }

  /**
   * Returns the sorted records: from the external sorter if the in-memory sorter ever filled up,
   * otherwise from the in-memory sorter.
   */
  @Override
  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
    if (inMemorySorterFull) {
      return externalSorter.sort();
    }
    return inMemorySorter.sort();
  }
}