// blob: 6a33f388bd0420c2c5afd9b3a31f111a2a0e8fe1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* A {@link PTransform} that produces longs starting from the given value, and either up to the
* given limit or until {@link Long#MAX_VALUE} / until the given time elapses.
*
* <p>The bounded {@link GenerateSequence} is implemented based on {@link OffsetBasedSource} and
* {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
* supports dynamic work rebalancing.
*
* <p>To produce a bounded {@code PCollection<Long>}:
*
* <pre>{@code
* Pipeline p = ...
* PCollection<Long> bounded = p.apply(GenerateSequence.from(0).to(1000));
* }</pre>
*
* <p>To produce an unbounded {@code PCollection<Long>}, simply do not specify {@link #to(long)},
* calling {@link #withTimestampFn(SerializableFunction)} to provide values with timestamps other
* than {@link Instant#now}.
*
* <pre>{@code
* Pipeline p = ...
*
* // To use processing time as the element timestamp.
* PCollection<Long> unbounded = p.apply(GenerateSequence.from(0));
* // Or, to use a provided function to set the element timestamp.
* PCollection<Long> unboundedWithTimestamps =
* p.apply(GenerateSequence.from(0).withTimestampFn(someFn));
* }</pre>
*
* <p>In all cases, the sequence of numbers is generated in parallel, so there is no inherent
* ordering between the generated values - it is only guaranteed that all values in the given range
* will be present in the resulting {@link PCollection}.
*/
@AutoValue
public abstract class GenerateSequence extends PTransform<PBegin, PCollection<Long>> {
  /** Returns the first value of the sequence (inclusive). */
  abstract long getFrom();

  /**
   * Returns the exclusive upper bound of the sequence. A negative value ({@code -1} by default)
   * means that no upper bound has been set.
   */
  abstract long getTo();

  /** Returns the function used to assign element timestamps, or {@code null} if none was set. */
  @Nullable
  abstract SerializableFunction<Long, Instant> getTimestampFn();

  /** Returns the number of elements per period; {@code 0} means no rate limit was configured. */
  abstract long getElementsPerPeriod();

  /** Returns the period used together with the rate limit, or {@code null} if none was set. */
  @Nullable
  abstract Duration getPeriod();

  /** Returns the maximum time to read for, or {@code null} if none was set. */
  @Nullable
  abstract Duration getMaxReadTime();

  abstract Builder toBuilder();

  /**
   * Builder for {@link GenerateSequence}. It also acts as the {@link ExternalTransformBuilder} that
   * constructs the transform from an {@link External.ExternalConfiguration}.
   */
  @AutoValue.Builder
  abstract static class Builder
      implements ExternalTransformBuilder<
          External.ExternalConfiguration, PBegin, PCollection<Long>> {
    abstract Builder setFrom(long from);

    abstract Builder setTo(long to);

    abstract Builder setTimestampFn(SerializableFunction<Long, Instant> timestampFn);

    abstract Builder setElementsPerPeriod(long elementsPerPeriod);

    abstract Builder setPeriod(Duration period);

    abstract Builder setMaxReadTime(Duration maxReadTime);

    abstract GenerateSequence build();

    /**
     * Builds a {@link GenerateSequence} from an external configuration.
     *
     * <p>{@code start} is mandatory. All other fields are optional; when absent, the same defaults
     * as {@link GenerateSequence#from(long)} are used (no upper bound, no rate limit). {@code
     * period} and {@code maxReadTime} are interpreted as milliseconds.
     *
     * @param config the configuration received from the external SDK
     * @return the configured transform
     * @throws NullPointerException if {@code config.start} is {@code null}
     */
    @Override
    public GenerateSequence buildExternal(External.ExternalConfiguration config) {
      Preconditions.checkNotNull(config.start, "Parameters 'from' must not be null.");
      setFrom(config.start);
      // Defaults mirroring from(long): -1 marks "no upper bound", 0 marks "no rate limit".
      setTo(-1);
      setElementsPerPeriod(0);
      if (config.stop != null) {
        setTo(config.stop);
      }
      if (config.period != null) {
        setPeriod(Duration.millis(config.period));
      }
      if (config.maxReadTime != null) {
        setMaxReadTime(Duration.millis(config.maxReadTime));
      }
      if (config.elementsPerPeriod != null) {
        setElementsPerPeriod(config.elementsPerPeriod);
      }
      return build();
    }
  }

  /** Exposes GenerateSequence as an external transform for cross-language usage. */
  @AutoService(ExternalTransformRegistrar.class)
  public static class External implements ExternalTransformRegistrar {
    /** URN under which the transform builder is registered. */
    public static final String URN = "beam:external:java:generate_sequence:v1";

    /** Maps {@link #URN} to the generated builder class of {@link GenerateSequence}. */
    @Override
    public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
      return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class);
    }

    /** Parameters class to expose the transform to an external SDK. */
    public static class ExternalConfiguration {
      // Required: first value of the sequence.
      private Long start;
      // Optional: exclusive upper bound of the sequence.
      @Nullable private Long stop;
      // Optional: period length in milliseconds.
      @Nullable private Long period;
      // Optional: maximum read time in milliseconds.
      @Nullable private Long maxReadTime;
      // Optional: number of elements per period.
      @Nullable private Long elementsPerPeriod;

      public void setStart(Long start) {
        this.start = start;
      }

      public void setStop(@Nullable Long stop) {
        this.stop = stop;
      }

      public void setPeriod(@Nullable Long period) {
        this.period = period;
      }

      public void setMaxReadTime(@Nullable Long maxReadTime) {
        this.maxReadTime = maxReadTime;
      }

      public void setElementsPerPeriod(@Nullable Long elementsPerPeriod) {
        this.elementsPerPeriod = elementsPerPeriod;
      }
    }
  }

  /**
   * Specifies the minimum number to generate (inclusive).
   *
   * <p>The returned transform has no upper bound and no rate limit until configured otherwise.
   *
   * @throws IllegalArgumentException if {@code from} is negative
   */
  public static GenerateSequence from(long from) {
    checkArgument(from >= 0, "Value of from must be non-negative, but was: %s", from);
    return new AutoValue_GenerateSequence.Builder()
        .setFrom(from)
        .setTo(-1)
        .setElementsPerPeriod(0)
        .build();
  }

  /**
   * Specifies the maximum number to generate (exclusive).
   *
   * <p>Passing {@code -1} leaves the sequence without an upper bound.
   *
   * @throws IllegalArgumentException if {@code to} is neither {@code -1} nor greater than or equal
   *     to the configured start of the sequence
   */
  public GenerateSequence to(long to) {
    // Validate the value being set, not the one currently stored: on a fresh instance the stored
    // bound is always -1, so checking it would let a degenerate range such as [10, 5) through.
    checkArgument(to == -1 || to >= getFrom(), "Degenerate range [%s, %s)", getFrom(), to);
    return toBuilder().setTo(to).build();
  }

  /** Specifies the function to use to assign timestamps to the elements. */
  public GenerateSequence withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
    return toBuilder().setTimestampFn(timestampFn).build();
  }

  /**
   * Specifies to generate at most a given number of elements per a given period.
   *
   * @throws IllegalArgumentException if {@code numElements} is not positive or {@code
   *     periodLength} is {@code null}
   */
  public GenerateSequence withRate(long numElements, Duration periodLength) {
    checkArgument(
        numElements > 0,
        "Number of elements in withRate must be positive, but was: %s",
        numElements);
    checkArgument(periodLength != null, "periodLength can not be null");
    return toBuilder().setElementsPerPeriod(numElements).setPeriod(periodLength).build();
  }

  /** Specifies to stop generating elements after the given time. */
  public GenerateSequence withMaxReadTime(Duration maxReadTime) {
    return toBuilder().setMaxReadTime(maxReadTime).build();
  }

  /**
   * Expands into a read from a {@link CountingSource}.
   *
   * <p>The bounded source is used only when an upper bound is set and neither a timestamp
   * function, a rate, nor a maximum read time was requested. Otherwise an unbounded source is
   * used, optionally limited by the number of records and/or the maximum read time.
   */
  @Override
  public PCollection<Long> expand(PBegin input) {
    boolean isRangeUnbounded = getTo() < 0;
    boolean usesUnboundedFeatures =
        getTimestampFn() != null || getElementsPerPeriod() > 0 || getMaxReadTime() != null;
    if (!isRangeUnbounded && !usesUnboundedFeatures) {
      // This is the only case when we can use the bounded CountingSource.
      return input.apply(Read.from(CountingSource.createSourceForSubrange(getFrom(), getTo())));
    }

    CountingSource.UnboundedCountingSource source = CountingSource.createUnboundedFrom(getFrom());
    if (getTimestampFn() != null) {
      source = source.withTimestampFn(getTimestampFn());
    }
    if (getElementsPerPeriod() > 0) {
      source = source.withRate(getElementsPerPeriod(), getPeriod());
    }

    Read.Unbounded<Long> readUnbounded = Read.from(source);

    if (getMaxReadTime() == null) {
      if (isRangeUnbounded) {
        return input.apply(readUnbounded);
      } else {
        // An upper bound on an unbounded source is expressed as a cap on the number of records.
        return input.apply(readUnbounded.withMaxNumRecords(getTo() - getFrom()));
      }
    } else {
      BoundedReadFromUnboundedSource<Long> withMaxReadTime =
          readUnbounded.withMaxReadTime(getMaxReadTime());
      if (isRangeUnbounded) {
        return input.apply(withMaxReadTime);
      } else {
        return input.apply(withMaxReadTime.withMaxNumRecords(getTo() - getFrom()));
      }
    }
  }

  /**
   * Registers the configuration of this transform as display data. The upper bound is omitted when
   * it has its default value of {@code -1}; rate information is added only when a rate is set.
   */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    builder.add(DisplayData.item("from", getFrom()).withLabel("Generate sequence from"));
    builder.addIfNotDefault(
        DisplayData.item("to", getTo()).withLabel("Generate sequence to (exclusive)"), -1L);
    builder.addIfNotNull(
        DisplayData.item(
                "timestampFn", getTimestampFn() == null ? null : getTimestampFn().getClass())
            .withLabel("Timestamp Function"));
    builder.addIfNotNull(
        DisplayData.item("maxReadTime", getMaxReadTime()).withLabel("Maximum Read Time"));
    if (getElementsPerPeriod() > 0) {
      builder.add(
          DisplayData.item("elementsPerPeriod", getElementsPerPeriod())
              .withLabel("Elements per period"));
      builder.add(DisplayData.item("period", getPeriod()).withLabel("Period"));
    }
  }
}