/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.io.sparkreceiver;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.streaming.receiver.Receiver;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Streaming sources for Spark {@link Receiver}.
*
* <h3>Reading using {@link SparkReceiverIO}</h3>
*
 * <p>You will need to pass a {@link ReceiverBuilder}, which is responsible for instantiating new
 * {@link Receiver} objects.
*
 * <p>The {@link Receiver} that will be used should implement the {@link HasOffset} interface. You
 * will need to pass a {@code getOffsetFn}, a {@link SerializableFunction} that defines how to get
 * a {@code Long offset} from a {@code V record}.
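 *
 * <p>For illustration, a minimal receiver implementing {@link HasOffset} might look like the
 * following sketch (the {@code CustomReceiverWithOffset} class and its record source are
 * hypothetical; {@code StorageLevel} is {@code org.apache.spark.storage.StorageLevel}):
 *
 * <pre>{@code
 * public class CustomReceiverWithOffset extends Receiver<String> implements HasOffset {
 *
 *   private Long startOffset = 0L;
 *
 *   public CustomReceiverWithOffset() {
 *     super(StorageLevel.MEMORY_AND_DISK_2());
 *   }
 *
 *   public void setStartOffset(Long startOffset) {
 *     // Remember the offset from which reading should resume.
 *     if (startOffset != null) {
 *       this.startOffset = startOffset;
 *     }
 *   }
 *
 *   public Long getEndOffset() {
 *     // Upper bound of currently available offsets; Long.MAX_VALUE for an unbounded source.
 *     return Long.MAX_VALUE;
 *   }
 *
 *   public void onStart() {
 *     // Start a thread that reads records beginning at startOffset and
 *     // passes each one to store().
 *   }
 *
 *   public void onStop() {}
 * }
 * }</pre>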
*
 * <p>Optionally you can pass a {@code timestampFn}, a {@link SerializableFunction} that defines
 * how to get an {@code Instant timestamp} from a {@code V record}, and a {@code startOffset}, the
 * inclusive start offset from which the reading should be started.
*
 * <p>Optionally you can pass a {@code pullFrequencySec}, the delay in seconds between polls for
 * new records. Also, you can pass a {@code startPollTimeoutSec}, the delay in seconds before
 * polling starts.
*
 * <p>Example of {@link SparkReceiverIO#read()} usage:
 *
 * <pre>{@code
 * Pipeline p = ...; // Create pipeline.
 *
 * // Create ReceiverBuilder for CustomReceiverWithOffset
 * ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
 *     new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
 *
 * // Configure the read from CustomReceiverWithOffset
 * SparkReceiverIO.Read<String> reader =
 *     SparkReceiverIO.<String>read()
 *         .withSparkReceiverBuilder(receiverBuilder)
 *         .withGetOffsetFn(Long::valueOf)
 *         .withTimestampFn(Instant::parse)
 *         .withPullFrequencySec(1L)
 *         .withStartPollTimeoutSec(2L)
 *         .withStartOffset(10L);
 *
 * p.apply("Spark Receiver Read", reader);
 * }</pre>
*/
public class SparkReceiverIO {

  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIO.class);
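
  /**
   * Creates an uninitialized {@link Read} transform. Configure it with the {@code with...} methods;
   * at minimum, {@link Read#withSparkReceiverBuilder} and {@link Read#withGetOffsetFn} must be set.
   */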
public static <V> Read<V> read() {
return new AutoValue_SparkReceiverIO_Read.Builder<V>().build();
}
/** A {@link PTransform} to read from Spark {@link Receiver}. */
@AutoValue
@AutoValue.CopyAnnotations
public abstract static class Read<V> extends PTransform<PBegin, PCollection<V>> {
abstract @Nullable ReceiverBuilder<V, ? extends Receiver<V>> getSparkReceiverBuilder();
abstract @Nullable SerializableFunction<V, Long> getGetOffsetFn();
abstract @Nullable SerializableFunction<V, Instant> getTimestampFn();
abstract @Nullable Long getPullFrequencySec();
abstract @Nullable Long getStartPollTimeoutSec();
abstract @Nullable Long getStartOffset();
abstract Builder<V> toBuilder();
@AutoValue.Builder
abstract static class Builder<V> {
abstract Builder<V> setSparkReceiverBuilder(
ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder);
abstract Builder<V> setGetOffsetFn(SerializableFunction<V, Long> getOffsetFn);
abstract Builder<V> setTimestampFn(SerializableFunction<V, Instant> timestampFn);
abstract Builder<V> setPullFrequencySec(Long pullFrequencySec);
abstract Builder<V> setStartPollTimeoutSec(Long startPollTimeoutSec);
abstract Builder<V> setStartOffset(Long startOffset);
abstract Read<V> build();
}
    /** Sets the {@link ReceiverBuilder} with the custom Spark {@link Receiver} class. */
    public Read<V> withSparkReceiverBuilder(
        ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder) {
      checkArgument(sparkReceiverBuilder != null, "Spark receiver builder cannot be null");
      return toBuilder().setSparkReceiverBuilder(sparkReceiverBuilder).build();
    }
    /** A function to get the offset of a record, from which the {@link Receiver} can be started. */
    public Read<V> withGetOffsetFn(SerializableFunction<V, Long> getOffsetFn) {
      checkArgument(getOffsetFn != null, "Get offset function cannot be null");
      return toBuilder().setGetOffsetFn(getOffsetFn).build();
    }
    /** A function to calculate the timestamp for a record. */
    public Read<V> withTimestampFn(SerializableFunction<V, Instant> timestampFn) {
      checkArgument(timestampFn != null, "Timestamp function cannot be null");
      return toBuilder().setTimestampFn(timestampFn).build();
    }
    /** Delay in seconds between polls for new records. */
    public Read<V> withPullFrequencySec(Long pullFrequencySec) {
      checkArgument(pullFrequencySec != null, "Pull frequency cannot be null");
      return toBuilder().setPullFrequencySec(pullFrequencySec).build();
    }
    /** Wait time in seconds after the {@link Receiver} starts, needed for it to prepare for polling. */
    public Read<V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
      checkArgument(startPollTimeoutSec != null, "Start poll timeout cannot be null");
      return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
    }
    /** Inclusive start offset from which reading should start. */
    public Read<V> withStartOffset(Long startOffset) {
      checkArgument(startOffset != null, "Start offset cannot be null");
      return toBuilder().setStartOffset(startOffset).build();
    }
@Override
public PCollection<V> expand(PBegin input) {
validateTransform();
return input.apply(new ReadFromSparkReceiverViaSdf<>(this));
}
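
    /** Validates that {@code withSparkReceiverBuilder()} and {@code withGetOffsetFn()} were called. */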
public void validateTransform() {
ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder = getSparkReceiverBuilder();
checkStateNotNull(sparkReceiverBuilder, "withSparkReceiverBuilder() is required");
checkStateNotNull(getGetOffsetFn(), "withGetOffsetFn() is required");
}
}
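
  /**
   * A {@link PTransform} that implements the actual read: an {@link Impulse} followed by the
   * splittable {@link ReadFromSparkReceiverWithOffsetDoFn}, which runs the {@link Receiver} and
   * outputs its records.
   */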
static class ReadFromSparkReceiverViaSdf<V> extends PTransform<PBegin, PCollection<V>> {
private final Read<V> sparkReceiverRead;
ReadFromSparkReceiverViaSdf(Read<V> sparkReceiverRead) {
this.sparkReceiverRead = sparkReceiverRead;
}
@Override
public PCollection<V> expand(PBegin input) {
final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
sparkReceiverRead.getSparkReceiverBuilder();
checkStateNotNull(sparkReceiverBuilder, "withSparkReceiverBuilder() is required");
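      // The offset-based SDF can only track progress for receivers that expose their
      // offsets through the HasOffset interface.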
if (!HasOffset.class.isAssignableFrom(sparkReceiverBuilder.getSparkReceiverClass())) {
throw new UnsupportedOperationException(
String.format(
"Given Spark Receiver class %s doesn't implement HasOffset interface,"
+ " therefore it is not supported!",
sparkReceiverBuilder.getSparkReceiverClass().getName()));
} else {
LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
return input
.apply(Impulse.create())
.apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
// TODO: Split data from SparkReceiver into multiple workers
}
}
}
}