blob: 01d482edc8adc548c446638e7e5a35ad92cbb837 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.sdk.kinesis.egress;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
/**
* A builder for creating an {@link EgressSpec} for writing data to AWS Kinesis.
*
* @param <T> The type written to AWS Kinesis.
*/
public final class KinesisEgressBuilder<T> {
private final EgressIdentifier<T> id;
private Class<? extends KinesisEgressSerializer<T>> serializerClass;
private int maxOutstandingRecords = 1000;
private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
private final Properties properties = new Properties();
private KinesisEgressBuilder(EgressIdentifier<T> id) {
this.id = Objects.requireNonNull(id);
}
/**
* @param id A unique egress identifier.
* @param <T> The type consumed from Kinesis.
* @return A new {@link KinesisEgressBuilder}.
*/
public static <T> KinesisEgressBuilder<T> forIdentifier(EgressIdentifier<T> id) {
return new KinesisEgressBuilder<>(id);
}
/**
* @param serializerClass The serializer used to convert from Java objects to Kinesis's byte
* messages.
*/
public KinesisEgressBuilder<T> withSerializer(
Class<? extends KinesisEgressSerializer<T>> serializerClass) {
this.serializerClass = Objects.requireNonNull(serializerClass);
return this;
}
/**
* The AWS region to connect to. By default, AWS's default provider chain is consulted.
*
* @param awsRegion The AWS region to connect to.
* @see <a
* href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
* Determine the AWS Region from the Environment</a>.
* @see AwsRegion
*/
public KinesisEgressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
this.awsRegion = Objects.requireNonNull(awsRegion);
return this;
}
/**
* The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
* default provider chain is consulted.
*
* @param regionName The unique id of the AWS region to connect to.
*/
public KinesisEgressBuilder<T> withAwsRegion(String regionName) {
this.awsRegion = AwsRegion.ofId(regionName);
return this;
}
/**
* The AWS credentials to use. By default, AWS's default provider chain is consulted.
*
* @param awsCredentials The AWS credentials to use.
* @see <a
* href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
* the Default Credential Provider Chain</a>.
* @see AwsCredentials
*/
public KinesisEgressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
this.awsCredentials = Objects.requireNonNull(awsCredentials);
return this;
}
/**
* The maximum number of buffered outstanding records, before backpressure is applied by the
* egress.
*
* @param maxOutstandingRecords the maximum number of buffered outstanding records
*/
public KinesisEgressBuilder<T> withMaxOutstandingRecords(int maxOutstandingRecords) {
if (maxOutstandingRecords <= 0) {
throw new IllegalArgumentException("Max outstanding records must be larger than 0.");
}
this.maxOutstandingRecords = maxOutstandingRecords;
return this;
}
/**
* Sets a AWS client configuration to be used by the egress.
*
* <p>Supported values are properties of AWS's <a
* href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/latest/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">ccom.amazonaws.services.kinesis.producer.KinesisProducerConfiguration</a>.
* Please see <a
* href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">Default
* Configuration Properties</a> for a full list of the keys.
*
* @param key the property to set.
* @param value the value for the property.
* @see <a
* href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
* @deprecated Please use {@link #withProperty(String, String)} instead.
*/
public KinesisEgressBuilder<T> withClientConfigurationProperty(String key, String value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
this.properties.setProperty(key, value);
return this;
}
public KinesisEgressBuilder<T> withProperty(String key, String value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
this.properties.setProperty(key, value);
return this;
}
public KinesisEgressBuilder<T> withProperties(Properties properties) {
Objects.requireNonNull(properties);
this.properties.putAll(properties);
return this;
}
/** @return A new {@link KinesisEgressSpec}. */
public KinesisEgressSpec<T> build() {
return new KinesisEgressSpec<>(
id, serializerClass, maxOutstandingRecords, awsRegion, awsCredentials, properties);
}
}