| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.auto.service.AutoService; |
| import com.google.auto.value.AutoValue; |
| import java.util.Map; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; |
| import org.apache.beam.sdk.transforms.ExternalTransformBuilder; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| |
| /** |
| * A {@link PTransform} that produces longs starting from the given value, and either up to the |
| * given limit or until {@link Long#MAX_VALUE} / until the given time elapses. |
| * |
| * <p>The bounded {@link GenerateSequence} is implemented based on {@link OffsetBasedSource} and |
| * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it |
| * supports dynamic work rebalancing. |
| * |
| * <p>To produce a bounded {@code PCollection<Long>}: |
| * |
| * <pre>{@code |
| * Pipeline p = ... |
| * PCollection<Long> bounded = p.apply(GenerateSequence.from(0).to(1000)); |
| * }</pre> |
| * |
| * <p>To produce an unbounded {@code PCollection<Long>}, simply do not specify {@link #to(long)}, |
| * calling {@link #withTimestampFn(SerializableFunction)} to provide values with timestamps other |
| * than {@link Instant#now}. |
| * |
| * <pre>{@code |
| * Pipeline p = ... |
| * |
| * // To use processing time as the element timestamp. |
| * PCollection<Long> unbounded = p.apply(GenerateSequence.from(0)); |
| * // Or, to use a provided function to set the element timestamp. |
| * PCollection<Long> unboundedWithTimestamps = |
| * p.apply(GenerateSequence.from(0).withTimestampFn(someFn)); |
| * }</pre> |
| * |
| * <p>In all cases, the sequence of numbers is generated in parallel, so there is no inherent |
| * ordering between the generated values - it is only guaranteed that all values in the given range |
| * will be present in the resulting {@link PCollection}. |
| */ |
| @AutoValue |
| public abstract class GenerateSequence extends PTransform<PBegin, PCollection<Long>> { |
| abstract long getFrom(); |
| |
| abstract long getTo(); |
| |
| @Nullable |
| abstract SerializableFunction<Long, Instant> getTimestampFn(); |
| |
| abstract long getElementsPerPeriod(); |
| |
| @Nullable |
| abstract Duration getPeriod(); |
| |
| @Nullable |
| abstract Duration getMaxReadTime(); |
| |
| abstract Builder toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder |
| implements ExternalTransformBuilder< |
| External.ExternalConfiguration, PBegin, PCollection<Long>> { |
| abstract Builder setFrom(long from); |
| |
| abstract Builder setTo(long to); |
| |
| abstract Builder setTimestampFn(SerializableFunction<Long, Instant> timestampFn); |
| |
| abstract Builder setElementsPerPeriod(long elementsPerPeriod); |
| |
| abstract Builder setPeriod(Duration period); |
| |
| abstract Builder setMaxReadTime(Duration maxReadTime); |
| |
| abstract GenerateSequence build(); |
| |
| @Override |
| public GenerateSequence buildExternal(External.ExternalConfiguration config) { |
| Preconditions.checkNotNull(config.start, "Parameters 'from' must not be null."); |
| setFrom(config.start); |
| setTo(-1); |
| setElementsPerPeriod(0); |
| if (config.stop != null) { |
| setTo(config.stop); |
| } |
| if (config.period != null) { |
| setPeriod(Duration.millis(config.period)); |
| } |
| if (config.maxReadTime != null) { |
| setMaxReadTime(Duration.millis(config.maxReadTime)); |
| } |
| if (config.elementsPerPeriod != null) { |
| setElementsPerPeriod(config.elementsPerPeriod); |
| } |
| return build(); |
| } |
| } |
| |
| /** Exposes GenerateSequence as an external transform for cross-language usage. */ |
| @AutoService(ExternalTransformRegistrar.class) |
| public static class External implements ExternalTransformRegistrar { |
| |
| public static final String URN = "beam:external:java:generate_sequence:v1"; |
| |
| @Override |
| public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() { |
| return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class); |
| } |
| |
| /** Parameters class to expose the transform to an external SDK. */ |
| public static class ExternalConfiguration { |
| private Long start; |
| @Nullable private Long stop; |
| @Nullable private Long period; |
| @Nullable private Long maxReadTime; |
| @Nullable private Long elementsPerPeriod; |
| |
| public void setStart(Long start) { |
| this.start = start; |
| } |
| |
| public void setStop(@Nullable Long stop) { |
| this.stop = stop; |
| } |
| |
| public void setPeriod(@Nullable Long period) { |
| this.period = period; |
| } |
| |
| public void setMaxReadTime(@Nullable Long maxReadTime) { |
| this.maxReadTime = maxReadTime; |
| } |
| |
| public void setElementsPerPeriod(@Nullable Long elementsPerPeriod) { |
| this.elementsPerPeriod = elementsPerPeriod; |
| } |
| } |
| } |
| |
| /** Specifies the minimum number to generate (inclusive). */ |
| public static GenerateSequence from(long from) { |
| checkArgument(from >= 0, "Value of from must be non-negative, but was: %s", from); |
| return new AutoValue_GenerateSequence.Builder() |
| .setFrom(from) |
| .setTo(-1) |
| .setElementsPerPeriod(0) |
| .build(); |
| } |
| |
| /** Specifies the maximum number to generate (exclusive). */ |
| public GenerateSequence to(long to) { |
| checkArgument( |
| getTo() == -1 || getTo() >= getFrom(), "Degenerate range [%s, %s)", getFrom(), getTo()); |
| return toBuilder().setTo(to).build(); |
| } |
| |
| /** Specifies the function to use to assign timestamps to the elements. */ |
| public GenerateSequence withTimestampFn(SerializableFunction<Long, Instant> timestampFn) { |
| return toBuilder().setTimestampFn(timestampFn).build(); |
| } |
| |
| /** Specifies to generate at most a given number of elements per a given period. */ |
| public GenerateSequence withRate(long numElements, Duration periodLength) { |
| checkArgument( |
| numElements > 0, |
| "Number of elements in withRate must be positive, but was: %s", |
| numElements); |
| checkArgument(periodLength != null, "periodLength can not be null"); |
| return toBuilder().setElementsPerPeriod(numElements).setPeriod(periodLength).build(); |
| } |
| |
| /** Specifies to stop generating elements after the given time. */ |
| public GenerateSequence withMaxReadTime(Duration maxReadTime) { |
| return toBuilder().setMaxReadTime(maxReadTime).build(); |
| } |
| |
| @Override |
| public PCollection<Long> expand(PBegin input) { |
| boolean isRangeUnbounded = getTo() < 0; |
| boolean usesUnboundedFeatures = |
| getTimestampFn() != null || getElementsPerPeriod() > 0 || getMaxReadTime() != null; |
| if (!isRangeUnbounded && !usesUnboundedFeatures) { |
| // This is the only case when we can use the bounded CountingSource. |
| return input.apply(Read.from(CountingSource.createSourceForSubrange(getFrom(), getTo()))); |
| } |
| |
| CountingSource.UnboundedCountingSource source = CountingSource.createUnboundedFrom(getFrom()); |
| if (getTimestampFn() != null) { |
| source = source.withTimestampFn(getTimestampFn()); |
| } |
| if (getElementsPerPeriod() > 0) { |
| source = source.withRate(getElementsPerPeriod(), getPeriod()); |
| } |
| |
| Read.Unbounded<Long> readUnbounded = Read.from(source); |
| |
| if (getMaxReadTime() == null) { |
| if (isRangeUnbounded) { |
| return input.apply(readUnbounded); |
| } else { |
| return input.apply(readUnbounded.withMaxNumRecords(getTo() - getFrom())); |
| } |
| } else { |
| BoundedReadFromUnboundedSource<Long> withMaxReadTime = |
| readUnbounded.withMaxReadTime(getMaxReadTime()); |
| if (isRangeUnbounded) { |
| return input.apply(withMaxReadTime); |
| } else { |
| return input.apply(withMaxReadTime.withMaxNumRecords(getTo() - getFrom())); |
| } |
| } |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| builder.add(DisplayData.item("from", getFrom()).withLabel("Generate sequence from")); |
| builder.addIfNotDefault( |
| DisplayData.item("to", getTo()).withLabel("Generate sequence to (exclusive)"), -1L); |
| builder.addIfNotNull( |
| DisplayData.item( |
| "timestampFn", getTimestampFn() == null ? null : getTimestampFn().getClass()) |
| .withLabel("Timestamp Function")); |
| builder.addIfNotNull( |
| DisplayData.item("maxReadTime", getMaxReadTime()).withLabel("Maximum Read Time")); |
| if (getElementsPerPeriod() > 0) { |
| builder.add( |
| DisplayData.item("elementsPerPeriod", getElementsPerPeriod()) |
| .withLabel("Elements per period")); |
| builder.add(DisplayData.item("period", getPeriod()).withLabel("Period")); |
| } |
| } |
| } |