blob: d59526f986dbebab12122df2e107bb89b1a00b6d [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.runners.dataflow.worker;
import static;
import static;
import static;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.Footer;
import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShard;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefix;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter.ScalableBloomFilterCoder;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
* A {@link Sink} that writes Ism files.
* @param <V> the type of the value written to the sink
"nullness" // TODO(
public class IsmSink<V> extends Sink<WindowedValue<IsmRecord<V>>> {
static final int BLOCK_SIZE_BYTES = 1024 * 1024;
private final ResourceId resourceId;
private final IsmRecordCoder<V> coder;
private final long bloomFilterSizeLimitBytes;
* Produces a sink for the specified {@code filename} and {@code coder}. See {@link IsmFormat} for
* encoded format details.
IsmSink(ResourceId resourceId, IsmRecordCoder<V> coder, long bloomFilterSizeLimitBytes) {
this.resourceId = resourceId;
this.coder = coder;
this.bloomFilterSizeLimitBytes = bloomFilterSizeLimitBytes;
public SinkWriter<WindowedValue<IsmRecord<V>>> writer() throws IOException {
return new IsmSinkWriter(FileSystems.create(resourceId, MimeTypes.BINARY));
// Can be overridden by tests to generate files with smaller block sizes for testing.
long getBlockSize() {
private class IsmSinkWriter implements SinkWriter<WindowedValue<IsmRecord<V>>> {
private final CountingOutputStream out;
private final RandomAccessData indexOut;
private RandomAccessData previousKeyBytes;
private Optional<Integer> previousShard;
private RandomAccessData currentKeyBytes;
private RandomAccessData lastIndexKeyBytes;
private long lastIndexedPosition;
private long numberOfKeysWritten;
private SortedMap<Integer, IsmShard> shardKeyToShardMap;
private final ScalableBloomFilter.Builder bloomFilterBuilder;
/** Creates an IsmSinkWriter for the given channel. */
private IsmSinkWriter(WritableByteChannel channel) {
out = new CountingOutputStream(Channels.newOutputStream(channel));
indexOut = new RandomAccessData();
previousShard = Optional.absent();
previousKeyBytes = new RandomAccessData();
currentKeyBytes = new RandomAccessData();
lastIndexKeyBytes = new RandomAccessData();
bloomFilterBuilder = ScalableBloomFilter.withMaximumSizeBytes(bloomFilterSizeLimitBytes);
shardKeyToShardMap = new TreeMap<>();
public long add(WindowedValue<IsmRecord<V>> windowedRecord) throws IOException {
// The windowed portion of the value is ignored.
IsmRecord<V> record = windowedRecord.getValue();
checkArgument(coder.getKeyComponentCoders().size() == record.getKeyComponents().size());
List<Integer> keyOffsetPositions = new ArrayList<>();
final int currentShard =
coder.encodeAndHash(record.getKeyComponents(), currentKeyBytes, keyOffsetPositions);
// Put each component of the key into the Bloom filter so that we can use the Bloom
// filter for key prefix checks.
for (Integer offsetPosition : keyOffsetPositions) {
bloomFilterBuilder.put(currentKeyBytes.array(), 0, offsetPosition);
// If we are moving to another shard, finish outputting the last shard.
if (previousShard.isPresent() && currentShard != previousShard.get()) {
// We reset last shard to be empty.
long positionOfCurrentKey = out.getCount();
// If we are outputting our first key for this shard, then we can assume 0 bytes are saved
// from the previous key.
int sharedKeySize;
if (!previousShard.isPresent()) {
sharedKeySize = 0;
// Create a new shard record for the current value being output validating
// that we have never seen this shard before.
IsmShard ismShard = IsmShard.of(currentShard, positionOfCurrentKey);
shardKeyToShardMap.put(currentShard, ismShard) == null,
"Unexpected insertion of keys %s for shard which already exists %s. "
+ "Ism files expect that all shards are written contiguously.",
} else {
sharedKeySize = commonPrefixLengthWithOrderCheck(previousKeyBytes, currentKeyBytes);
// Put key-value mapping record into block buffer
int unsharedKeySize = currentKeyBytes.size() - sharedKeySize;
KeyPrefix keyPrefix = KeyPrefix.of(sharedKeySize, unsharedKeySize);
KeyPrefixCoder.of().encode(keyPrefix, out);
currentKeyBytes.writeTo(out, sharedKeySize, unsharedKeySize);
if (IsmFormat.isMetadataKey(record.getKeyComponents())) {
ByteArrayCoder.of().encode(record.getMetadata(), out);
} else {
coder.getValueCoder().encode(record.getValue(), out);
// If the start of the current elements position is more than block size away from our
// last indexed position
if (positionOfCurrentKey > lastIndexedPosition + getBlockSize()) {
// Note that the first key of each shard will never be in the index allowing us to ignore
// the fact that there is an empty key and use the lastIndexKeyBytes to be the 0 byte array
// at the start of each shard.
int sharedIndexKeySize =
commonPrefixLengthWithOrderCheck(lastIndexKeyBytes, currentKeyBytes);
int unsharedIndexKeySize = currentKeyBytes.size() - sharedIndexKeySize;
KeyPrefix indexKeyPrefix = KeyPrefix.of(sharedIndexKeySize, unsharedIndexKeySize);
KeyPrefixCoder.of().encode(indexKeyPrefix, indexOut.asOutputStream());
indexOut.asOutputStream(), sharedIndexKeySize, unsharedIndexKeySize);
VarInt.encode(positionOfCurrentKey, indexOut.asOutputStream());
currentKeyBytes.writeTo(lastIndexKeyBytes.asOutputStream(), 0, currentKeyBytes.size());
lastIndexedPosition = out.getCount();
// Remember the shard for the current key.
previousShard = Optional.of(currentShard);
// Swap the current key and the previous key, resetting the previous key to be re-used.
RandomAccessData temp = previousKeyBytes;
previousKeyBytes = currentKeyBytes;
currentKeyBytes = temp;
numberOfKeysWritten += 1;
return out.getCount() - positionOfCurrentKey;
* Compute the length of the common prefix of the previous key and the given key and perform a
* key order check. We check that the currently being inserted key is strictly greater than the
* previous key.
private int commonPrefixLengthWithOrderCheck(
RandomAccessData prevKeyBytes, RandomAccessData currentKeyBytes) {
int offset =
prevKeyBytes, currentKeyBytes);
int compare =
prevKeyBytes, currentKeyBytes, offset);
if (compare < 0) {
return offset;
} else if (compare == 0) {
throw new IllegalArgumentException(
+ " expects keys to be written in strictly increasing order but was given "
+ prevKeyBytes
+ " as the previous key and "
+ currentKeyBytes
+ " as the current key. Expected "
+ prevKeyBytes.array()[offset + 1]
+ " <= "
+ currentKeyBytes.array()[offset + 1]
+ " at position "
+ (offset + 1)
+ ".");
} else {
throw new IllegalArgumentException(
+ " expects keys to be written in strictly increasing order but was given "
+ prevKeyBytes
+ " as the previous key and "
+ currentKeyBytes
+ " as the current key. Expected length of previous key "
+ prevKeyBytes.size()
+ " <= "
+ currentKeyBytes.size()
+ " to current key.");
* Outputs the end of a shard. This is done by:
* <ul>
* <li>updating the shard index for the current shard with the index offset
* <li>writing out the index for the shard
* <li>resetting the last indexed position
* <li>forgetting the last shard
* </ul>
private void finishShard() throws IOException {
// Update the last shard record as to the position of the index.
IsmShard ismShard = shardKeyToShardMap.get(previousShard.get());
shardKeyToShardMap.put(previousShard.get(), ismShard.withIndexOffset(out.getCount()));
indexOut.writeTo(out, 0, indexOut.size());
// Reset the last indexed position to the start of the next shard allowing us
// to not need to index the first key.
lastIndexedPosition = out.getCount();
lastIndexKeyBytes = new RandomAccessData();
// Clear the last shard.
previousShard = Optional.absent();
* Completes the construction of the Ism file. This is done by:
* <ul>
* <li>finishing the last shard if present
* <li>writing out the Bloom filter
* <li>writing out the shard index
* <li>writing out the footer
* </ul>
* @throws IOException if an underlying write fails
private void finish() throws IOException {
// Update the last shard if at least one element was written.
if (previousShard.isPresent()) {
long startOfBloomFilter = out.getCount();
ScalableBloomFilterCoder.of().encode(, out);
long startOfIndex = out.getCount();
IsmFormat.ISM_SHARD_INDEX_CODER.encode(new ArrayList<>(shardKeyToShardMap.values()), out);
.encode(Footer.of(startOfIndex, startOfBloomFilter, numberOfKeysWritten), out);
public void close() throws IOException {
public void abort() throws IOException {