blob: 668fa3cbb3056a2237d3644f66f125303eeeca16 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static;
import com.amazonaws.regions.Regions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* {@link PTransform}s for reading from and writing to <a
* href="">Kinesis</a> streams.
* <h3>Reading from Kinesis</h3>
* <p>Example usage:
* <pre>{@code
* p.apply(
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withAWSClientsProvider("AWS_KEY", _"AWS_SECRET", STREAM_REGION)
* .apply( ... ) // other transformations
* }</pre>
* <p>As you can see you need to provide 3 things:
* <ul>
* <li>name of the stream you're going to read
* <li>position in the stream where reading should start. There are two options:
* <ul>
* <li>{@link InitialPositionInStream#LATEST} - reading will begin from end of the stream
* <li>{@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at the very
* beginning of the stream
* </ul>
* <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients:
* <ul>
* <li>credentials (aws key, aws secret)
* <li>region where the stream is located
* </ul>
* </ul>
* <p>In case when you want to set up {@link AmazonKinesis} or {@link AmazonCloudWatch} client by
* your own (for example if you're using more sophisticated authorization methods like Amazon STS,
* etc.) you can do it by implementing {@link AWSClientsProvider} class:
* <pre>{@code
* public class MyCustomKinesisClientProvider implements AWSClientsProvider {
* {@literal @}Override
* public AmazonKinesis getKinesisClient() {
* // set up your client here
* }
* public AmazonCloudWatch getCloudWatchClient() {
* // set up your client here
* }
* }
* }</pre>
* <p>Usage is pretty straightforward:
* <pre>{@code
* p.apply(
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withAWSClientsProvider(new MyCustomKinesisClientProvider())
* .apply( ... ) // other transformations
* }</pre>
* <p>There’s also possibility to start reading using arbitrary point in time - in this case you
* need to provide {@link Instant} object:
* <pre>{@code
* p.apply(
* .withStreamName("streamName")
* .withInitialTimestampInStream(instant)
* .withAWSClientsProvider(new MyCustomKinesisClientProvider())
* .apply( ... ) // other transformations
* }</pre>
* <p>Kinesis IO uses ArrivalTimeWatermarkPolicy by default. To use Processing time as event time:
* <pre>{@code
* p.apply(
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withProcessingTimeWatermarkPolicy())
* }</pre>
* <p>It is also possible to specify a custom watermark policy to control watermark computation.
* Below is an example
* <pre>{@code
* // custom policy
* class MyCustomPolicy implements WatermarkPolicy {
* private WatermarkPolicyFactory.CustomWatermarkPolicy customWatermarkPolicy;
* MyCustomPolicy() {
* this.customWatermarkPolicy = new WatermarkPolicyFactory.CustomWatermarkPolicy(WatermarkParameters.create());
* }
* @Override
* public Instant getWatermark() {
* return customWatermarkPolicy.getWatermark();
* }
* @Override
* public void update(KinesisRecord record) {
* customWatermarkPolicy.update(record);
* }
* }
* // custom factory
* class MyCustomPolicyFactory implements WatermarkPolicyFactory {
* @Override
* public WatermarkPolicy createWatermarkPolicy() {
* return new MyCustomPolicy();
* }
* }
* p.apply(
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withCustomWatermarkPolicy(new MyCustomPolicyFactory())
* }</pre>
* <h3>Writing to Kinesis</h3>
* <p>Example usage:
* <pre>{@code
* PCollection<byte[]> data = ...;
* data.apply(KinesisIO.write()
* .withStreamName("streamName")
* .withPartitionKey("partitionKey")
* }</pre>
* <p>As a client, you need to provide at least 3 things:
* <ul>
* <li>name of the stream where you're going to write
* <li>partition key (or implementation of {@link KinesisPartitioner}) that defines which
* partition will be used for writing
* <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients:
* <ul>
* <li>credentials (aws key, aws secret)
* <li>region where the stream is located
* </ul>
* </ul>
* <p>In case if you need to define more complicated logic for key partitioning then you can create
* your own implementation of {@link KinesisPartitioner} and set it by {@link
* KinesisIO.Write#withPartitioner(KinesisPartitioner)}
* <p>Internally, {@link KinesisIO.Write} relies on Amazon Kinesis Producer Library (KPL). This
* library can be configured with a set of {@link Properties} if needed.
* <p>Example usage of KPL configuration:
* <pre>{@code
* Properties properties = new Properties();
* properties.setProperty("KinesisEndpoint", "localhost");
* properties.setProperty("KinesisPort", "4567");
* PCollection<byte[]> data = ...;
* data.apply(KinesisIO.write()
* .withStreamName("streamName")
* .withPartitionKey("partitionKey")
* .withProducerProperties(properties));
* }</pre>
* <p>For more information about configuratiom parameters, see the <a
* href="">sample
* of configuration file</a>.
public final class KinesisIO {
private static final Logger LOG = LoggerFactory.getLogger(KinesisIO.class);
private static final int DEFAULT_NUM_RETRIES = 6;
/** Returns a new {@link Read} transform for reading from Kinesis. */
public static Read read() {
return new AutoValue_KinesisIO_Read.Builder()
/** A {@link PTransform} writing data to Kinesis. */
public static Write write() {
return new AutoValue_KinesisIO_Write.Builder().setRetries(DEFAULT_NUM_RETRIES).build();
/** Implementation of {@link #read}. */
public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
abstract String getStreamName();
abstract StartingPoint getInitialPosition();
abstract AWSClientsProvider getAWSClientsProvider();
abstract long getMaxNumRecords();
abstract Duration getMaxReadTime();
abstract Duration getUpToDateThreshold();
abstract Integer getRequestRecordsLimit();
abstract WatermarkPolicyFactory getWatermarkPolicyFactory();
abstract Integer getMaxCapacityPerShard();
abstract Builder toBuilder();
abstract static class Builder {
abstract Builder setStreamName(String streamName);
abstract Builder setInitialPosition(StartingPoint startingPoint);
abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider);
abstract Builder setMaxNumRecords(long maxNumRecords);
abstract Builder setMaxReadTime(Duration maxReadTime);
abstract Builder setUpToDateThreshold(Duration upToDateThreshold);
abstract Builder setRequestRecordsLimit(Integer limit);
abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);
abstract Builder setMaxCapacityPerShard(Integer maxCapacity);
abstract Read build();
/** Specify reading from streamName. */
public Read withStreamName(String streamName) {
return toBuilder().setStreamName(streamName).build();
/** Specify reading from some initial position in stream. */
public Read withInitialPositionInStream(InitialPositionInStream initialPosition) {
return toBuilder().setInitialPosition(new StartingPoint(initialPosition)).build();
* Specify reading beginning at given {@link Instant}. This {@link Instant} must be in the past,
* i.e. before {@link Instant#now()}.
public Read withInitialTimestampInStream(Instant initialTimestamp) {
return toBuilder().setInitialPosition(new StartingPoint(initialTimestamp)).build();
* Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider} provides
* {@link AmazonKinesis} and {@link AmazonCloudWatch} instances which are later used for
* communication with Kinesis. You should use this method if {@link
* Read#withAWSClientsProvider(String, String, Regions)} does not suit your needs.
public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with a kinesis service emulator.
public Read withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
return withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
/** Specifies to read at most a given number of records. */
public Read withMaxNumRecords(long maxNumRecords) {
maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
return toBuilder().setMaxNumRecords(maxNumRecords).build();
/** Specifies to read records during {@code maxReadTime}. */
public Read withMaxReadTime(Duration maxReadTime) {
checkArgument(maxReadTime != null, "maxReadTime can not be null");
return toBuilder().setMaxReadTime(maxReadTime).build();
* Specifies how late records consumed by this source can be to still be considered on time.
* When this limit is exceeded the actual backlog size will be evaluated and the runner might
* decide to scale the amount of resources allocated to the pipeline in order to speed up
* ingestion.
public Read withUpToDateThreshold(Duration upToDateThreshold) {
checkArgument(upToDateThreshold != null, "upToDateThreshold can not be null");
return toBuilder().setUpToDateThreshold(upToDateThreshold).build();
* Specifies the maximum number of records in GetRecordsResult returned by GetRecords call which
* is limited by 10K records. If should be adjusted according to average size of data record to
* prevent shard overloading. More details can be found here: <a
* href="">API_GetRecords</a>
public Read withRequestRecordsLimit(int limit) {
checkArgument(limit > 0, "limit must be positive, but was: %s", limit);
checkArgument(limit <= 10_000, "limit must be up to 10,000, but was: %s", limit);
return toBuilder().setRequestRecordsLimit(limit).build();
/** Specifies the {@code WatermarkPolicyFactory} as ArrivalTimeWatermarkPolicyFactory. */
public Read withArrivalTimeWatermarkPolicy() {
return toBuilder()
* Specifies the {@code WatermarkPolicyFactory} as ArrivalTimeWatermarkPolicyFactory.
* <p>{@param watermarkIdleDurationThreshold} Denotes the duration for which the watermark can
* be idle.
public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) {
return toBuilder()
/** Specifies the {@code WatermarkPolicyFactory} as ProcessingTimeWatermarkPolicyFactory. */
public Read withProcessingTimeWatermarkPolicy() {
return toBuilder()
* Specifies the {@code WatermarkPolicyFactory} as a custom watermarkPolicyFactory.
* @param watermarkPolicyFactory Custom Watermark policy factory.
public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) {
checkArgument(watermarkPolicyFactory != null, "watermarkPolicyFactory cannot be null");
return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
/** Specifies the maximum number of messages per one shard. */
public Read withMaxCapacityPerShard(Integer maxCapacity) {
checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
return toBuilder().setMaxCapacityPerShard(maxCapacity).build();
public PCollection<KinesisRecord> expand(PBegin input) {
Unbounded<KinesisRecord> unbounded =
new KinesisSource(
PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
transform =
return input.apply(transform);
/** Implementation of {@link #write}. */
public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
abstract String getStreamName();
abstract String getPartitionKey();
abstract KinesisPartitioner getPartitioner();
abstract Properties getProducerProperties();
abstract AWSClientsProvider getAWSClientsProvider();
abstract int getRetries();
abstract Builder builder();
abstract static class Builder {
abstract Builder setStreamName(String streamName);
abstract Builder setPartitionKey(String partitionKey);
abstract Builder setPartitioner(KinesisPartitioner partitioner);
abstract Builder setProducerProperties(Properties properties);
abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider);
abstract Builder setRetries(int retries);
abstract Write build();
/** Specify Kinesis stream name which will be used for writing, this name is required. */
public Write withStreamName(String streamName) {
return builder().setStreamName(streamName).build();
* Specify default partition key.
* <p>In case if you need to define more complicated logic for key partitioning then you can
* create your own implementation of {@link KinesisPartitioner} and specify it by {@link
* KinesisIO.Write#withPartitioner(KinesisPartitioner)}
* <p>Using one of the methods {@link KinesisIO.Write#withPartitioner(KinesisPartitioner)} or
* {@link KinesisIO.Write#withPartitionKey(String)} is required but not both in the same time.
public Write withPartitionKey(String partitionKey) {
return builder().setPartitionKey(partitionKey).build();
* Allows to specify custom implementation of {@link KinesisPartitioner}.
* <p>This method should be used to balance a distribution of new written records among all
* stream shards.
* <p>Using one of the methods {@link KinesisIO.Write#withPartitioner(KinesisPartitioner)} or
* {@link KinesisIO.Write#withPartitionKey(String)} is required but not both in the same time.
public Write withPartitioner(KinesisPartitioner partitioner) {
return builder().setPartitioner(partitioner).build();
* Specify the configuration properties for Kinesis Producer Library (KPL).
* <p>Example of creating new KPL configuration:
* <p>{@code Properties properties = new Properties();
* properties.setProperty("CollectionMaxCount", "1000");
* properties.setProperty("ConnectTimeout", "10000");}
public Write withProducerProperties(Properties properties) {
return builder().setProducerProperties(properties).build();
* Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider} creates new
* {@link IKinesisProducer} which is later used for writing to Kinesis.
* <p>This method should be used if {@link Write#withAWSClientsProvider(String, String,
* Regions)} does not suit well.
public Write withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
return builder().setAWSClientsProvider(awsClientsProvider).build();
* Specify credential details and region to be used to write to Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Write#withAWSClientsProvider(AWSClientsProvider)}.
public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
* Specify credential details and region to be used to write to Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Write#withAWSClientsProvider(AWSClientsProvider)}.
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with Kinesis service emulator.
public Write withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
return withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
* Specify the number of retries that will be used to flush the outstanding records in case if
* they were not flushed from the first time. Default number of retries is {@code
* <p>This is used for testing.
Write withRetries(int retries) {
return builder().setRetries(retries).build();
public PDone expand(PCollection<byte[]> input) {
checkArgument(getStreamName() != null, "withStreamName() is required");
(getPartitionKey() != null) || (getPartitioner() != null),
"withPartitionKey() or withPartitioner() is required");
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
input.apply(ParDo.of(new KinesisWriterFn(this)));
private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
private final KinesisIO.Write spec;
private static transient IKinesisProducer producer;
private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
private transient List<Future<UserRecordResult>> putFutures;
KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
public void setup() {
// Use custom partitioner if it exists
if (spec.getPartitioner() != null) {
partitioner = spec.getPartitioner();
public void startBundle() {
putFutures = Collections.synchronizedList(new ArrayList<>());
/** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
private synchronized void initKinesisProducer() {
// Init producer config
Properties props = spec.getProducerProperties();
if (props == null) {
props = new Properties();
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromProperties(props);
// Fix to avoid the following message "WARNING: Exception during updateCredentials" during
// producer.destroy() call. More details can be found in this thread:
// Init Kinesis producer
if (producer == null) {
producer = spec.getAWSClientsProvider().createKinesisProducer(config);
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
* It adds a record asynchronously which then should be delivered by Kinesis producer in
* background (Kinesis producer forks native processes to do this job).
* <p>The records can be batched and then they will be sent in one HTTP request. Amazon KPL
* supports two types of batching - aggregation and collection - and they can be configured by
* producer properties.
* <p>More details can be found here: <a
* href="">KPL Key
* Concepts</a> and <a
* href="">Configuring
* the KPL</a>
public void processElement(ProcessContext c) {
ByteBuffer data = ByteBuffer.wrap(c.element());
String partitionKey = spec.getPartitionKey();
String explicitHashKey = null;
// Use custom partitioner
if (partitioner != null) {
partitionKey = partitioner.getPartitionKey(c.element());
explicitHashKey = partitioner.getExplicitHashKey(c.element());
ListenableFuture<UserRecordResult> f =
producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data);
public void finishBundle() throws Exception {
* Flush outstanding records until the total number of failed records will be less than 0 or
* the number of retries will be exhausted. The retry timeout starts from 1 second and it
* doubles on every iteration.
private void flushBundle() throws InterruptedException, ExecutionException, IOException {
int retries = spec.getRetries();
int numFailedRecords;
int retryTimeout = 1000; // initial timeout, 1 sec
String message = "";
do {
numFailedRecords = 0;
// Wait for puts to finish and check the results
for (Future<UserRecordResult> f : putFutures) {
UserRecordResult result = f.get(); // this does block
if (!result.isSuccessful()) {
// wait until outstanding records will be flushed
retryTimeout *= 2; // exponential backoff
} while (numFailedRecords > 0 && retries-- > 0);
if (numFailedRecords > 0) {
for (Future<UserRecordResult> f : putFutures) {
UserRecordResult result = f.get();
if (!result.isSuccessful()) {
new KinesisWriteException(
"Put record was not successful.", new UserRecordFailedException(result)));
message =
"After [%d] retries, number of failed records [%d] is still greater than 0",
spec.getRetries(), numFailedRecords);
/** If any write has asynchronously failed, fail the bundle with a useful error. */
private void checkForFailures(String message) throws IOException {
if (failures.isEmpty()) {
StringBuilder logEntry = new StringBuilder();
int i = 0;
while (!failures.isEmpty()) {
KinesisWriteException exc = failures.remove();
Throwable cause = exc.getCause();
if (cause != null) {
logEntry.append(": ").append(cause.getMessage());
if (cause instanceof UserRecordFailedException) {
List<Attempt> attempts =
((UserRecordFailedException) cause).getResult().getAttempts();
for (Attempt attempt : attempts) {
if (attempt.getErrorMessage() != null) {
String errorMessage =
"Some errors occurred writing to Kinesis. First %d errors: %s",
i, logEntry.toString());
throw new IOException(errorMessage);
public void teardown() throws Exception {
if (producer != null && producer.getOutstandingRecordsCount() > 0) {
producer = null;
/** An exception that puts information about the failed record. */
static class KinesisWriteException extends IOException {
KinesisWriteException(String message, Throwable cause) {
super(message, cause);