blob: 040a8dc1fcdc0d7983f93296f40ddf424433b73d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
/**
 * A {@link NativeReader} that reads Ism files.
 *
 * @param <V> the type of the value carried by each {@link IsmRecord} this reader produces
 */
public abstract class IsmReader<V> extends NativeReader<WindowedValue<IsmRecord<V>>> {

  /** Holds, per thread, the side input read counter that is currently in effect (if any). */
  private static final ThreadLocal<SideInputReadCounter> CURRENT_SIDE_INPUT_COUNTERS =
      new ThreadLocal<>();

  /**
   * Makes {@code readCounter} the current side input read counter for the calling thread.
   *
   * <p>The counter that was current before this call is remembered so that nested contexts can be
   * unwound: closing the returned handle puts the earlier counter back, or clears the thread-local
   * entry when no counter had been set. Because the state lives in a {@link ThreadLocal}, the
   * restoration affects whichever thread invokes {@code close()}.
   *
   * @param readCounter the counter to install for the calling thread
   * @return a {@link Closeable} that reinstates the previously current counter when closed
   */
  public static Closeable setSideInputReadContext(SideInputReadCounter readCounter) {
    SideInputReadCounter counterToRestore = CURRENT_SIDE_INPUT_COUNTERS.get();
    CURRENT_SIDE_INPUT_COUNTERS.set(readCounter);
    return () -> {
      if (counterToRestore != null) {
        CURRENT_SIDE_INPUT_COUNTERS.set(counterToRestore);
      } else {
        // Nothing was installed before us, so drop the entry instead of storing null.
        CURRENT_SIDE_INPUT_COUNTERS.remove();
      }
    };
  }

  /**
   * Returns the side input read counter that is current for the calling thread, or {@link
   * NoopSideInputReadCounter#INSTANCE} when none has been set.
   *
   * <p>Counters of bytes read and of time spent reading side inputs are tracked through a
   * per-thread context because IsmReader instances are cached and reused across multiple threads
   * and work items, possibly at the same time. When a thread needs to consume a side input, the
   * IsmReader implementation looks up the counter in the current context. Each side input counter
   * object is associated with one step and one side input, so the counter obtained here lets the
   * implementation identify which side input it is reading from and attribute the costs to it.
   */
  public static SideInputReadCounter getCurrentSideInputCounter() {
    SideInputReadCounter active = CURRENT_SIDE_INPUT_COUNTERS.get();
    if (active == null) {
      return NoopSideInputReadCounter.INSTANCE;
    }
    return active;
  }

  /** Returns an {@link IsmPrefixReaderIterator} over this reader. */
  @Override
  public abstract IsmPrefixReaderIterator iterator() throws IOException;

  /**
   * Returns a reader over a set of key components.
   *
   * <ul>
   *   <li>If the file is empty or there is no key with the same prefix, an empty reader iterator
   *       is returned.
   *   <li>If fewer than the required number of shard key components are passed in, a reader
   *       iterator over all the keys is returned.
   *   <li>Otherwise the returned reader iterator only iterates over keys which have the same key
   *       prefix.
   * </ul>
   *
   * <p>See {@link #overKeyComponents(List, int, RandomAccessData)} for an optimized version of this
   * method that allows the user to supply a precomputed shard id and key byte prefix.
   */
  public abstract IsmPrefixReaderIterator overKeyComponents(List<?> keyComponents)
      throws IOException;

  /**
   * Returns a reader over a set of key components, given a precomputed shard id and key prefix.
   *
   * <p>This method expects the correct shard id and the key prefix bytes representing the key
   * components to be passed in. If key components is empty, then shard id should be 0 and key
   * bytes should be empty.
   *
   * <ul>
   *   <li>If the file is empty or there is no key with the same prefix, an empty reader iterator
   *       is returned.
   *   <li>If fewer than the required number of shard key components are passed in, a reader
   *       iterator over all the keys is returned.
   *   <li>Otherwise the returned reader iterator only iterates over keys which have the same key
   *       prefix.
   * </ul>
   */
  public abstract IsmPrefixReaderIterator overKeyComponents(
      List<?> keyComponents, int shardId, RandomAccessData keyBytes) throws IOException;

  /** Returns whether this ISM reader has been initialized. */
  public abstract boolean isInitialized();

  /** Returns whether this ISM reader contains no data. */
  public abstract boolean isEmpty() throws IOException;

  /** Returns the resourceId that this ISM reader is responsible for. */
  public abstract ResourceId getResourceId();

  /** Returns the coder associated with this IsmReader. */
  public abstract IsmRecordCoder<V> getCoder();

  /** Describes this reader by its resource id and its coder. */
  @Override
  public String toString() {
    ResourceId resource = getResourceId();
    IsmRecordCoder<V> recordCoder = getCoder();
    return MoreObjects.toStringHelper(IsmReader.class)
        .add("resource id", resource)
        .add("coder", recordCoder)
        .toString();
  }

  /** The base class of Ism reader iterators which operate over a given key prefix. */
  abstract class IsmPrefixReaderIterator extends NativeReaderIterator<WindowedValue<IsmRecord<V>>> {

    // NOTE(review): presumably looks up the record whose key is this iterator's prefix extended
    // by the given additional key components; confirm against the concrete implementations.
    public abstract WindowedValue<IsmRecord<V>> get(List<?> additionalKeyComponents)
        throws IOException;

    // NOTE(review): presumably returns the last record within this iterator's key prefix;
    // confirm against the concrete implementations.
    public abstract WindowedValue<IsmRecord<V>> getLast() throws IOException;

    @Override
    public abstract WindowedValue<IsmRecord<V>> getCurrent();
  }
}