| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.synthetic; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.annotation.JsonDeserialize; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.commons.math3.distribution.ConstantRealDistribution; |
| import org.joda.time.Duration; |
| |
| /** |
| * Synthetic bounded source options. These options are all JSON, see documentations of individual |
| * fields for details. {@code SyntheticSourceOptions} uses jackson annotations which |
| * PipelineOptionsFactory can use to parse and construct an instance. |
| */ |
| public class SyntheticSourceOptions extends SyntheticOptions { |
| private static final long serialVersionUID = 0; |
| |
| /** Total number of generated records. */ |
| @JsonProperty public long numRecords; |
| |
| /** |
| * Only records whose index is a multiple of this will be split points. 0 means the source is not |
| * dynamically splittable (but is perfectly statically splittable). In that case it also doesn't |
| * report progress at all. |
| */ |
| @JsonProperty public long splitPointFrequencyRecords = 1; |
| |
| /** |
| * Distribution for generating initial split bundles. |
| * |
| * <p>When splitting into "desiredBundleSizeBytes", we'll compute the desired number of bundles N, |
| * then sample this many numbers from this distribution, normalize their sum to 1, and use that as |
| * the boundaries of generated bundles. |
| * |
| * <p>The Zipf distribution is expected to be particularly useful here. |
| * |
| * <p>E.g., empirically, with 100 bundles, the Zipf distribution with a parameter of 3.5 will |
| * generate bundles where the largest is about 3x-10x larger than the median; with a parameter of |
| * 3.0 this ratio will be about 5x-50x; with 2.5, 5x-100x (i.e. 1 bundle can be as large as all |
| * others combined). |
| */ |
| @JsonDeserialize(using = SamplerDeserializer.class) |
| public Sampler bundleSizeDistribution = fromRealDistribution(new ConstantRealDistribution(1)); |
| |
| /** |
| * If specified, this source will split into exactly this many bundles regardless of the hints |
| * provided by the service. |
| */ |
| @JsonProperty public Integer forceNumInitialBundles; |
| |
| /** See {@link ProgressShape}. */ |
| @JsonProperty public ProgressShape progressShape = ProgressShape.LINEAR; |
| |
| /** |
| * The distribution for the delay when reading from synthetic source starts. This delay is |
| * independent of the per-record delay and uses the same types of distributions as {@link |
| * #delayDistribution}. |
| */ |
| @JsonDeserialize(using = SamplerDeserializer.class) |
| final Sampler initializeDelayDistribution = fromRealDistribution(new ConstantRealDistribution(0)); |
| |
| /** |
| * Generates a random delay value for the synthetic source initialization using the distribution |
| * defined by {@link #initializeDelayDistribution}. |
| */ |
| public Duration nextInitializeDelay(long seed) { |
| return Duration.millis((long) initializeDelayDistribution.sample(seed)); |
| } |
| |
| /** |
| * The delay between event and processing time. uses same types of distributions as any other |
| * delay in {@link SyntheticSourceOptions}. |
| * |
| * <p>Example: we can use ConstantRealDistribution(10) to simulate constant 10 millis delay |
| * between event and processing times for each record generated by UnboundedSyntheticSource. |
| */ |
| @JsonDeserialize(using = SamplerDeserializer.class) |
| Sampler processingTimeDelayDistribution = fromRealDistribution(new ConstantRealDistribution(0)); |
| |
| /** |
| * Generates a random delay value between event and processing time using the distribution defined |
| * by {@link #processingTimeDelayDistribution}. |
| */ |
| public Duration nextProcessingTimeDelay(long seed) { |
| return Duration.millis((long) processingTimeDelayDistribution.sample(seed)); |
| } |
| |
| /** |
| * Defines how many elements should the watermark function check in advance to "predict" how the |
| * record distribution will look like. |
| */ |
| @JsonProperty public Integer watermarkSearchInAdvanceCount = 100; |
| |
| /** |
| * Could be either positive and negative. Positive drift will "push away" the watermark from the |
| * actual records event times. Negative will bring it closer, possibly causing some events to be |
| * "late". |
| * |
| * <p>By default there is no drift at all. |
| */ |
| @JsonProperty public Integer watermarkDriftMillis = 0; |
| |
| @Override |
| public void validate() { |
| super.validate(); |
| checkArgument( |
| numRecords >= 0, "numRecords should be a non-negative number, but found %s.", numRecords); |
| checkNotNull(bundleSizeDistribution, "bundleSizeDistribution"); |
| checkArgument( |
| forceNumInitialBundles == null || forceNumInitialBundles > 0, |
| "forceNumInitialBundles, if specified, must be positive, but found %s", |
| forceNumInitialBundles); |
| checkArgument( |
| splitPointFrequencyRecords >= 0, |
| "splitPointFrequencyRecords must be non-negative, but found %s", |
| splitPointFrequencyRecords); |
| } |
| |
| public Record genRecord(long position) { |
| // This method is supposed to generate random records deterministically, |
| // so that results can be reproduced by running the same scenario a second time. |
| // We need to initiate a Random object for each position to make the record deterministic |
| // because liquid sharding could split the Source at any position. |
| // And we also need a seed to initiate a Random object. The mapping from the position to |
| // the seed should be fixed. Using the position as seed to feed Random objects will cause the |
| // generated values to not be random enough because the position values are |
| // close to each other. To make seeds fed into the Random objects unrelated, |
| // we use a hashing function to map the position to its corresponding hashcode, |
| // and use the hashcode as a seed to feed into the Random object. |
| long hashCodeOfPosition = hashFunction().hashLong(position).asLong(); |
| return new Record(genKvPair(hashCodeOfPosition), nextDelay(hashCodeOfPosition)); |
| } |
| |
| /** Record generated by {@link #genRecord}. */ |
| public static class Record { |
| public final KV<byte[], byte[]> kv; |
| public final Duration sleepMsec; |
| |
| Record(KV<byte[], byte[]> kv, long sleepMsec) { |
| this.kv = kv; |
| this.sleepMsec = new Duration(sleepMsec); |
| } |
| } |
| |
| /** |
| * Shape of the progress reporting curve as a function of the current offset in the {@link |
| * SyntheticBoundedSource}. |
| */ |
| public enum ProgressShape { |
| /** Reported progress grows linearly from 0 to 1. */ |
| LINEAR, |
| /** Reported progress decreases linearly from 0.9 to 0.1. */ |
| LINEAR_REGRESSING, |
| } |
| } |