blob: 449dd1e3af16530768b17f2ec24e47eab98fa0b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws.sns;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.GetTopicAttributesResult;
import com.amazonaws.services.sns.model.InternalErrorException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.http.HttpStatus;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link PTransform}s for writing to <a href="https://aws.amazon.com/sns/">SNS</a>.
*
* <h3>Writing to SNS</h3>
*
* <p>Example usage:
*
* <pre>{@code
* PCollection<PublishRequest> data = ...;
*
* data.apply(SnsIO.write()
* .withTopicName("topicName")
* .withRetryConfiguration(
* SnsIO.RetryConfiguration.create(
* 4, org.joda.time.Duration.standardSeconds(10)))
* .withAWSClientsProvider(new BasicSnsProvider(accessKey, secretKey, region))
* .withResultOutputTag(results));
* }</pre>
*
* <p>As a client, you need to provide at least the following things:
*
* <ul>
* <li>name of the SNS topic you're going to write to
* <li>retry configuration
* <li>an AwsClientsProvider; you can use the default implementation, BasicSnsProvider
* <li>an output tag where you can get results. Example in SnsIOTest
* </ul>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public final class SnsIO {
/** Returns a {@link Write} transform with no properties set, for publishing messages to SNS. */
public static Write write() {
  Write.Builder emptyBuilder = new AutoValue_SnsIO_Write.Builder();
  return emptyBuilder.build();
}
/**
 * A POJO encapsulating a configuration for retry behavior when issuing requests to SNS. A retry
 * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
 * any of the following exceptions:
 *
 * <ul>
 *   <li>{@link IOException}
 *   <li>{@link InternalErrorException}
 * </ul>
 */
@AutoValue
public abstract static class RetryConfiguration implements Serializable {
  @VisibleForTesting
  static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

  /** Maximum number of publish attempts; validated by {@link #create} to be greater than 0. */
  abstract int getMaxAttempts();

  /** Upper bound on the retry duration; validated by {@link #create} to be longer than zero. */
  abstract Duration getMaxDuration();

  /** Predicate deciding whether a failure is eligible for a retry. */
  abstract RetryPredicate getRetryPredicate();

  abstract Builder builder();

  /**
   * Creates a retry configuration that uses {@link #DEFAULT_RETRY_PREDICATE}.
   *
   * @param maxAttempts maximum number of attempts, must be greater than 0
   * @param maxDuration maximum duration, must be non-null and longer than {@link Duration#ZERO}
   * @return the retry configuration
   * @throws IllegalArgumentException if either argument violates its constraint
   */
  public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
    checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
    checkArgument(
        maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
        "maxDuration should be greater than 0");
    return new AutoValue_SnsIO_RetryConfiguration.Builder()
        .setMaxAttempts(maxAttempts)
        .setMaxDuration(maxDuration)
        .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
        .build();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract SnsIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);

    abstract SnsIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);

    abstract SnsIO.RetryConfiguration.Builder setRetryPredicate(RetryPredicate retryPredicate);

    abstract SnsIO.RetryConfiguration build();
  }

  /**
   * An interface used to control if we retry the SNS Publish call when a {@link Throwable}
   * occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries to resend
   * the request to SNS if the {@link RetryConfiguration} permits it.
   */
  @FunctionalInterface
  interface RetryPredicate extends Predicate<Throwable>, Serializable {}

  /** Default predicate: retries on {@link IOException} and {@link InternalErrorException}. */
  private static class DefaultRetryPredicate implements RetryPredicate {
    @Override
    public boolean test(Throwable throwable) {
      // Every InternalErrorException is retried regardless of its HTTP status code. The former
      // additional status-code clause could never change the result and has been removed.
      return throwable instanceof IOException || throwable instanceof InternalErrorException;
    }
  }
}
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write
extends PTransform<PCollection<PublishRequest>, PCollectionTuple> {
// Configuration properties of the transform. Each one is nullable because write() builds the
// initial instance without setting anything; values are supplied through the with* methods.

/** Topic to write to; expand() requires this to be non-null. */
@Nullable
abstract String getTopicName();
/** Provider that SnsWriterFn uses to create the {@link AmazonSNS} client. */
@Nullable
abstract AwsClientsProvider getAWSClientsProvider();
/** Retry rules; when null, SnsWriterFn fails on the first publish error without retrying. */
@Nullable
abstract RetryConfiguration getRetryConfiguration();
/** Tag passed to ParDo's withOutputTags in expand() as the main output for publish results. */
@Nullable
abstract TupleTag<PublishResult> getResultOutputTag();
/** Builder accessor used by the with* methods to set a property and rebuild the transform. */
abstract Builder builder();
/**
* Builder for {@link Write}, annotated for AutoValue. It declares one setter per property of
* {@link Write}; the {@code with*} methods call these setters and then {@link #build()}.
*/
@AutoValue.Builder
abstract static class Builder {
abstract Builder setTopicName(String topicName);
abstract Builder setAWSClientsProvider(AwsClientsProvider clientProvider);
abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);
abstract Builder setResultOutputTag(TupleTag<PublishResult> results);
abstract Write build();
}
/**
 * Sets the SNS topic that messages are written to. The topic name is mandatory.
 *
 * @param topicName name of the target topic
 * @return a {@link Write} built with the given topic name
 */
public Write withTopicName(String topicName) {
  Builder withTopic = builder().setTopicName(topicName);
  return withTopic.build();
}
/**
 * Sets a custom {@link AwsClientsProvider}. The provider creates the {@link AmazonSNS} client
 * that is later used for writing to the SNS topic.
 *
 * @param awsClientsProvider provider of the SNS client
 * @return a {@link Write} built with the given provider
 */
public Write withAWSClientsProvider(AwsClientsProvider awsClientsProvider) {
  Builder withProvider = builder().setAWSClientsProvider(awsClientsProvider);
  return withProvider.build();
}
/**
 * Sets the credentials and region used to write to SNS, without a service endpoint override.
 * For a more sophisticated credential protocol, use {@link
 * Write#withAWSClientsProvider(AwsClientsProvider)} instead.
 *
 * @param awsAccessKey AWS access key
 * @param awsSecretKey AWS secret key
 * @param region AWS region
 * @return a {@link Write} built with a provider for these credentials
 */
public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
  final String noServiceEndpoint = null;
  return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, noServiceEndpoint);
}
/**
 * Sets the credentials, region and service endpoint used to write to SNS, by wrapping them in a
 * {@link BasicSnsProvider}. For a more sophisticated credential protocol, use {@link
 * Write#withAWSClientsProvider(AwsClientsProvider)} instead.
 *
 * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
 * the tests against a service emulator.
 *
 * @param awsAccessKey AWS access key
 * @param awsSecretKey AWS secret key
 * @param region AWS region
 * @param serviceEndpoint alternative service host; the three-argument overload passes {@code
 *     null} here
 * @return a {@link Write} built with a provider for these settings
 */
public Write withAWSClientsProvider(
    String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
  AwsClientsProvider basicProvider =
      new BasicSnsProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint);
  return withAWSClientsProvider(basicProvider);
}
/**
 * Provides configuration to retry a failed request to publish a message to SNS. Users should
 * consider that retrying might compound the underlying problem which caused the initial
 * failure. Users should also be aware that once retrying is exhausted the error is surfaced to
 * the runner which <em>may</em> then opt to retry the current partition in entirety or abort if
 * the max number of retries of the runner is completed. Retrying uses an exponential backoff
 * algorithm, with minimum backoff of 5 seconds and then surfacing the error once the maximum
 * number of retries or maximum configuration duration is exceeded.
 *
 * <p>Example use:
 *
 * <pre>{@code
 * SnsIO.write()
 *   .withRetryConfiguration(SnsIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
 *   ...
 * }</pre>
 *
 * @param retryConfiguration the rules which govern the retry behavior, must not be null
 * @return the {@link Write} with retrying configured
 */
public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
  checkArgument(retryConfiguration != null, "retryConfiguration is required");
  Builder withRetries = builder().setRetryConfiguration(retryConfiguration);
  return withRetries.build();
}
/**
 * Sets the tuple tag under which the publish results are stored. Mandatory field.
 *
 * @param results tag for the {@link PublishResult} output
 * @return a {@link Write} built with the given output tag
 */
public Write withResultOutputTag(TupleTag<PublishResult> results) {
  Builder withTag = builder().setResultOutputTag(results);
  return withTag.build();
}
/**
 * Applies a {@code ParDo} that publishes every element of {@code input} to SNS and emits the
 * publish results under the configured result output tag.
 *
 * @param input the requests to publish
 * @return a tuple containing the {@link PublishResult}s under {@link #getResultOutputTag()}
 * @throws IllegalArgumentException if the topic name, the AWS clients provider or the result
 *     output tag has not been configured
 */
@Override
public PCollectionTuple expand(PCollection<PublishRequest> input) {
  checkArgument(getTopicName() != null, "withTopicName() is required");
  // Validate the remaining mandatory settings up front, so that a missing one is reported with a
  // clear message here instead of as a NullPointerException later on.
  checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
  checkArgument(getResultOutputTag() != null, "withResultOutputTag() is required");
  return input.apply(
      ParDo.of(new SnsWriterFn(this))
          .withOutputTags(getResultOutputTag(), TupleTagList.empty()));
}
/**
 * {@link DoFn} that publishes each incoming {@link PublishRequest} to SNS and outputs the
 * resulting {@link PublishResult}.
 *
 * <p>The SNS client and the backoff policy are created in {@link #setup()}. Failed publishes are
 * retried only when the {@link Write} carries a {@link RetryConfiguration} whose predicate
 * accepts the failure; every publish that ultimately fails is counted in the {@code
 * SNS_Write_Failures} metric and reported as an {@link IOException}.
 */
static class SnsWriterFn extends DoFn<PublishRequest, PublishResult> {
  @VisibleForTesting
  static final String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry attempt[%d]";

  private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);
  private static final Logger LOG = LoggerFactory.getLogger(SnsWriterFn.class);
  private static final Counter SNS_WRITE_FAILURES =
      Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");

  private final SnsIO.Write spec;
  private transient FluentBackoff retryBackoff; // defaults to no retries
  private transient AmazonSNS producer;

  SnsWriterFn(SnsIO.Write spec) {
    this.spec = spec;
  }

  /**
   * Creates the SNS client, verifies that the configured topic exists and builds the backoff
   * policy from the retry configuration (no retries when none is configured).
   *
   * @throws IllegalArgumentException if no clients provider is configured or the topic does not
   *     exist
   */
  @Setup
  public void setup() throws Exception {
    checkArgument(
        spec.getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
    // Initialize SnsPublisher
    producer = spec.getAWSClientsProvider().createSnsPublisher();
    checkArgument(
        topicExists(producer, spec.getTopicName()),
        "Topic %s does not exist",
        spec.getTopicName());

    FluentBackoff backoffPolicy =
        FluentBackoff.DEFAULT
            .withMaxRetries(0) // default to no retrying
            .withInitialBackoff(RETRY_INITIAL_BACKOFF);
    SnsIO.RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
    if (retryConfiguration != null) {
      // The first attempt is not a retry, hence maxAttempts - 1.
      backoffPolicy =
          backoffPolicy
              .withMaxRetries(retryConfiguration.getMaxAttempts() - 1)
              .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
    }
    retryBackoff = backoffPolicy;
  }

  /**
   * Publishes the current element and outputs the publish result.
   *
   * @throws IOException if the publish fails and is not retried, or if all retries are used up
   */
  @ProcessElement
  public void processElement(ProcessContext context) throws Exception {
    PublishResult result = publishWithRetries(context.element());
    // Emit outside of the retry loop: an exception raised while outputting the result is not an
    // SNS failure and must neither be counted as one nor cause the message to be published again.
    context.output(result);
  }

  /** Publishes {@code request}, retrying as permitted by the retry configuration. */
  private PublishResult publishWithRetries(PublishRequest request)
      throws IOException, InterruptedException {
    SnsIO.RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = retryBackoff.backoff();
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        return producer.publish(request);
      } catch (Exception ex) {
        // Fail right away if there is no retry configuration
        if (retryConfiguration == null || !retryConfiguration.getRetryPredicate().test(ex)) {
          SNS_WRITE_FAILURES.inc();
          LOG.info("Unable to publish message {} due to {} ", request.getMessage(), ex);
          throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
        }
        if (!BackOffUtils.next(sleeper, backoff)) {
          // Exhausting the retries is a write failure as well, so count it like the case above.
          SNS_WRITE_FAILURES.inc();
          throw new IOException(
              String.format(
                  "Error writing to SNS after %d attempt(s). No more attempts allowed",
                  attempt),
              ex);
        }
        // Note: this used in test cases to verify behavior
        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
      }
    }
  }

  /** Shuts down the SNS client if one was created. */
  @Teardown
  public void tearDown() {
    if (producer != null) {
      producer.shutdown();
      producer = null;
    }
  }

  /**
   * Checks whether the topic exists by requesting its attributes.
   *
   * @return true if the request returned a result with HTTP status 200; any exception raised by
   *     the request is logged and rethrown
   */
  @SuppressWarnings({"checkstyle:illegalCatch"})
  private static boolean topicExists(AmazonSNS client, String topicName) {
    try {
      GetTopicAttributesResult topicAttributesResult = client.getTopicAttributes(topicName);
      return topicAttributesResult != null
          && topicAttributesResult.getSdkHttpMetadata().getHttpStatusCode()
              == HttpStatus.SC_OK;
    } catch (Exception e) {
      LOG.warn("Error checking whether topic {} exists.", topicName, e);
      throw e;
    }
  }
}
}
}