blob: 354d08bb896184d03ba92d9517d962f59b93050b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link PTransform} that reads a {@link BoundedSource} by adapting it into an {@link UnboundedSource}.
*
* <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#split}, and
* element timestamps are propagated. While any elements remain, the watermark is the beginning of
* time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced the
* watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
*
* <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the reader of the
* inner {@link BoundedSource}. When a source cannot be split, its remaining elements are read
* entirely into memory, so this transform does not work well with large, unsplittable sources.
*
* <p>This transform is intended to be used by a runner during pipeline translation to convert a
* Read.Bounded into a Read.Unbounded.
*/
public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);

/** Size (64 MiB) assumed whenever a source reports a non-positive estimated size. */
private static final long DEFAULT_ESTIMATED_SIZE = 64L << 20;

/** The bounded source whose contents this transform emits through an unbounded read. */
private final BoundedSource<T> source;

/**
 * Creates a transform that reads {@code boundedSource} through a {@link
 * BoundedToUnboundedSourceAdapter}.
 *
 * @param boundedSource the source to read; stored as-is
 */
public UnboundedReadFromBoundedSource(BoundedSource<T> boundedSource) {
  source = boundedSource;
}
/** Applies {@link Read#from} to an adapter that exposes the bounded source as unbounded. */
@Override
public PCollection<T> expand(PBegin input) {
  BoundedToUnboundedSourceAdapter<T> adapter = new BoundedToUnboundedSourceAdapter<>(source);
  return input.getPipeline().apply(Read.from(adapter));
}
/** Delegates to the default output coder of the wrapped bounded source. */
@Override
protected Coder<T> getDefaultOutputCoder() {
  return this.source.getDefaultOutputCoder();
}
/** Names this transform {@code Read(<approximate simple name of the source>)}. */
@Override
public String getKindString() {
  String sourceName = NameUtils.approximateSimpleName(source);
  return "Read(" + sourceName + ")";
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Base-class display data is deliberately not registered; the wrapped source's class is
  // recorded and its own display data is included, both under the "source" key.
  DisplayData.ItemSpec<?> sourceClassItem = DisplayData.item("source", source.getClass());
  builder.add(sourceClassItem).include("source", source);
}
/** A {@code BoundedSource} to {@code UnboundedSource} adapter. */
@VisibleForTesting
public static class BoundedToUnboundedSourceAdapter<T>
extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
/** The bounded source being adapted; assigned once in the constructor. */
private final BoundedSource<T> boundedSource;

/**
 * Creates an adapter that exposes {@code boundedSource} as an {@link UnboundedSource}.
 *
 * @param boundedSource the bounded source to wrap
 */
public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
  this.boundedSource = boundedSource;
}
/** Forwards validation to the wrapped {@link BoundedSource}. */
@Override
public void validate() {
  this.boundedSource.validate();
}
/**
 * Splits the wrapped source into roughly {@code desiredNumSplits} adapters.
 *
 * <p>The desired bundle size is the source's estimated size divided by the number of splits. When
 * the source reports a non-positive estimate, {@code DEFAULT_ESTIMATED_SIZE} is used instead.
 * Splitting is best-effort: if the source produces no splits, the whole source becomes the
 * single split, and if anything throws, this adapter is returned unsplit.
 *
 * @param desiredNumSplits requested number of splits; non-positive values are treated as 1
 * @param options pipeline options forwarded to the bounded source
 * @return one adapter per bounded split; never empty
 */
@Override
public List<BoundedToUnboundedSourceAdapter<T>> split(
    int desiredNumSplits, PipelineOptions options) throws Exception {
  try {
    long estimatedSize = boundedSource.getEstimatedSizeBytes(options);
    if (estimatedSize <= 0) {
      // Source is unable to provide a valid estimated size. So using default size.
      LOG.warn(
          "Cannot determine a valid estimated size for BoundedSource {}. Using default "
              + "size of {} bytes",
          boundedSource,
          DEFAULT_ESTIMATED_SIZE);
      estimatedSize = DEFAULT_ESTIMATED_SIZE;
    }
    // Without this clamp, 0 would divide by zero and a negative count would force the bundle
    // size down to 1 byte, i.e. request one split per byte.
    int numSplits = Math.max(desiredNumSplits, 1);
    // Each split should at least be of size 1 byte.
    long desiredBundleSize = Math.max(estimatedSize / numSplits, 1);
    List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);
    if (splits.isEmpty()) {
      splits = ImmutableList.of(boundedSource);
    }
    return splits.stream()
        .map(BoundedToUnboundedSourceAdapter<T>::new)
        .collect(Collectors.toList());
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // The broad catch swallows the interrupt; restore the flag so callers can still observe it.
      Thread.currentThread().interrupt();
    }
    LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
    return ImmutableList.of(this);
  }
}
/**
 * Creates a reader that resumes from {@code checkpoint}. When no checkpoint is given, the reader
 * starts from the whole wrapped source.
 */
@Override
public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
    throws IOException {
  return (checkpoint != null)
      ? new Reader(checkpoint.residualElements, checkpoint.residualSource, options)
      : new Reader(null /* residualElements */, boundedSource, options);
}
/** Uses the default output coder of the wrapped bounded source. */
@Override
public Coder<T> getDefaultOutputCoder() {
  return this.boundedSource.getDefaultOutputCoder();
}
/**
 * Returns a {@link CheckpointCoder} that encodes residual elements with the wrapped source's
 * default output coder.
 */
@Override
public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
  // No raw or unchecked conversion happens here, so no warning suppression is needed.
  return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
}
/**
 * A marker representing the progress and state of a {@link BoundedToUnboundedSourceAdapter}.
 *
 * <p>It holds a list of already-read elements that have not been emitted yet, plus the portion
 * of the bounded source that still needs to be read. Either part may be {@code null}.
 */
@VisibleForTesting
public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
  @Nullable private final List<TimestampedValue<T>> residualElements;
  @Nullable private final BoundedSource<T> residualSource;

  public Checkpoint(
      @Nullable List<TimestampedValue<T>> residualElements,
      @Nullable BoundedSource<T> residualSource) {
    this.residualSource = residualSource;
    this.residualElements = residualElements;
  }

  /** Intentionally empty: finalizing this checkpoint requires no action. */
  @Override
  public void finalizeCheckpoint() {}

  @VisibleForTesting
  @Nullable
  List<TimestampedValue<T>> getResidualElements() {
    return this.residualElements;
  }

  @VisibleForTesting
  @Nullable
  BoundedSource<T> getResidualSource() {
    return this.residualSource;
  }
}
/**
 * {@link StructuredCoder} for {@link Checkpoint}. It writes the nullable list of residual
 * timestamped elements first, then the nullable residual source using Java serialization.
 */
@VisibleForTesting
static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {
  // Nullable list coder for the residual elements paired with their timestamps.
  private final Coder<List<TimestampedValue<T>>> elemsCoder;
  // Coder for single elements of the wrapped source; the only coder argument.
  private final Coder<T> elemCoder;
  // Nullable coder for the residual BoundedSource, backed by Java serialization.
  @SuppressWarnings("rawtypes")
  private final Coder<BoundedSource> sourceCoder;

  CheckpointCoder(Coder<T> elemCoder) {
    this.elemCoder = elemCoder;
    Coder<TimestampedValue<T>> timestampedElemCoder =
        TimestampedValue.TimestampedValueCoder.of(elemCoder);
    this.elemsCoder = NullableCoder.of(ListCoder.of(timestampedElemCoder));
    this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
  }

  @Override
  public void encode(Checkpoint<T> value, OutputStream outStream)
      throws CoderException, IOException {
    // Write order must match the read order in decode(): elements first, then the source.
    List<TimestampedValue<T>> elements = value.residualElements;
    BoundedSource<T> residual = value.residualSource;
    elemsCoder.encode(elements, outStream);
    sourceCoder.encode(residual, outStream);
  }

  @SuppressWarnings("unchecked")
  @Override
  public Checkpoint<T> decode(InputStream inStream) throws CoderException, IOException {
    List<TimestampedValue<T>> elements = elemsCoder.decode(inStream);
    BoundedSource<T> residual = sourceCoder.decode(inStream);
    return new Checkpoint<>(elements, residual);
  }

  @Override
  public List<Coder<?>> getCoderArguments() {
    return Arrays.<Coder<?>>asList(elemCoder);
  }

  /** Always throws: the residual source is encoded with Java serialization. */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(
        this, "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
  }
}
/**
 * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into {@link
 * ResidualElements} and {@link ResidualSource}.
 *
 * <p>In the initial state, {@link ResidualElements} is empty and {@link ResidualSource} contains
 * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will be
 * split into {@link ResidualElements} and {@link ResidualSource}.
 *
 * <p>Elements are served from {@link ResidualElements} first, and from {@link ResidualSource}
 * only after those are exhausted.
 */
@VisibleForTesting
class Reader extends UnboundedReader<T> {
  // Assigned by init(). After construction residualElements is non-null; residualSource is null
  // when no unread source remains.
  private @Nullable ResidualElements residualElements;
  private @Nullable ResidualSource residualSource;
  private final PipelineOptions options;
  // Set once advance() finds both the residual elements and the residual source exhausted.
  private boolean done;

  Reader(
      @Nullable List<TimestampedValue<T>> residualElementsList,
      @Nullable BoundedSource<T> residualSource,
      PipelineOptions options) {
    // Validate options before init() hands them to a ResidualSource.
    this.options = checkNotNull(options, "options");
    this.done = false;
    init(residualElementsList, residualSource, this.options);
  }

  /**
   * (Re)builds the reading state. A {@code null} element list is treated as empty, and a
   * {@code null} source means nothing remains to read beyond those elements.
   */
  private void init(
      @Nullable List<TimestampedValue<T>> residualElementsList,
      @Nullable BoundedSource<T> residualSource,
      PipelineOptions options) {
    this.residualElements =
        residualElementsList == null
            ? new ResidualElements(Collections.emptyList())
            : new ResidualElements(residualElementsList);
    this.residualSource =
        residualSource == null ? null : new ResidualSource(residualSource, options);
  }

  @Override
  public boolean start() throws IOException {
    return advance();
  }

  @Override
  public boolean advance() throws IOException {
    if (residualElements.advance()) {
      return true;
    } else if (residualSource != null && residualSource.advance()) {
      return true;
    } else {
      done = true;
      return false;
    }
  }

  @Override
  public void close() throws IOException {
    if (residualSource != null) {
      residualSource.close();
    }
  }

  @Override
  public T getCurrent() throws NoSuchElementException {
    if (residualElements.hasCurrent()) {
      return residualElements.getCurrent();
    } else if (residualSource != null) {
      return residualSource.getCurrent();
    } else {
      throw new NoSuchElementException();
    }
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    if (residualElements.hasCurrent()) {
      return residualElements.getCurrentTimestamp();
    } else if (residualSource != null) {
      return residualSource.getCurrentTimestamp();
    } else {
      throw new NoSuchElementException();
    }
  }

  /** Returns the end of time once all input is exhausted, and the beginning of time before then. */
  @Override
  public Instant getWatermark() {
    return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
  }

  /**
   * {@inheritDoc}
   *
   * <p>If only part of the {@link ResidualElements} is consumed, the new checkpoint will contain
   * the remaining elements in {@link ResidualElements} and the {@link ResidualSource}.
   *
   * <p>If all {@link ResidualElements} and part of the {@link ResidualSource} are consumed, the
   * new checkpoint is made by splitting {@link ResidualSource} into new {@link ResidualElements}
   * and {@link ResidualSource}. The new {@link ResidualSource} is the split taken from the
   * current source, and {@link ResidualElements} holds the current source's remaining elements
   * after the split. For an unsplittable source, all remaining elements go into {@link
   * ResidualElements}.
   *
   * <p>The previous {@link ResidualSource} is closed before the reader re-initializes from the
   * new checkpoint.
   */
  @Override
  public Checkpoint<T> getCheckpointMark() {
    Checkpoint<T> newCheckpoint;
    if (!residualElements.done()) {
      // Part of residualElements are consumed.
      // Checkpoints the remaining elements and residualSource.
      newCheckpoint =
          new Checkpoint<>(
              residualElements.getRestElements(),
              residualSource == null ? null : residualSource.getSource());
    } else if (residualSource != null) {
      newCheckpoint = residualSource.getCheckpointMark();
    } else {
      newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
    }
    // init() is about to replace residualSource. Its BoundedReader may still be open: the
    // reader is drained, but not closed, by residualSource.getCheckpointMark(). Release it first
    // so it does not leak.
    closeResidualSourceQuietly();
    // Re-initialize since the residualElements and the residualSource might be
    // consumed or split by checkpointing.
    init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
    return newCheckpoint;
  }

  /** Closes the current residual source, if any, logging rather than propagating IOException. */
  private void closeResidualSourceQuietly() {
    if (residualSource == null) {
      return;
    }
    try {
      residualSource.close();
    } catch (IOException e) {
      LOG.warn("Failed to close reader for residual source {}.", residualSource.getSource(), e);
    }
  }

  @Override
  public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
    return BoundedToUnboundedSourceAdapter.this;
  }
}
/**
 * An in-memory list of {@link TimestampedValue}s left over from a checkpoint. The elements are
 * consumed one at a time through {@link #advance()}.
 */
private class ResidualElements {
  private final List<TimestampedValue<T>> elementsList;
  // Created lazily by the first advance(); null means nothing has been consumed yet.
  private @Nullable Iterator<TimestampedValue<T>> elementsIterator = null;
  private @Nullable TimestampedValue<T> currentT = null;
  private boolean hasCurrent = false;
  // Becomes true once advance() has run past the last element.
  private boolean done = false;

  ResidualElements(List<TimestampedValue<T>> residualElementsList) {
    this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
  }

  /** Moves to the next element and reports whether one was available. */
  public boolean advance() {
    Iterator<TimestampedValue<T>> it = elementsIterator;
    if (it == null) {
      it = elementsList.iterator();
      elementsIterator = it;
    }
    if (!it.hasNext()) {
      hasCurrent = false;
      done = true;
      return false;
    }
    currentT = it.next();
    hasCurrent = true;
    return true;
  }

  boolean hasCurrent() {
    return hasCurrent;
  }

  boolean done() {
    return done;
  }

  TimestampedValue<T> getCurrentTimestampedValue() {
    if (hasCurrent) {
      return currentT;
    }
    throw new NoSuchElementException();
  }

  T getCurrent() {
    TimestampedValue<T> current = getCurrentTimestampedValue();
    return current.getValue();
  }

  Instant getCurrentTimestamp() {
    TimestampedValue<T> current = getCurrentTimestampedValue();
    return current.getTimestamp();
  }

  /**
   * Returns the elements not yet consumed. Before the first advance() this is the original list
   * itself; after that it is a fresh list, and the iterator is left exhausted.
   */
  List<TimestampedValue<T>> getRestElements() {
    if (elementsIterator == null) {
      return elementsList;
    }
    List<TimestampedValue<T>> rest = Lists.newArrayList();
    elementsIterator.forEachRemaining(rest::add);
    return rest;
  }
}
/**
 * Reads a {@link BoundedSource} lazily. On checkpoint, it splits off the part of the source that
 * has not been read yet.
 */
private class ResidualSource {
  private final BoundedSource<T> residualSource;
  private final PipelineOptions options;
  // Created by the first advance(); null before reading starts and after close().
  private @Nullable BoundedReader<T> reader;
  private boolean closed;
  // Set once the reader's start()/advance() has returned false.
  private boolean readerDone;

  public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
    this.residualSource = checkNotNull(residualSource, "residualSource");
    this.options = checkNotNull(options, "options");
    this.reader = null;
    this.closed = false;
    this.readerDone = false;
  }

  /**
   * Advances to the next element. The reader is created and started on the first call.
   *
   * @return whether an element is available
   * @throws IllegalStateException if this residual source has been closed
   */
  private boolean advance() throws IOException {
    // Calling advance() after close() is a state error, not a bad argument.
    checkState(!closed, "advance() call on closed %s", getClass().getName());
    if (readerDone) {
      return false;
    }
    if (reader == null) {
      reader = residualSource.createReader(options);
      readerDone = !reader.start();
    } else {
      readerDone = !reader.advance();
    }
    return !readerDone;
  }

  T getCurrent() throws NoSuchElementException {
    if (reader == null) {
      throw new NoSuchElementException();
    }
    return reader.getCurrent();
  }

  Instant getCurrentTimestamp() throws NoSuchElementException {
    if (reader == null) {
      throw new NoSuchElementException();
    }
    return reader.getCurrentTimestamp();
  }

  void close() throws IOException {
    if (reader != null) {
      reader.close();
      reader = null;
    }
    closed = true;
  }

  BoundedSource<T> getSource() {
    return residualSource;
  }

  /**
   * Builds a checkpoint for the unread remainder of this source.
   *
   * <p>If reading has not started, the whole source is the residual. Otherwise the reader is
   * split (see {@link #trySplitRemainder()}), and the elements the current reader still owns
   * after the split are read into memory. For an unsplittable source, this means every remaining
   * element is buffered. The reader is drained but not closed.
   *
   * @throws RuntimeException wrapping any IOException raised while draining the reader
   */
  Checkpoint<T> getCheckpointMark() {
    if (reader == null) {
      // Reader hasn't started, checkpoint the residualSource.
      return new Checkpoint<>(null /* residualElements */, residualSource);
    }
    // Part of residualSource are consumed.
    // Splits the residualSource and tracks the new residualElements in current source.
    BoundedSource<T> residualSplit = trySplitRemainder();
    List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
    try {
      while (advance()) {
        newResidualElements.add(
            TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed to read elements from the bounded reader.", e);
    }
    return new Checkpoint<>(newResidualElements, residualSplit);
  }

  /**
   * Tries {@link BoundedReader#splitAtFraction} at evenly spaced fractions from the consumed
   * fraction toward 1. Returns the first residual produced, or {@code null} when the consumed
   * fraction is unknown or out of [0, 1], or when every attempt fails.
   */
  private @Nullable BoundedSource<T> trySplitRemainder() {
    Double fractionConsumed = reader.getFractionConsumed();
    // Written positively so that NaN also counts as out of range.
    if (fractionConsumed == null || !(0 <= fractionConsumed && fractionConsumed <= 1)) {
      return null;
    }
    final int splitAttempts = 8;
    double fractionRest = 1 - fractionConsumed;
    BoundedSource<T> residualSplit = null;
    for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
      double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
      residualSplit = reader.splitAtFraction(fractionToSplit);
    }
    return residualSplit;
  }
}
}
}