blob: 8f0a1f67a8becdc1ceb678bc523bd95b0a8ba180 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.Footer;
import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShard;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefix;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter;
import org.apache.beam.runners.dataflow.worker.util.ScalableBloomFilter.ScalableBloomFilterCoder;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WeightedValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
/**
* A {@link NativeReader} that reads Ism files.
*
* @param <V> the type of the value written to the sink
*/
// Possible real inconsistency - https://issues.apache.org/jira/browse/BEAM-6560
@SuppressFBWarnings("IS2_INCONSISTENT_SYNC")
public class IsmReaderImpl<V> extends IsmReader<V> {
/**
* Distance threshold in bytes (6 MiB): the distance we would rather read and drop bytes for versus
* doing an actual repositioning of the underlying stream. Tuned for operation within GCS.
*/
private static final int SEEK_VS_READ = 6 * 1024 * 1024;
/**
* Number of bytes (1 MiB) read from the tail of the file during initialization. The footer and
* the shard index are expected to fit within this tail; initialization fails otherwise.
*/
static final int MAX_SHARD_INDEX_AND_FOOTER_SIZE = 1024 * 1024;
/** Identifies the Ism file read by this reader. */
private final ResourceId resourceId;
/** Coder for the records of the file; also used to encode key components and pick the shard. */
private final IsmRecordCoder<V> coder;
/** Size of the underlying channel in bytes. Lazily initialized on first read. */
private long length;
/**
* The decoded footer of the file. Lazily initialized on first read; a non-null footer is what
* marks this reader as initialized.
*/
private Footer footer;
/** A map indexed by shard id, storing the Ism shard descriptors. Lazily initialized. */
private NavigableMap<Integer, IsmShard> shardIdToShardMap;
/**
* A map sorted and indexed by the block offset for a given shard. The sorting is important so the
* {@link LazyIsmPrefixReaderIterator} can read through the file in increasing order. Lazily
* initialized together with {@link #shardIdToShardMap}.
*/
private NavigableMap<Long, IsmShard> shardOffsetToShardMap;
/**
* Per shard id, a sorted map from the first key of a block to the descriptor of that block.
* Values are lazily initialized per shard on first keyed read of each shard.
*/
private Map<Integer, ImmutableSortedMap<RandomAccessData, IsmShardKey>> indexPerShard;
/** The Bloom filter of the file; loaded at the same time {@link #indexPerShard} is created. */
ScalableBloomFilter bloomFilter;
/**
* A cache instance used to cache blocks of data that are read. Each value represents the decoded
* form of a block, weighted by the size of the block. The constructor rejects a null cache.
*/
private final Cache<
IsmShardKey, WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>>>
cache;
/**
 * Creates a reader over the Ism file identified by {@code resourceId} whose records are decoded
 * with {@code coder}. See {@link IsmFormat} for encoded format details.
 *
 * @param resourceId the file to read
 * @param coder the record coder; must pass {@link IsmFormat#validateCoderIsCompatible}
 * @param cache the block cache shared by readers; must not be null
 */
IsmReaderImpl(
    final ResourceId resourceId,
    IsmRecordCoder<V> coder,
    Cache<IsmShardKey, WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>>>
        cache) {
  // Validation order matters for which failure is reported first: cache, then coder.
  this.cache = checkNotNull(cache);
  IsmFormat.validateCoderIsCompatible(coder);
  this.coder = coder;
  this.resourceId = resourceId;
}
/**
 * Returns an iterator over every record of the file. The iterator is lazy: it is bound to the
 * side input read counter that is current at the time of this call.
 */
@Override
public IsmPrefixReaderIterator iterator() throws IOException {
  return new LazyIsmPrefixReaderIterator(IsmReader.getCurrentSideInputCounter());
}
/**
 * Returns an iterator over all records whose key starts with {@code keyComponents}.
 *
 * <p>An empty component list maps to shard 0 with empty key bytes; otherwise the components are
 * encoded with the coder, which also yields the shard id.
 */
@Override
public IsmPrefixReaderIterator overKeyComponents(List<?> keyComponents) throws IOException {
  final RandomAccessData encodedPrefix;
  final int shard;
  if (keyComponents.isEmpty()) {
    encodedPrefix = new RandomAccessData(0);
    shard = 0;
  } else {
    encodedPrefix = new RandomAccessData();
    shard = coder.encodeAndHash(keyComponents, encodedPrefix);
  }
  return overKeyComponents(keyComponents, shard, encodedPrefix);
}
/**
 * Returns an iterator over all records whose key starts with {@code keyComponents}, given the
 * already computed {@code shardId} and encoded {@code keyBytes} of that prefix.
 *
 * <p>Depending on the file and the prefix this returns:
 *
 * <ul>
 *   <li>an empty iterator if the file has no keys, does not contain the shard, or the Bloom
 *       filter rules the prefix out;
 *   <li>an iterator over the whole file if the prefix has too few components to determine the
 *       shard;
 *   <li>otherwise an iterator over the blocks of the shard that may contain the prefix.
 * </ul>
 *
 * <p>Any channel opened while initializing is closed before returning, except when it is handed
 * over to the whole-file iterator, which closes it itself.
 *
 * @throws IllegalArgumentException if {@code keyComponents} is empty but {@code shardId} is not 0
 *     or {@code keyBytes} is not empty, or if there are more components than key coders
 */
@Override
public IsmPrefixReaderIterator overKeyComponents(
    List<?> keyComponents, int shardId, RandomAccessData keyBytes) throws IOException {
  checkNotNull(keyComponents);
  checkNotNull(keyBytes);
  SideInputReadCounter readCounter = IsmReader.getCurrentSideInputCounter();
  if (keyComponents.isEmpty()) {
    checkArgument(
        shardId == 0 && keyBytes.size() == 0,
        "Expected shard id to be 0 and key bytes to be empty "
            + "but got shard id %s and key bytes of length %s",
        shardId,
        keyBytes.size());
  }
  checkArgument(
      keyComponents.size() <= coder.getKeyComponentCoders().size(),
      "Expected at most %s key component(s) but received %s.",
      coder.getKeyComponentCoders().size(),
      keyComponents);

  // On the first call this opens a channel which we now own and must close or hand over.
  Optional<SeekableByteChannel> inChannel =
      initializeFooterAndShardIndex(Optional.<SeekableByteChannel>absent(), readCounter);

  // If this file is empty, we can return an empty iterator.
  if (footer.getNumberOfKeys() == 0) {
    closeIfPresent(inChannel);
    return new EmptyIsmPrefixReaderIterator(keyComponents);
  }

  // If not enough key components to figure out which shard was requested, we return a reader
  // iterator over all the keys. Ownership of the channel moves to that iterator.
  if (keyComponents.size() < coder.getNumberOfShardKeyCoders(keyComponents)) {
    return new ShardAwareIsmPrefixReaderIterator(
        keyComponents, openIfNeeded(inChannel), readCounter);
  }

  // If this file does not contain the shard, we know that we can return an empty iterator.
  if (!shardIdToShardMap.containsKey(shardId)) {
    closeIfPresent(inChannel);
    return new EmptyIsmPrefixReaderIterator(keyComponents);
  }

  inChannel = initializeForKeyedRead(shardId, inChannel, readCounter);
  closeIfPresent(inChannel);

  // The Bloom filter is loaded by now; if it rules the prefix out there is nothing to read.
  if (!bloomFilterMightContain(keyBytes)) {
    return new EmptyIsmPrefixReaderIterator(keyComponents);
  }

  // Otherwise we may actually contain the key so construct a reader iterator
  // which will fetch the data blocks containing the requested key prefix.
  ImmutableSortedMap<RandomAccessData, IsmShardKey> shardIndex = indexPerShard.get(shardId);

  // We find the first key in the index which may contain our prefix
  RandomAccessData floorKey = shardIndex.floorKey(keyBytes);

  // We compute an upper bound on the key prefix by incrementing the prefix
  RandomAccessData keyBytesUpperBound = keyBytes.increment();

  // Compute the sub-range of the index map that we want to iterate over since
  // any of these blocks may contain the key prefix.
  Iterator<IsmShardKey> blockEntries =
      shardIndex.subMap(floorKey, keyBytesUpperBound).values().iterator();
  return new WithinShardIsmPrefixReaderIterator(
      keyComponents, keyBytes, keyBytesUpperBound, blockEntries, readCounter);
}
/**
 * Returns whether this ISM reader has been initialized, which is the case once the footer has
 * been read.
 */
@Override
public boolean isInitialized() {
  final boolean footerLoaded = this.footer != null;
  return footerLoaded;
}
/**
 * Returns whether the file contains no keys, reading the footer first if that has not happened
 * yet. A channel opened for that purpose is closed again before returning.
 */
@Override
public boolean isEmpty() throws IOException {
  final SideInputReadCounter counter = IsmReader.getCurrentSideInputCounter();
  final Optional<SeekableByteChannel> maybeOpened =
      initializeFooterAndShardIndex(Optional.<SeekableByteChannel>absent(), counter);
  closeIfPresent(maybeOpened);
  return footer.getNumberOfKeys() == 0;
}
/** Returns the identifier of the file this reader was created for. */
@Override
public ResourceId getResourceId() {
  return this.resourceId;
}
/** Returns the record coder this reader was created with. */
@Override
public IsmRecordCoder<V> getCoder() {
  return this.coder;
}
/** Describes this reader by its resource id and coder. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description =
      MoreObjects.toStringHelper(IsmReaderImpl.class);
  description.add("resource id", resourceId);
  description.add("coder", coder);
  return description.toString();
}
/**
 * Asks the Bloom filter whether the file might contain the given key bytes.
 *
 * <p>Overridable by tests to get around the bloom filter not containing any values.
 */
@VisibleForTesting
boolean bloomFilterMightContain(RandomAccessData keyBytes) {
  final ScalableBloomFilter filter = bloomFilter;
  // Only the first size() bytes of the backing array belong to the key.
  final byte[] backingArray = keyBytes.array();
  final int keyLength = keyBytes.size();
  return filter.mightContain(backingArray, 0, keyLength);
}
/**
 * Initialize this Ism reader by reading the footer and shard index. Returns a channel for re-use
 * if this method was required to open one.
 *
 * <p>If the reader is already initialized, {@code inChannel} is returned untouched. Otherwise the
 * last {@link #MAX_SHARD_INDEX_AND_FOOTER_SIZE} bytes of the file are read, and the footer and
 * the shard index are decoded from them. If the Bloom filter happens to lie within those bytes it
 * is decoded as well, and if the whole file was read, every block is decoded and put into the
 * cache.
 *
 * <p>The footer and both shard maps are published together, only after all of them were decoded
 * successfully, so a failure while decoding never leaves the reader looking initialized with
 * missing shard maps. If this method fails after opening a channel itself, that channel is closed
 * before the exception propagates; a channel supplied by the caller is left open.
 *
 * @param inChannel a channel to re-use, or absent to have one opened
 * @param readCounter the counter charged with the time and bytes of the read
 * @return {@code inChannel} if already initialized, otherwise the channel that was read from
 * @throws IllegalStateException if the shard index does not fit within the bytes read
 */
private synchronized Optional<SeekableByteChannel> initializeFooterAndShardIndex(
    Optional<SeekableByteChannel> inChannel, SideInputReadCounter readCounter)
    throws IOException {
  if (footer != null) {
    checkState(
        shardIdToShardMap != null, "Expected shard id to shard map to have been initialized.");
    checkState(
        shardOffsetToShardMap != null,
        "Expected shard offset to shard map to have been initialized.");
    return inChannel;
  }
  checkState(
      shardIdToShardMap == null, "Expected shard id to shard map to not have been initialized.");
  checkState(
      shardOffsetToShardMap == null,
      "Expected shard offset to shard map to not have been initialized.");

  SeekableByteChannel rawChannel = null;
  try {
    RandomAccessData data;
    long startPosition;
    try (Closeable closeReadCounter = readCounter.enter()) {
      rawChannel = openIfNeeded(inChannel);
      this.length = rawChannel.size();
      // We read the last chunk of data, for small files we will capture the entire file.
      // We may capture the Bloom filter, shard index, and footer for slightly larger files.
      // Otherwise we are guaranteed to capture the footer and the shard index.
      startPosition = Math.max(length - MAX_SHARD_INDEX_AND_FOOTER_SIZE, 0);
      position(rawChannel, startPosition);
      data = new RandomAccessData(ByteStreams.toByteArray(Channels.newInputStream(rawChannel)));
    }
    readCounter.addBytesRead(data.size());

    // Read the fixed length footer. Kept in a local until the shard maps are built as well.
    Footer decodedFooter =
        FooterCoder.of()
            .decode(data.asInputStream(data.size() - Footer.FIXED_LENGTH, Footer.FIXED_LENGTH));
    checkState(
        startPosition < decodedFooter.getIndexPosition(),
        "Malformed file, expected to have been able to read entire shard index.");
    int offsetWithinReadData = (int) (decodedFooter.getIndexPosition() - startPosition);

    // Decode the list of Ism shard descriptors
    List<IsmShard> ismShards =
        IsmFormat.ISM_SHARD_INDEX_CODER.decode(
            data.asInputStream(offsetWithinReadData, data.size() - offsetWithinReadData));

    // Build the shard id to shard descriptor map
    ImmutableSortedMap.Builder<Integer, IsmShard> shardIdToShardMapBuilder =
        ImmutableSortedMap.orderedBy(Ordering.<Integer>natural());
    for (IsmShard ismShard : ismShards) {
      shardIdToShardMapBuilder.put(ismShard.getId(), ismShard);
    }
    NavigableMap<Integer, IsmShard> shardsById = shardIdToShardMapBuilder.build();

    // Build the shard block offset to shard descriptor map
    ImmutableSortedMap.Builder<Long, IsmShard> shardOffsetToShardMapBuilder =
        ImmutableSortedMap.orderedBy(Ordering.<Long>natural());
    for (IsmShard ismShard : ismShards) {
      shardOffsetToShardMapBuilder.put(ismShard.getBlockOffset(), ismShard);
    }
    NavigableMap<Long, IsmShard> shardsByOffset = shardOffsetToShardMapBuilder.build();

    // Everything decoded: publish the maps first and the footer last, since a non-null footer
    // is what marks this reader as initialized.
    this.shardIdToShardMap = shardsById;
    this.shardOffsetToShardMap = shardsByOffset;
    this.footer = decodedFooter;

    // We may have gotten the Bloom filter, if so lets store it.
    if (startPosition < footer.getBloomFilterPosition()) {
      Optional<SeekableByteChannel> cachedDataChannel =
          Optional.<SeekableByteChannel>of(
              new CachedTailSeekableByteChannel(startPosition, data.array()));
      initializeBloomFilterAndIndexPerShard(cachedDataChannel);

      // For small files, we may have read the whole thing during initialization so
      // lets cache all this information. Note that this is important for the many small files
      // case since the IsmSideInputReader does initialization in parallel.
      if (cache != null && startPosition == 0) {
        for (IsmShard ismShard : ismShards) {
          initializeForKeyedRead(ismShard.getId(), cachedDataChannel, readCounter);
        }

        for (SortedMap<RandomAccessData, IsmShardKey> shards : indexPerShard.values()) {
          for (Map.Entry<RandomAccessData, IsmShardKey> block : shards.entrySet()) {
            cache.put(
                block.getValue(),
                new IsmCacheLoader(block.getValue()).call(cachedDataChannel.get()));
          }
        }
      }
    }
    return Optional.of(rawChannel);
  } catch (IOException | RuntimeException e) {
    // The caller never receives the channel on failure, so release it if we opened it here.
    if (rawChannel != null && !inChannel.isPresent()) {
      try {
        rawChannel.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
    }
    throw e;
  }
}
/**
 * Loads the Bloom filter and creates the per shard index map, unless that already happened.
 *
 * <p>A shard whose index is empty is recognisable without reading it: its index offset equals
 * the block offset of the following shard or, for the last shard, the position of the Bloom
 * filter. Such shards get their index prepopulated with a single entry covering the whole shard.
 * Re-uses the provided channel, returning it or a new one if this method was required to open
 * one.
 */
private synchronized Optional<SeekableByteChannel> initializeBloomFilterAndIndexPerShard(
    Optional<SeekableByteChannel> inChannel) throws IOException {
  if (indexPerShard != null) {
    checkState(bloomFilter != null, "Expected Bloom filter to have been initialized.");
    return inChannel;
  }

  final SeekableByteChannel channel = openIfNeeded(inChannel);

  // Set the position to where the bloom filter is and read it in.
  position(channel, footer.getBloomFilterPosition());
  bloomFilter = ScalableBloomFilterCoder.of().decode(Channels.newInputStream(channel));
  indexPerShard = new HashMap<>();

  // Walk the shards in file order, always comparing a shard with the one that follows it.
  final Iterator<IsmShard> shardsInFileOrder = shardOffsetToShardMap.values().iterator();
  if (shardsInFileOrder.hasNext()) {
    IsmShard shard = shardsInFileOrder.next();
    while (shardsInFileOrder.hasNext()) {
      final IsmShard following = shardsInFileOrder.next();
      // The index ends where the next shard's blocks begin, so equality means no index data.
      if (shard.getIndexOffset() == following.getBlockOffset()) {
        indexPerShard.put(shard.getId(), indexWithSingleBlock(shard));
      }
      shard = following;
    }
    // The last shard's index is bounded by the start of the Bloom filter instead.
    if (shard.getIndexOffset() == footer.getBloomFilterPosition()) {
      indexPerShard.put(shard.getId(), indexWithSingleBlock(shard));
    }
  }
  // For a file without shards we fall straight through to here.
  return Optional.of(channel);
}

/**
 * Builds the index of a shard without index entries: one entry, keyed by the empty key, whose
 * block spans from the shard's block offset to its index offset.
 */
private ImmutableSortedMap<RandomAccessData, IsmShardKey> indexWithSingleBlock(IsmShard shard) {
  return ImmutableSortedMap.<RandomAccessData, IsmShardKey>orderedBy(
          RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR)
      .put(
          new RandomAccessData(0),
          new IsmShardKey(
              IsmReaderImpl.this.resourceId.toString(),
              new RandomAccessData(0),
              shard.getBlockOffset(),
              shard.getIndexOffset()))
      .build();
}
/** A unique key used to fully describe an Ism shard. */
static final class IsmShardKey {
private final String resourceId;
private final RandomAccessData firstKey;
private final long startOffset;
private final long endOffset;
IsmShardKey(String resourceId, RandomAccessData firstKey, long startOffset, long endOffset) {
this.resourceId = resourceId;
this.firstKey = firstKey;
this.startOffset = startOffset;
this.endOffset = endOffset;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(IsmShardKey.class)
.add("resource id", resourceId)
.add("firstKey", firstKey)
.add("startOffset", startOffset)
.add("endOffset", endOffset)
.toString();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof IsmShardKey)) {
return false;
}
IsmShardKey key = (IsmShardKey) obj;
return startOffset == key.startOffset
&& endOffset == key.endOffset
&& Objects.equals(resourceId, key.resourceId)
&& Objects.equals(firstKey, key.firstKey);
}
@Override
public int hashCode() {
int result = resourceId.hashCode();
result = result * 31 + firstKey.hashCode();
result = result * 31 + Longs.hashCode(startOffset);
result = result * 31 + Longs.hashCode(endOffset);
return result;
}
}
/**
 * Initializes the footer, shard index, Bloom filter and index for the requested shard id if they
 * have not been initialized yet. Re-uses the provided channel, returning it or a new one if this
 * method was required to open one.
 *
 * <p>The index of a shard is a sequence of (key, block offset) entries stored between the shard's
 * index offset and the start of the next shard (or of the Bloom filter for the last shard). It is
 * turned into a sorted map from the first key of each block to the descriptor of that block,
 * with an additional entry under the empty key for the block preceding the first indexed key.
 *
 * <p>This method is synchronized like the other two initializers: it checks whether the shard's
 * index is present and later puts it into {@link #indexPerShard}, a plain {@link HashMap}, and
 * that check-then-put must not interleave with another thread doing the same.
 *
 * @param shardId the shard whose index is needed
 * @param inChannel a channel to re-use, or absent to have one opened when needed
 * @param readCounter the counter passed on to the footer initialization
 * @return {@code inChannel}, or the channel this method had to open
 */
// Real bug - https://issues.apache.org/jira/browse/BEAM-6559
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
private synchronized Optional<SeekableByteChannel> initializeForKeyedRead(
    int shardId, Optional<SeekableByteChannel> inChannel, SideInputReadCounter readCounter)
    throws IOException {
  inChannel = initializeFooterAndShardIndex(inChannel, readCounter);

  IsmShard shardWithIndex = shardIdToShardMap.get(shardId);

  // If this shard id is not within this file, we can return immediately.
  if (shardWithIndex == null) {
    return inChannel;
  }

  inChannel = initializeBloomFilterAndIndexPerShard(inChannel);

  // If the index has been populated and contains the shard id, we can return.
  if (indexPerShard != null && indexPerShard.containsKey(shardId)) {
    checkState(bloomFilter != null, "Bloom filter expected to have been initialized.");
    return inChannel;
  }

  checkState(
      indexPerShard.get(shardId) == null,
      "Expected to not have initialized index for shard %s",
      shardId);

  Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
  // If this is the last block, then we need to grab the position of the Bloom filter
  // as the upper bound.
  if (startOfNextBlock == null) {
    startOfNextBlock = footer.getBloomFilterPosition();
  }

  // Open the channel if needed and seek to the start of the index.
  SeekableByteChannel rawChannel = openIfNeeded(inChannel);
  rawChannel.position(shardWithIndex.getIndexOffset());

  // The stream reads straight from the channel, so the channel position tracks what was decoded.
  InputStream inStream = Channels.newInputStream(rawChannel);

  ImmutableSortedMap.Builder<RandomAccessData, IsmShardKey> builder =
      ImmutableSortedMap.orderedBy(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR);

  // Read the first key
  RandomAccessData currentKeyBytes = new RandomAccessData();
  readKey(inStream, currentKeyBytes);
  long currentOffset = VarInt.decodeLong(inStream);

  // Insert the entry that happens at the beginning limiting the shard block by the
  // first keys block offset.
  builder.put(
      new RandomAccessData(0),
      new IsmShardKey(
          IsmReaderImpl.this.resourceId.toString(),
          new RandomAccessData(0),
          shardWithIndex.getBlockOffset(),
          currentOffset));

  // While another index entry exists, insert an index entry with the key, and offsets
  // that limit the range of the shard block.
  while (rawChannel.position() < startOfNextBlock) {
    // The next key is decoded on top of a copy of the current key bytes.
    RandomAccessData nextKeyBytes = currentKeyBytes.copy();
    readKey(inStream, nextKeyBytes);
    long nextOffset = VarInt.decodeLong(inStream);

    builder.put(
        currentKeyBytes,
        new IsmShardKey(
            IsmReaderImpl.this.resourceId.toString(),
            currentKeyBytes,
            currentOffset,
            nextOffset));

    currentKeyBytes = nextKeyBytes;
    currentOffset = nextOffset;
  }

  // Upper bound the last entry with the index offset.
  builder.put(
      currentKeyBytes,
      new IsmShardKey(
          IsmReaderImpl.this.resourceId.toString(),
          currentKeyBytes,
          currentOffset,
          shardWithIndex.getIndexOffset()));
  indexPerShard.put(shardId, builder.build());

  return Optional.of(rawChannel);
}
/**
 * Loads the data block described by an {@link IsmShardKey}: reads every record of the block and
 * returns them as a map sorted by key bytes, weighted by the size of the block in the file.
 */
private class IsmCacheLoader
    implements Callable<
        WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>>> {
  private final IsmShardKey key;

  private IsmCacheLoader(IsmShardKey key) {
    this.key = key;
  }

  /** Loads the block through a freshly opened channel, which is closed again afterwards. */
  @Override
  public WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>> call()
      throws IOException {
    try (SeekableByteChannel ownedChannel = open()) {
      return call(ownedChannel);
    }
  }

  /** Loads the block through the given channel; the channel itself is not closed here. */
  public WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>> call(
      SeekableByteChannel rawChannel) throws IOException {
    final SideInputReadCounter counter = IsmReader.getCurrentSideInputCounter();
    try (WithinShardIsmReaderIterator records =
        new WithinShardIsmReaderIterator(
            rawChannel, key.firstKey, key.startOffset, key.endOffset, counter)) {
      final ImmutableSortedMap.Builder<RandomAccessData, WindowedValue<IsmRecord<V>>> decoded =
          ImmutableSortedMap.orderedBy(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR);
      if (records.start()) {
        do {
          // Copy the key bytes before storing them as a map key.
          decoded.put(records.getCurrentKeyBytes().copy(), records.getCurrent());
        } while (records.advance());
      }
      final NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> block = decoded.build();
      // We return the size of the data block as the weight of this data block.
      final long blockSize = key.endOffset - key.startOffset;
      return WeightedValue.of(block, blockSize);
    }
  }
}
/**
 * Fetches the data block requested.
 *
 * <p>With a cache, the block is looked up there and loaded into it on a miss. Without one, the
 * block is loaded directly. A load failure reported by the cache is unwrapped: its cause is
 * rethrown as is when it is an {@link IOException} or unchecked, and wrapped in an
 * {@link IOException} otherwise.
 */
private NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> fetch(IsmShardKey key)
    throws IOException {
  try {
    final WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>>> weighted;
    if (cache != null) {
      weighted = cache.get(key, new IsmCacheLoader(key));
    } else {
      weighted = new IsmCacheLoader(key).call();
    }
    return weighted.getValue();
  } catch (ExecutionException e) {
    // Try and re-throw the root cause if its an IOException
    final Throwable rootCause = e.getCause();
    Throwables.propagateIfPossible(rootCause, IOException.class);
    throw new IOException(rootCause);
  }
}
/** The base class of Ism reader iterators which operate over a given key prefix. */
abstract class IsmPrefixReaderIteratorImpl extends IsmPrefixReaderIterator {
  private final List<?> keyComponents;
  final SideInputReadCounter readCounter;

  private IsmPrefixReaderIteratorImpl(List<?> keyComponents, SideInputReadCounter readCounter) {
    this.keyComponents = keyComponents;
    this.readCounter = readCounter;
  }

  /** Returns the list of key components representing this iterators key prefix. */
  protected List<?> getKeyComponents() {
    return keyComponents;
  }

  /**
   * Concatenates this reader iterators key components with the additionally supplied key
   * components and encodes them into their byte representations producing a key. Returns the
   * exact record represented by the key generated above.
   *
   * <p>Null is returned if no key has all the key components as a prefix within this file.
   */
  @Override
  public final WindowedValue<IsmRecord<V>> get(List<?> additionalKeyComponents)
      throws IOException {
    RandomAccessData keyBytes = new RandomAccessData();
    try (Closeable readerCloser = readCounter.enter()) {
      int shardId =
          coder.encodeAndHash(
              ImmutableList.builder()
                  .addAll(keyComponents)
                  .addAll(additionalKeyComponents)
                  .build(),
              keyBytes);
      return getBlock(keyBytes, shardId, readCounter).get(keyBytes);
    }
  }

  /**
   * Returns the record for the last key having this iterators key prefix. Last is defined as the
   * largest key with the same key prefix when comparing key's byte representations using an
   * unsigned lexicographical byte order.
   *
   * <p>Null is returned if the prefix is not present within this file.
   *
   * <p>Any channel opened while initializing is closed before returning, and the Bloom filter is
   * loaded before it is consulted.
   */
  @Override
  public WindowedValue<IsmRecord<V>> getLast() throws IOException {
    RandomAccessData keyBytes = new RandomAccessData();
    int shardId = coder.encodeAndHash(keyComponents, keyBytes);

    Optional<SeekableByteChannel> inChannel =
        initializeFooterAndShardIndex(Optional.<SeekableByteChannel>absent(), readCounter);

    // The Bloom filter is only worth loading if the shard is part of this file at all.
    boolean mayContainPrefix = shardIdToShardMap.containsKey(shardId);
    if (mayContainPrefix) {
      inChannel = initializeBloomFilterAndIndexPerShard(inChannel);
      mayContainPrefix = bloomFilterMightContain(keyBytes);
    }
    // Key is not stored here
    if (!mayContainPrefix) {
      closeIfPresent(inChannel);
      return null;
    }

    inChannel = initializeForKeyedRead(shardId, inChannel, readCounter);
    closeIfPresent(inChannel);

    final NavigableMap<RandomAccessData, IsmShardKey> indexInShard = indexPerShard.get(shardId);
    // 'end' is an exclusive upper bound: every key having the prefix is strictly smaller. We
    // therefore look for strictly lower entries, so that a block or key exactly equal to 'end'
    // (which does not have the prefix) cannot hide the real last key. The index always holds an
    // entry under the empty key, hence a lower entry always exists.
    RandomAccessData end = keyBytes.increment();
    final IsmShardKey cacheEntry = indexInShard.lowerEntry(end).getValue();

    NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> block;
    try (Closeable readerCloser = IsmReader.setSideInputReadContext(readCounter)) {
      block = fetch(cacheEntry);
    }

    RandomAccessData lastKey = block.lastKey();

    // If the requested key is greater than the last key within the block, then it
    // does not exist.
    if (RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(keyBytes, lastKey) > 0) {
      return null;
    }

    Entry<RandomAccessData, WindowedValue<IsmRecord<V>>> rval = block.lowerEntry(end);
    // No key of the block lies below the upper bound, so the prefix cannot be present.
    if (rval == null) {
      return null;
    }

    // If the prefix matches completely then we can return
    if (RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
            keyBytes, rval.getKey())
        == keyBytes.size()) {
      return rval.getValue();
    }
    return null;
  }
}
/**
 * A reader iterator without any records. It is bound to the no-op read counter, never yields an
 * element, and has no current element to return.
 */
class EmptyIsmPrefixReaderIterator extends IsmPrefixReaderIteratorImpl {
  private EmptyIsmPrefixReaderIterator(List<?> keyComponents) {
    super(keyComponents, NoopSideInputReadCounter.INSTANCE);
  }

  /** Always reports that there is no first element. */
  @Override
  public boolean start() throws IOException {
    final boolean hasFirstElement = false;
    return hasFirstElement;
  }

  /** Always reports that there is no next element. */
  @Override
  public boolean advance() throws IOException {
    final boolean hasNextElement = false;
    return hasNextElement;
  }

  /** Always throws, as there never is a current element. */
  @Override
  public WindowedValue<IsmRecord<V>> getCurrent() throws NoSuchElementException {
    throw new NoSuchElementException();
  }
}
/**
 * A reader iterator over the whole file which initializes its input stream lazily: the actual
 * iterator is only created by {@link #start()}, and all other operations are forwarded to it.
 */
class LazyIsmPrefixReaderIterator extends IsmPrefixReaderIteratorImpl {
  private IsmPrefixReaderIterator delegate;

  public LazyIsmPrefixReaderIterator(SideInputReadCounter readCounter) {
    super(ImmutableList.of(), readCounter);
  }

  /** Creates the underlying iterator within this iterator's read context and starts it. */
  @Override
  public boolean start() throws IOException {
    checkState(delegate == null, "Already started");
    try (Closeable restoreContext = IsmReader.setSideInputReadContext(readCounter)) {
      delegate = overKeyComponents(getKeyComponents());
    }
    return delegate.start();
  }

  @Override
  public boolean advance() throws IOException {
    return startedDelegate().advance();
  }

  @Override
  public WindowedValue<IsmRecord<V>> getCurrent() {
    return startedDelegate().getCurrent();
  }

  /** Closes the underlying iterator; a no-op if this iterator was never started. */
  @Override
  public void close() throws IOException {
    if (delegate == null) {
      return;
    }
    delegate.close();
  }

  /** Returns the underlying iterator, failing if {@link #start()} has not created it yet. */
  private IsmPrefixReaderIterator startedDelegate() {
    checkState(delegate != null, "Not started");
    return delegate;
  }
}
/**
 * Returns a map from key to value, where the keys are in increasing lexicographical order. If the
 * requested key is not contained within this file, an empty map is returned.
 *
 * <p>The map returned is the decoded block that would contain {@code keyBytes}; it is empty if
 * the file does not contain the shard or the Bloom filter rules the key out. Any channel opened
 * while initializing is closed before returning, and the Bloom filter is loaded before it is
 * consulted.
 *
 * @param keyBytes the encoded key to look for
 * @param shardId the shard the key belongs to
 * @param readCounter the counter the reads are attributed to
 */
private NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> getBlock(
    RandomAccessData keyBytes, int shardId, SideInputReadCounter readCounter) throws IOException {
  Optional<SeekableByteChannel> inChannel =
      initializeFooterAndShardIndex(Optional.<SeekableByteChannel>absent(), readCounter);

  // The Bloom filter is only worth loading if the shard is part of this file at all.
  boolean mayContainKey = shardIdToShardMap.containsKey(shardId);
  if (mayContainKey) {
    inChannel = initializeBloomFilterAndIndexPerShard(inChannel);
    mayContainKey = bloomFilterMightContain(keyBytes);
  }
  // Key is not stored here so return an empty map.
  if (!mayContainKey) {
    closeIfPresent(inChannel);
    return ImmutableSortedMap.<RandomAccessData, WindowedValue<IsmRecord<V>>>orderedBy(
            RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR)
        .build();
  }

  inChannel = initializeForKeyedRead(shardId, inChannel, readCounter);
  closeIfPresent(inChannel);

  // The block that may hold the key is the one with the greatest first key not after it.
  final NavigableMap<RandomAccessData, IsmShardKey> indexInShard = indexPerShard.get(shardId);
  final IsmShardKey cacheEntry = indexInShard.floorEntry(keyBytes).getValue();
  try (Closeable readerCloseable = IsmReader.setSideInputReadContext(readCounter)) {
    return fetch(cacheEntry);
  }
}
/**
 * A reader iterator that returns all elements from prefix (inclusive) to prefixUpperBound
 * (exclusive) within the set of block entries provided.
 *
 * <p>Blocks are fetched one at a time, only when the records of the previous block are used up.
 */
private class WithinShardIsmPrefixReaderIterator extends IsmPrefixReaderIteratorImpl {
  private final Iterator<IsmShardKey> blockEntriesIterator;
  /** Iterates over the matching records of the block fetched last; null before the first. */
  Iterator<WindowedValue<IsmRecord<V>>> iterator;

  private final RandomAccessData prefix;
  private final RandomAccessData prefixUpperBound;
  /**
   * The element returned by {@link #getCurrent()}. Starts out absent so that asking for the
   * current element before the iterator was started throws {@link NoSuchElementException}.
   */
  private Optional<WindowedValue<IsmRecord<V>>> current = Optional.absent();

  private WithinShardIsmPrefixReaderIterator(
      List<?> keyComponents,
      RandomAccessData prefix,
      RandomAccessData prefixUpperBound,
      Iterator<IsmShardKey> blockEntriesIterator,
      SideInputReadCounter readCounter) {
    super(keyComponents, readCounter);
    checkNotNull(blockEntriesIterator);
    this.prefix = prefix;
    this.prefixUpperBound = prefixUpperBound;
    this.blockEntriesIterator = blockEntriesIterator;
  }

  @Override
  public boolean start() throws IOException {
    return advance();
  }

  /**
   * Moves to the next record within the prefix range, fetching further blocks as needed.
   * Returns false once all blocks are used up.
   */
  @Override
  public boolean advance() throws IOException {
    // This is in a while loop because the blocks that we are asked to look into may
    // not contain the key prefix.
    while (iterator == null || !iterator.hasNext()) {
      // If there are no blocks to iterate over we can return false
      if (!blockEntriesIterator.hasNext()) {
        return false;
      }

      NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> map;
      try (Closeable counterCloseable = IsmReader.setSideInputReadContext(readCounter)) {
        IsmShardKey nextBlock = blockEntriesIterator.next();
        map = fetch(nextBlock);
      }
      // Only the records within [prefix, prefixUpperBound) are of interest.
      SortedMap<RandomAccessData, WindowedValue<IsmRecord<V>>> submap =
          map.subMap(prefix, prefixUpperBound);
      Collection<WindowedValue<IsmRecord<V>>> values = submap.values();
      iterator = values.iterator();
    }
    current = Optional.of(iterator.next());
    return true;
  }

  @Override
  public WindowedValue<IsmRecord<V>> getCurrent() throws NoSuchElementException {
    if (!current.isPresent()) {
      throw new NoSuchElementException();
    }
    return current.get();
  }
}
/**
 * A reader iterator that returns all records across all shards contained within this file.
 *
 * <p>The shards are read one after the other in the order of their block offsets, through the
 * single channel handed to the constructor. That channel is closed by {@link #close()}.
 */
private class ShardAwareIsmPrefixReaderIterator extends IsmPrefixReaderIteratorImpl {
  private final SeekableByteChannel rawChannel;
  /** Iterates over the records of the shard currently being read. */
  private WithinShardIsmReaderIterator delegate;
  /** The shards not read yet, in file order. */
  private Iterator<IsmShard> shardEntries;

  private ShardAwareIsmPrefixReaderIterator(
      List<?> keyComponents, SeekableByteChannel rawChannel, SideInputReadCounter readCounter)
      throws IOException {
    super(keyComponents, readCounter);
    checkState(
        shardOffsetToShardMap.size() > 0,
        "Expected that shard offset to shard map has been initialized and is not empty.");
    this.rawChannel = rawChannel;
    this.shardEntries = shardOffsetToShardMap.values().iterator();
    IsmShard firstShard = shardEntries.next();
    // The records of a shard lie between its block offset and its index offset.
    delegate =
        new WithinShardIsmReaderIterator(
            rawChannel,
            new RandomAccessData(),
            firstShard.getBlockOffset(),
            firstShard.getIndexOffset(),
            readCounter);
  }

  @Override
  public boolean start() throws IOException {
    checkState(delegate.start(), "Expected each shard to contain at least one entry");
    return true;
  }

  @Override
  public boolean advance() throws IOException {
    if (delegate.advance()) {
      return true;
    }
    // If our current shard index is empty, we need to move to the next one.
    if (!shardEntries.hasNext()) {
      return false;
    }
    IsmShard nextIsmShard = shardEntries.next();
    checkState(
        nextIsmShard.getBlockOffset() >= rawChannel.position(),
        "Expected channel read order to be sequential.");
    delegate =
        new WithinShardIsmReaderIterator(
            rawChannel,
            new RandomAccessData(),
            nextIsmShard.getBlockOffset(),
            nextIsmShard.getIndexOffset(),
            readCounter);
    checkState(delegate.start(), "Expected each shard to contain at least one entry.");
    return true;
  }

  @Override
  public WindowedValue<IsmRecord<V>> getCurrent() throws NoSuchElementException {
    return delegate.getCurrent();
  }

  @Override
  public void close() throws IOException {
    rawChannel.close();
  }

  /**
   * Returns the last record of the last block of the shard stored last within the file. The
   * iteration state of this iterator is not affected.
   */
  @Override
  public WindowedValue<IsmRecord<V>> getLast() throws IOException {
    // Since this is an iterator over all of the file, we return the last record within the file.
    try (Closeable readerCloser = readCounter.enter()) {
      int lastShardId = shardOffsetToShardMap.lastEntry().getValue().getId();
      // Loading the index may open a channel of its own, which must not be left open.
      closeIfPresent(
          initializeForKeyedRead(
              lastShardId, Optional.<SeekableByteChannel>absent(), readCounter));
      NavigableMap<RandomAccessData, WindowedValue<IsmRecord<V>>> lastBlock =
          getBlock(indexPerShard.get(lastShardId).lastKey(), lastShardId, readCounter);
      return lastBlock.lastEntry().getValue();
    }
  }
}
/**
 * A reader iterator for Ism formatted files which returns a sequence of Ism records from the
 * underlying channel, starting at the supplied position and ending at the supplied limit.
 *
 * <p>This reader iterator uses the supplied bytes as the bytes of the previous key, which is
 * required to do the {@link KeyPrefix} based key decoding.
 */
private class WithinShardIsmReaderIterator
    extends NativeReaderIterator<WindowedValue<IsmRecord<V>>> {
  private final SeekableByteChannel rawChannel;
  private final InputStream inStream;
  private final SideInputReadCounter readCounter;
  private long readLimit;
  private RandomAccessData keyBytes;
  private long position;
  private Optional<WindowedValue<IsmRecord<V>>> current;

  /**
   * Creates an iterator over {@code rawChannel} and moves the channel to {@code newPosition}.
   *
   * @param rawChannel channel to read records from
   * @param currentKeyBytes bytes of the previous key; copied, so the caller's instance is not
   *     modified by this iterator
   * @param newPosition channel position of the first record to read
   * @param newLimit channel position at which iteration stops
   * @param readCounter counter that is told how many bytes each record consumed
   * @throws IllegalArgumentException if {@code newLimit} is negative or smaller than {@code
   *     newPosition}
   * @throws IOException if positioning the channel fails
   */
  private WithinShardIsmReaderIterator(
      SeekableByteChannel rawChannel,
      RandomAccessData currentKeyBytes,
      long newPosition,
      long newLimit,
      SideInputReadCounter readCounter)
      throws IOException {
    checkNotNull(rawChannel);
    checkNotNull(currentKeyBytes);
    checkArgument(newLimit >= 0L);
    checkArgument(newPosition <= newLimit);
    this.rawChannel = rawChannel;
    this.inStream = Channels.newInputStream(rawChannel);
    this.keyBytes = currentKeyBytes.copy();
    this.position = newPosition;
    this.readLimit = newLimit;
    this.readCounter = readCounter;
    // Start out with no current record so that getCurrent()/getCurrentKeyBytes() called before
    // start() fail with NoSuchElementException rather than a NullPointerException.
    this.current = Optional.absent();
    IsmReaderImpl.position(rawChannel, newPosition);
  }

  /** Reads the first record; equivalent to {@link #advance()}. */
  @Override
  public boolean start() throws IOException {
    return advance();
  }

  /**
   * Reads the next record from the channel.
   *
   * @return {@code true} if a record was read, {@code false} if the read limit has been reached
   * @throws IllegalStateException if the tracked position is already past the read limit
   */
  @Override
  public boolean advance() throws IOException, NoSuchElementException {
    checkState(position <= readLimit, "Read past end of stream");
    if (position == readLimit) {
      current = Optional.absent();
      return false;
    }
    final IsmRecord<V> ismRecord;
    // Rebuild the full key in keyBytes from the previous key and the bytes on the stream.
    readKey(inStream, keyBytes);
    InputStream keyInputStream = keyBytes.asInputStream(0, keyBytes.size());
    List<Object> keyComponents = new ArrayList<>(coder.getKeyComponentCoders().size());
    for (int i = 0; i < coder.getKeyComponentCoders().size(); ++i) {
      keyComponents.add(coder.getKeyComponentCoder(i).decode(keyInputStream));
    }
    // Metadata records carry a byte array payload instead of a value encoded with the value coder.
    if (IsmFormat.isMetadataKey(keyComponents)) {
      byte[] metadata = ByteArrayCoder.of().decode(inStream);
      ismRecord = IsmRecord.<V>meta(keyComponents, metadata);
    } else {
      V value = coder.getValueCoder().decode(inStream);
      ismRecord = IsmRecord.<V>of(keyComponents, value);
    }
    // The channel position tells us how many bytes this record consumed.
    long newPosition = rawChannel.position();
    IsmReaderImpl.this.notifyElementRead(newPosition - position);
    readCounter.addBytesRead(newPosition - position);
    position = newPosition;
    current = Optional.of(new ValueInEmptyWindows<>(ismRecord));
    return true;
  }

  /**
   * Returns the record read by the most recent successful {@link #advance()}.
   *
   * @throws NoSuchElementException if there is no current record
   */
  @Override
  public WindowedValue<IsmRecord<V>> getCurrent() {
    if (!current.isPresent()) {
      throw new NoSuchElementException();
    }
    return current.get();
  }

  /**
   * Returns the encoded key bytes of the current record.
   *
   * <p>The returned object is this iterator's internal key buffer, not a copy; it is overwritten
   * in place by the next call to {@link #advance()}.
   *
   * @throws NoSuchElementException if there is no current record
   */
  public RandomAccessData getCurrentKeyBytes() {
    if (!current.isPresent()) {
      throw new NoSuchElementException();
    }
    return keyBytes;
  }
}
/**
 * Reads the next prefix-compressed key from {@code inStream} into {@code keyBytes}.
 *
 * <p>A {@link KeyPrefix} is decoded from the stream first. The leading shared key bytes already
 * held in {@code keyBytes} are kept, and the unshared key bytes are read from the stream into
 * {@code keyBytes[shared key bytes : shared key bytes + unshared key bytes]}.
 */
private static void readKey(InputStream inStream, RandomAccessData keyBytes) throws IOException {
  KeyPrefix prefix = KeyPrefixCoder.of().decode(inStream);
  int sharedSize = prefix.getSharedKeySize();
  int unsharedSize = prefix.getUnsharedKeySize();
  // currentKey = prevKey[0 : sharedSize] + read(unsharedSize): overwrite the previous key
  // starting at sharedSize with unsharedSize bytes taken from the stream.
  keyBytes.readFrom(inStream, sharedSize, unsharedSize);
  // Reset the length in case the new key is shorter than the previous one.
  keyBytes.resetTo(sharedSize + unsharedSize);
}
/** Closes the supplied channel when one is present; does nothing otherwise. */
private void closeIfPresent(Optional<SeekableByteChannel> inChannel) throws IOException {
  if (!inChannel.isPresent()) {
    return;
  }
  inChannel.get().close();
}
/** Returns the supplied channel when one is present, otherwise opens a new one. */
private SeekableByteChannel openIfNeeded(Optional<SeekableByteChannel> inChannel)
    throws IOException {
  return inChannel.isPresent() ? inChannel.get() : open();
}
/**
 * Opens a new channel for {@code resourceId}.
 *
 * @return the opened channel, which the caller is responsible for closing
 * @throws IllegalArgumentException if the opened channel is not a {@link SeekableByteChannel}; the
 *     channel is closed before the exception propagates
 * @throws IOException if opening the channel fails
 */
private SeekableByteChannel open() throws IOException {
  ReadableByteChannel channel = FileSystems.open(resourceId);
  try {
    Preconditions.checkArgument(
        channel instanceof SeekableByteChannel,
        "IsmReaderImpl requires a SeekableByteChannel for path %s but received %s.",
        resourceId,
        channel);
  } catch (IllegalArgumentException e) {
    // Nobody else holds a reference to the channel we just opened, so release it here instead of
    // leaking it. A failure to close is attached to the original error rather than replacing it.
    try {
      channel.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  return (SeekableByteChannel) channel;
}
/**
 * Moves the channel to {@code newPosition}, either by reading and discarding bytes or by seeking.
 *
 * <p>Bytes are read and discarded only when the target lies ahead of the current position by at
 * most {@code SEEK_VS_READ} bytes; every other case performs a seek.
 */
private static void position(SeekableByteChannel inChannel, long newPosition) throws IOException {
  long currentPosition = inChannel.position();
  boolean shortForwardHop =
      currentPosition < newPosition && newPosition - currentPosition <= SEEK_VS_READ;
  if (!shortForwardHop) {
    // Moving backwards or too far ahead: perform a seek.
    inChannel.position(newPosition);
    return;
  }
  // Reading and discarding the bytes in between is cheaper than seeking, so just do the read.
  ByteStreams.skipFully(Channels.newInputStream(inChannel), newPosition - currentPosition);
}
/**
 * A {@link SeekableByteChannel} backed by a cached data segment that represents the tail of a
 * {@link ReadableByteChannel}.
 *
 * <p>Positions exposed by this channel are absolute: they are the position inside the cached data
 * plus {@code offset}. Only read operations are supported, the channel always reports itself as
 * open, and closing it is a no-op.
 */
static class CachedTailSeekableByteChannel implements SeekableByteChannel {
  final long offset;
  final byte[] data;
  // Position relative to the start of data.
  int localPosition;

  /**
   * @param offset absolute position that the first cached byte corresponds to; must be
   *     non-negative
   * @param data the cached bytes; must not be null
   */
  CachedTailSeekableByteChannel(long offset, byte[] data) {
    checkArgument(offset >= 0, "Offset must be non-negative.");
    checkNotNull(data, "Cached data must not be null.");
    this.offset = offset;
    this.data = data;
  }

  /** Always {@code true}; this channel cannot be closed. */
  @Override
  public boolean isOpen() {
    return true;
  }

  /** Does nothing. */
  @Override
  public void close() throws IOException {}

  /**
   * Copies as many cached bytes as fit into {@code dst}, starting at the current position.
   *
   * @return the number of bytes copied, or {@code -1} when the end of the cached data was already
   *     reached
   */
  @Override
  public int read(ByteBuffer dst) throws IOException {
    int available = data.length - localPosition;
    if (available <= 0) {
      return -1;
    }
    int copied = Math.min(dst.remaining(), available);
    dst.put(data, localPosition, copied);
    localPosition += copied;
    return copied;
  }

  /** Unsupported; this channel is read-only. */
  @Override
  public int write(ByteBuffer src) throws IOException {
    throw new NonWritableChannelException();
  }

  /** Returns the absolute position: the position inside the cached data plus the offset. */
  @Override
  public long position() throws IOException {
    return offset + localPosition;
  }

  /**
   * Moves to the absolute position {@code newPosition}.
   *
   * @throws IllegalArgumentException if {@code newPosition} lies outside of the cached range
   */
  @Override
  public SeekableByteChannel position(long newPosition) throws IOException {
    long cachedEnd = offset + data.length;
    checkArgument(
        newPosition >= offset && newPosition <= cachedEnd,
        "Cannot seek to position %s which is outside of cached data range [%s, %s].",
        newPosition,
        offset,
        cachedEnd);
    localPosition = Ints.checkedCast(newPosition - offset);
    return this;
  }

  /** Returns the absolute size: the offset plus the number of cached bytes. */
  @Override
  public long size() throws IOException {
    return offset + data.length;
  }

  /** Unsupported; this channel is read-only. */
  @Override
  public SeekableByteChannel truncate(long size) throws IOException {
    throw new NonWritableChannelException();
  }
}
}