// blob: 9903f0f31a6e24746a6baa5507f857356ac3fd97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.joda.time.Minutes;
/**
 * Wraps {@link AmazonKinesis} class providing much simpler interface and proper error handling.
 *
 * <p>Every remote call is executed through {@code wrapExceptions}, which translates Amazon SDK
 * exceptions into either {@link TransientKinesisException} (the caller may retry) or {@link
 * RuntimeException} (the caller should not retry). {@link ExpiredIteratorException} is propagated
 * unchanged.
 */
class SimplifiedKinesisClient {

  private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
  // NOTE: despite the constant's name, the CloudWatch metric queried is the byte count.
  private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
  private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
  private static final String SUM_STATISTIC = "Sum";
  private static final String STREAM_NAME_DIMENSION = "StreamName";

  private final AmazonKinesis kinesis;
  private final AmazonCloudWatch cloudWatch;
  // Passed as-is to GetRecordsRequest#withLimit; not null-checked by this class.
  private final Integer limit;

  /**
   * Creates a client on top of the given AWS SDK clients.
   *
   * @param kinesis Kinesis client, must not be null
   * @param cloudWatch CloudWatch client used for backlog estimation, must not be null
   * @param limit default record limit used by {@link #getRecords(String, String, String)}
   * @throws NullPointerException if {@code kinesis} or {@code cloudWatch} is null
   */
  public SimplifiedKinesisClient(
      AmazonKinesis kinesis, AmazonCloudWatch cloudWatch, Integer limit) {
    this.kinesis = checkNotNull(kinesis, "kinesis");
    this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch");
    this.limit = limit;
  }

  /**
   * Creates a client using the Kinesis and CloudWatch clients supplied by {@code provider}.
   *
   * @param provider source of the underlying AWS clients
   * @param limit default record limit used by {@link #getRecords(String, String, String)}
   * @return new client instance
   */
  public static SimplifiedKinesisClient from(AWSClientsProvider provider, Integer limit) {
    return new SimplifiedKinesisClient(
        provider.getKinesisClient(), provider.getCloudWatchClient(), limit);
  }

  /**
   * Requests a shard iterator from Kinesis.
   *
   * @param streamName name of the stream
   * @param shardId id of the shard to iterate over
   * @param shardIteratorType iterator type forwarded to the request
   * @param startingSequenceNumber sequence number forwarded to the request
   * @param timestamp timestamp forwarded to the request; when null, a null date is forwarded
   * @return shard iterator returned by Kinesis
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public String getShardIterator(
      final String streamName,
      final String shardId,
      final ShardIteratorType shardIteratorType,
      final String startingSequenceNumber,
      final Instant timestamp)
      throws TransientKinesisException {
    final Date date = timestamp != null ? timestamp.toDate() : null;
    return wrapExceptions(
        () ->
            kinesis
                .getShardIterator(
                    new GetShardIteratorRequest()
                        .withStreamName(streamName)
                        .withShardId(shardId)
                        .withShardIteratorType(shardIteratorType)
                        .withStartingSequenceNumber(startingSequenceNumber)
                        .withTimestamp(date))
                .getShardIterator());
  }

  /**
   * Lists all shards of the stream, following the paginated {@code describeStream} responses.
   *
   * <p>Pagination stops as soon as a response reports no more shards or contains no shards at
   * all, so a stream description without shards yields an empty list.
   *
   * @param streamName name of the stream
   * @return shards collected from all pages, possibly empty
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public List<Shard> listShards(final String streamName) throws TransientKinesisException {
    return wrapExceptions(
        () -> {
          List<Shard> shards = Lists.newArrayList();
          String lastShardId = null;
          StreamDescription description;
          do {
            description = kinesis.describeStream(streamName, lastShardId).getStreamDescription();
            List<Shard> page = description.getShards();
            if (page == null || page.isEmpty()) {
              // Without a shard on this page there is no id to continue from: indexing the last
              // element would fail, and re-issuing the same request could loop forever.
              break;
            }
            shards.addAll(page);
            lastShardId = page.get(page.size() - 1).getShardId();
            // Boolean.TRUE.equals avoids unboxing a null "has more shards" flag.
          } while (Boolean.TRUE.equals(description.getHasMoreShards()));
          return shards;
        });
  }

  /**
   * Gets records from Kinesis and deaggregates them if needed, using the limit this client was
   * created with.
   *
   * @return result holding the deaggregated records together with the next shard iterator
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId)
      throws TransientKinesisException {
    return getRecords(shardIterator, streamName, shardId, limit);
  }

  /**
   * Gets records from Kinesis and deaggregates them if needed.
   *
   * @param shardIterator iterator pointing at the position to read from
   * @param streamName stream name, recorded in the result
   * @param shardId shard id, recorded in the result
   * @param limit limit forwarded to the {@link GetRecordsRequest}
   * @return result holding the deaggregated records, the next shard iterator and the reported
   *     milliseconds behind latest
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public GetKinesisRecordsResult getRecords(
      final String shardIterator,
      final String streamName,
      final String shardId,
      final Integer limit)
      throws TransientKinesisException {
    return wrapExceptions(
        () -> {
          GetRecordsResult response =
              kinesis.getRecords(
                  new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit));
          return new GetKinesisRecordsResult(
              UserRecord.deaggregate(response.getRecords()),
              response.getNextShardIterator(),
              response.getMillisBehindLatest(),
              streamName,
              shardId);
        });
  }

  /**
   * Gets total size in bytes of all events that remain in Kinesis stream after specified instant.
   *
   * @return total size in bytes of all Kinesis events after specified instant
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public long getBacklogBytes(String streamName, Instant countSince)
      throws TransientKinesisException {
    return getBacklogBytes(streamName, countSince, new Instant());
  }

  /**
   * Gets total size in bytes of all events that remain in Kinesis stream between specified
   * instants.
   *
   * <p>The value is the sum of the CloudWatch {@code IncomingBytes} datapoints for the stream.
   * When fewer than one whole minute lies between the two instants, no request is made and 0 is
   * returned.
   *
   * @return total size in bytes of all Kinesis events between the specified instants
   * @throws TransientKinesisException - in case of recoverable situation
   */
  public long getBacklogBytes(
      final String streamName, final Instant countSince, final Instant countTo)
      throws TransientKinesisException {
    return wrapExceptions(
        () -> {
          Minutes period = Minutes.minutesBetween(countSince, countTo);
          if (period.isLessThan(Minutes.ONE)) {
            return 0L;
          }

          GetMetricStatisticsRequest request =
              createMetricStatisticsRequest(streamName, countSince, countTo, period);

          long totalSizeInBytes = 0;
          GetMetricStatisticsResult result = cloudWatch.getMetricStatistics(request);
          for (Datapoint point : result.getDatapoints()) {
            totalSizeInBytes += point.getSum().longValue();
          }
          return totalSizeInBytes;
        });
  }

  /**
   * Builds the CloudWatch request for the {@code Sum} of the stream's incoming bytes.
   *
   * @param streamName value of the {@code StreamName} dimension
   * @param countSince start time of the queried range
   * @param countTo end time of the queried range
   * @param period length of the range in whole minutes; converted to seconds for the request
   * @return request ready to be passed to {@link AmazonCloudWatch#getMetricStatistics}
   */
  GetMetricStatisticsRequest createMetricStatisticsRequest(
      String streamName, Instant countSince, Instant countTo, Minutes period) {
    return new GetMetricStatisticsRequest()
        .withNamespace(KINESIS_NAMESPACE)
        .withMetricName(INCOMING_RECORDS_METRIC)
        .withPeriod(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS)
        .withStartTime(countSince.toDate())
        .withEndTime(countTo.toDate())
        .withStatistics(Collections.singletonList(SUM_STATISTIC))
        .withDimensions(
            Collections.singletonList(
                new Dimension().withName(STREAM_NAME_DIMENSION).withValue(streamName)));
  }

  /**
   * Wraps Amazon specific exceptions into more friendly format.
   *
   * @throws TransientKinesisException - in case of recoverable situation, i.e. the request rate is
   *     too high, Kinesis remote service failed, network issue, etc.
   * @throws ExpiredIteratorException - if iterator needs to be refreshed
   * @throws RuntimeException - in all other cases
   */
  private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
    try {
      return callable.call();
    } catch (ExpiredIteratorException e) {
      // Propagated unchanged so that callers can react by refreshing the iterator.
      throw e;
    } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
      throw new TransientKinesisException(
          "Too many requests to Kinesis. Wait some time and retry.", e);
    } catch (AmazonServiceException e) {
      // Only failures attributed to the service side are considered recoverable.
      if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
        throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e);
      }
      throw new RuntimeException("Kinesis client side failure", e);
    } catch (AmazonClientException e) {
      if (e.isRetryable()) {
        throw new TransientKinesisException("Retryable client failure", e);
      }
      throw new RuntimeException("Not retryable client failure", e);
    } catch (Exception e) {
      throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
    }
  }
}