blob: 8aa280d831bf8372b5387fd3582062a8db6551f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sorter;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.UnsignedBytes;
/**
* Sorts {@code <key, value>} pairs in memory. Based on the configured size of the memory buffer,
* will reject additional pairs.
*/
class InMemorySorter implements Sorter {
  /** {@code Options} contains configuration of the sorter. */
  public static class Options implements Serializable {
    /** Size of the memory buffer in megabytes; defaults to 100. */
    private long memoryMB = 100;

    /**
     * Sets the size of the memory buffer in megabytes.
     *
     * @param memoryMB buffer size in megabytes; must be greater than zero
     * @throws IllegalArgumentException if {@code memoryMB} is not greater than zero
     */
    public void setMemoryMB(long memoryMB) {
      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
      this.memoryMB = memoryMB;
    }

    /** Returns the configured size of the memory buffer in megabytes. */
    public long getMemoryMB() {
      return memoryMB;
    }
  }

  /** The comparator to use to sort the records by key. */
  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();

  /** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */
  private static final long NUM_BYTES_PER_WORD = getNumBytesPerWord();

  /** Number of bytes in one megabyte, used to convert {@link Options#getMemoryMB()} to bytes. */
  private static final long NUM_BYTES_PER_MB = 1024L * 1024L;

  /**
   * Estimate of memory overhead per KV record in bytes not including memory associated with keys
   * and values.
   *
   * <ul>
   *   <li>Object reference within {@link ArrayList} (1 word),
   *   <li>A {@link KV} (2 words),
   *   <li>Two byte arrays (2 words for array lengths),
   *   <li>Per-object overhead (JVM-specific, guessing 2 words * 3 objects)
   * </ul>
   */
  private static final long RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;

  /** Maximum size of the buffer in bytes. */
  private final long maxBufferSize;

  /** Current number of stored bytes. Including estimated overhead bytes. */
  private long numBytes;

  /** Whether sort has been called. */
  private boolean sortCalled;

  /** The stored records to be sorted. */
  private final ArrayList<KV<byte[], byte[]>> records = new ArrayList<>();

  /**
   * Private constructor; use {@link #create(Options)}.
   *
   * @param options supplies the buffer size in megabytes
   */
  private InMemorySorter(Options options) {
    long memoryMB = options.getMemoryMB();
    // Options accepts any positive long, so converting megabytes to bytes can overflow and wrap
    // to a meaningless (possibly negative) limit that would reject every record. Saturate at
    // Long.MAX_VALUE instead of wrapping.
    maxBufferSize =
        memoryMB > Long.MAX_VALUE / NUM_BYTES_PER_MB
            ? Long.MAX_VALUE
            : memoryMB * NUM_BYTES_PER_MB;
  }

  /** Create a new sorter from provided options. */
  public static InMemorySorter create(Options options) {
    return new InMemorySorter(options);
  }

  /**
   * Adds the record to the buffer.
   *
   * @throws IllegalStateException if {@link #sort()} has already been called, or if the estimated
   *     memory footprint would no longer fit in the configured buffer
   */
  @Override
  public void add(KV<byte[], byte[]> record) {
    checkState(addIfRoom(record), "No space remaining for in memory sorting");
  }

  /**
   * Adds the record if there is room and returns true. Otherwise returns false and leaves the
   * buffer unchanged.
   *
   * @throws IllegalStateException if {@link #sort()} has already been called
   */
  public boolean addIfRoom(KV<byte[], byte[]> record) {
    checkState(!sortCalled, "Records can only be added before sort()");
    long recordBytes = estimateRecordBytes(record);
    if (!roomInBuffer(numBytes + recordBytes, records.size() + 1L)) {
      return false;
    }
    records.add(record);
    numBytes += recordBytes;
    return true;
  }

  /**
   * Sorts the stored records in place by key using {@link #COMPARATOR} and returns them as an
   * unmodifiable view. May be called at most once; no records can be added afterwards.
   *
   * @throws IllegalStateException if called more than once
   */
  @Override
  public Iterable<KV<byte[], byte[]>> sort() {
    checkState(!sortCalled, "sort() can only be called once.");
    sortCalled = true;
    Comparator<KV<byte[], byte[]>> kvComparator =
        (left, right) -> COMPARATOR.compare(left.getKey(), right.getKey());
    records.sort(kvComparator);
    return Collections.unmodifiableList(records);
  }

  /**
   * Estimate the number of additional bytes required to store this record. Including the key, the
   * value and any overhead for objects and references.
   */
  private long estimateRecordBytes(KV<byte[], byte[]> record) {
    return RECORD_MEMORY_OVERHEAD_ESTIMATE + record.getKey().length + record.getValue().length;
  }

  /**
   * Check whether we have room to store the provided total number of bytes and total number of
   * records.
   *
   * @param totalBytes estimated bytes the buffer would hold, including per-record overhead
   * @param totalRecords number of records the buffer would hold
   * @return true if the estimate, plus sorting overhead, is strictly below the buffer limit
   */
  private boolean roomInBuffer(long totalBytes, long totalRecords) {
    // Collections.sort may allocate up to n/2 extra object references.
    // Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object
    // references in the backing array.
    // And finally, in Java 7, Collections.sort performs a defensive copy to an array in case the
    // input list is a LinkedList.
    // So assume we need an additional overhead of two words per record in the worst case
    long sortOverheadBytes = totalRecords * NUM_BYTES_PER_WORD * 2;
    return totalBytes + sortOverheadBytes < maxBufferSize;
  }

  /**
   * Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer
   * unknown.
   */
  private static long getNumBytesPerWord() {
    try {
      // The lookup is inside the try so that a denied property read also falls back to the
      // default rather than failing class initialization.
      String bitsPerWord = System.getProperty("sun.arch.data.model");
      return Long.parseLong(bitsPerWord) / 8;
    } catch (NumberFormatException | SecurityException e) {
      // Can't determine whether 32 or 64 bit (property missing, non-numeric or unreadable), so
      // assume 64
      return 8;
    }
  }
}