/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.Footer;
import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShard;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefix;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter.ScalableBloomFilterCoder;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.CountingOutputStream;

/**
* A {@link Sink} that writes Ism files.
*
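 * <p>A minimal usage sketch (illustrative only; {@code resourceId}, {@code ismCoder}, and
 * {@code sizeLimit} are assumed to be in scope, and keys must be added in strictly increasing
 * order):
 *
 * <pre>{@code
 * IsmSink<byte[]> sink = new IsmSink<>(resourceId, ismCoder, sizeLimit);
 * SinkWriter<WindowedValue<IsmRecord<byte[]>>> writer = sink.writer();
 * writer.add(
 *     WindowedValue.valueInGlobalWindow(
 *         IsmRecord.<byte[]>of(ImmutableList.of(keyBytes), valueBytes)));
 * writer.close();
 * }</pre>
 *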
* @param <V> the type of the value written to the sink
*/
public class IsmSink<V> extends Sink<WindowedValue<IsmRecord<V>>> {
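  // Target number of bytes to write to a data block before emitting another shard index entry.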
static final int BLOCK_SIZE_BYTES = 1024 * 1024;
private final ResourceId resourceId;
private final IsmRecordCoder<V> coder;
private final long bloomFilterSizeLimitBytes;
/**
   * Produces a sink for the specified {@code resourceId} and {@code coder}. See {@link IsmFormat} for
* encoded format details.
*/
IsmSink(ResourceId resourceId, IsmRecordCoder<V> coder, long bloomFilterSizeLimitBytes) {
IsmFormat.validateCoderIsCompatible(coder);
this.resourceId = resourceId;
this.coder = coder;
this.bloomFilterSizeLimitBytes = bloomFilterSizeLimitBytes;
}
@Override
public SinkWriter<WindowedValue<IsmRecord<V>>> writer() throws IOException {
return new IsmSinkWriter(FileSystems.create(resourceId, MimeTypes.BINARY));
}
// Can be overridden by tests to generate files with smaller block sizes for testing.
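  // For example (illustrative), a test subclass could return a value of a few bytes so that
  // index entries are generated for nearly every record.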
@VisibleForTesting
long getBlockSize() {
return BLOCK_SIZE_BYTES;
}
private class IsmSinkWriter implements SinkWriter<WindowedValue<IsmRecord<V>>> {
    private final CountingOutputStream out;
    // Buffer for the current shard's index entries; flushed to out when the shard is finished.
    private final RandomAccessData indexOut;
    private RandomAccessData previousKeyBytes;
    private Optional<Integer> previousShard;
    private RandomAccessData currentKeyBytes;
    // The key of the last index entry, used for prefix compression of index entries.
    private RandomAccessData lastIndexKeyBytes;
    // Output position as of the last index entry (or the start of the current shard).
    private long lastIndexedPosition;
    private long numberOfKeysWritten;
    // Maps each shard key to its shard descriptor, sorted so the shard index is ordered.
    private SortedMap<Integer, IsmShard> shardKeyToShardMap;
    private final ScalableBloomFilter.Builder bloomFilterBuilder;
/** Creates an IsmSinkWriter for the given channel. */
private IsmSinkWriter(WritableByteChannel channel) {
checkNotNull(channel);
out = new CountingOutputStream(Channels.newOutputStream(channel));
indexOut = new RandomAccessData();
previousShard = Optional.absent();
previousKeyBytes = new RandomAccessData();
currentKeyBytes = new RandomAccessData();
lastIndexKeyBytes = new RandomAccessData();
bloomFilterBuilder = ScalableBloomFilter.withMaximumSizeBytes(bloomFilterSizeLimitBytes);
shardKeyToShardMap = new TreeMap<>();
}
@Override
public long add(WindowedValue<IsmRecord<V>> windowedRecord) throws IOException {
// The windowed portion of the value is ignored.
IsmRecord<V> record = windowedRecord.getValue();
      checkArgument(
          coder.getKeyComponentCoders().size() == record.getKeyComponents().size(),
          "Expected %s key components but received %s.",
          coder.getKeyComponentCoders().size(),
          record.getKeyComponents().size());
List<Integer> keyOffsetPositions = new ArrayList<>();
final int currentShard =
coder.encodeAndHash(record.getKeyComponents(), currentKeyBytes, keyOffsetPositions);
// Put each component of the key into the Bloom filter so that we can use the Bloom
// filter for key prefix checks.
for (Integer offsetPosition : keyOffsetPositions) {
bloomFilterBuilder.put(currentKeyBytes.array(), 0, offsetPosition);
}
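      // For example (illustrative): for a composite key (userKey, window), both the encoding
      // of (userKey) and of (userKey, window) are added, so a reader can test whether any
      // record with a given key prefix might exist before touching the file.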
      // If we are moving to another shard, finish the previous shard by flushing its buffered
      // index and resetting the per-shard state.
      if (previousShard.isPresent() && currentShard != previousShard.get()) {
        finishShard();
      }
long positionOfCurrentKey = out.getCount();
// If we are outputting our first key for this shard, then we can assume 0 bytes are saved
// from the previous key.
int sharedKeySize;
if (!previousShard.isPresent()) {
sharedKeySize = 0;
        // Create a new shard record for the value being output, validating that we have
        // never seen this shard before.
IsmShard ismShard = IsmShard.of(currentShard, positionOfCurrentKey);
checkState(
shardKeyToShardMap.put(currentShard, ismShard) == null,
"Unexpected insertion of keys %s for shard which already exists %s. "
+ "Ism files expect that all shards are written contiguously.",
record.getKeyComponents(),
ismShard);
} else {
sharedKeySize = commonPrefixLengthWithOrderCheck(previousKeyBytes, currentKeyBytes);
}
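      // Worked example (illustrative): if the previous key encoded to "apple" and the current
      // key encodes to "apricot", then sharedKeySize is 2 ("ap") and only the KeyPrefix plus
      // the 5 unshared bytes "ricot" are written out below.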
      // Write the key-value mapping record to the output.
int unsharedKeySize = currentKeyBytes.size() - sharedKeySize;
KeyPrefix keyPrefix = KeyPrefix.of(sharedKeySize, unsharedKeySize);
KeyPrefixCoder.of().encode(keyPrefix, out);
currentKeyBytes.writeTo(out, sharedKeySize, unsharedKeySize);
if (IsmFormat.isMetadataKey(record.getKeyComponents())) {
ByteArrayCoder.of().encode(record.getMetadata(), out);
} else {
coder.getValueCoder().encode(record.getValue(), out);
}
      // If the start of the current element's position is more than a block size past the last
      // indexed position, add an entry for it to the shard index.
if (positionOfCurrentKey > lastIndexedPosition + getBlockSize()) {
        // Note that the first key of each shard is never added to the index, which allows us
        // to treat lastIndexKeyBytes as the zero-byte array at the start of each shard without
        // special-casing the empty key.
int sharedIndexKeySize =
commonPrefixLengthWithOrderCheck(lastIndexKeyBytes, currentKeyBytes);
int unsharedIndexKeySize = currentKeyBytes.size() - sharedIndexKeySize;
KeyPrefix indexKeyPrefix = KeyPrefix.of(sharedIndexKeySize, unsharedIndexKeySize);
KeyPrefixCoder.of().encode(indexKeyPrefix, indexOut.asOutputStream());
currentKeyBytes.writeTo(
indexOut.asOutputStream(), sharedIndexKeySize, unsharedIndexKeySize);
VarInt.encode(positionOfCurrentKey, indexOut.asOutputStream());
lastIndexKeyBytes.resetTo(0);
currentKeyBytes.writeTo(lastIndexKeyBytes.asOutputStream(), 0, currentKeyBytes.size());
lastIndexedPosition = out.getCount();
}
// Remember the shard for the current key.
previousShard = Optional.of(currentShard);
// Swap the current key and the previous key, resetting the previous key to be re-used.
RandomAccessData temp = previousKeyBytes;
previousKeyBytes = currentKeyBytes;
currentKeyBytes = temp;
currentKeyBytes.resetTo(0);
numberOfKeysWritten += 1;
return out.getCount() - positionOfCurrentKey;
}
/**
     * Compute the length of the common prefix of the previous key and the given key, and perform
     * a key order check: the key currently being inserted must be strictly greater than the
     * previous key.
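     *
     * <p>For example (illustrative), with previous key bytes {@code [0x61, 0x70]} and current
     * key bytes {@code [0x61, 0x71]}, the returned common prefix length is 1.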
*/
private int commonPrefixLengthWithOrderCheck(
RandomAccessData prevKeyBytes, RandomAccessData currentKeyBytes) {
int offset =
RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
prevKeyBytes, currentKeyBytes);
int compare =
RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
prevKeyBytes, currentKeyBytes, offset);
      if (compare < 0) {
        return offset;
      } else if (compare == 0) {
        // The keys are identical beyond their common prefix, meaning a duplicate key was added.
        throw new IllegalArgumentException(
            IsmSinkWriter.class.getSimpleName()
                + " expects keys to be written in strictly increasing order but was given "
                + prevKeyBytes
                + " as the previous key and "
                + currentKeyBytes
                + " as the current key. The keys are equal.");
      } else {
        // Either the keys differ at the first byte past the common prefix with the current key
        // sorting first, or the current key is a proper prefix of the previous key.
        throw new IllegalArgumentException(
            IsmSinkWriter.class.getSimpleName()
                + " expects keys to be written in strictly increasing order but was given "
                + prevKeyBytes
                + " as the previous key and "
                + currentKeyBytes
                + " as the current key. The keys differ from position "
                + offset
                + ".");
      }
}
/**
* Outputs the end of a shard. This is done by:
*
* <ul>
* <li>updating the shard index for the current shard with the index offset
* <li>writing out the index for the shard
* <li>resetting the last indexed position
* <li>forgetting the last shard
* </ul>
*/
private void finishShard() throws IOException {
      // Update the previous shard's record with the position of its index.
IsmShard ismShard = shardKeyToShardMap.get(previousShard.get());
shardKeyToShardMap.put(previousShard.get(), ismShard.withIndexOffset(out.getCount()));
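      // Flush the buffered index: each entry is a KeyPrefix, the unshared suffix of the indexed
      // key, and the VarInt-encoded position of that key's record within the file.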
indexOut.writeTo(out, 0, indexOut.size());
indexOut.resetTo(0);
      // Reset the last indexed position to the start of the next shard so that the first key of
      // the shard does not need to be indexed.
lastIndexedPosition = out.getCount();
lastIndexKeyBytes = new RandomAccessData();
// Clear the last shard.
previousShard = Optional.absent();
}
/**
* Completes the construction of the Ism file. This is done by:
*
* <ul>
* <li>finishing the last shard if present
* <li>writing out the Bloom filter
* <li>writing out the shard index
* <li>writing out the footer
* </ul>
*
* @throws IOException if an underlying write fails
*/
private void finish() throws IOException {
      // Finish the last shard if at least one element was written.
if (previousShard.isPresent()) {
finishShard();
}
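      // After the shards come the Bloom filter, then the shard index, and finally the footer,
      // which records where the Bloom filter and shard index begin and how many keys were
      // written.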
long startOfBloomFilter = out.getCount();
ScalableBloomFilterCoder.of().encode(bloomFilterBuilder.build(), out);
long startOfIndex = out.getCount();
IsmFormat.ISM_SHARD_INDEX_CODER.encode(new ArrayList<>(shardKeyToShardMap.values()), out);
FooterCoder.of()
.encode(Footer.of(startOfIndex, startOfBloomFilter, numberOfKeysWritten), out);
}
@Override
public void close() throws IOException {
finish();
out.close();
}
@Override
public void abort() throws IOException {
close();
}
}
}