blob: 71a12fc96ff8cb46e1b662b4e2429f1f945e8eed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Internal shard iterators pool. It maintains the thread pool for reading Kinesis shards in
 * separate threads. Read records are stored in a blocking queue of limited capacity.
 */
class ShardReadersPool {
  private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
  private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
  private static final int ATTEMPTS_TO_SHUTDOWN = 3;

  /**
   * How long (in milliseconds) a reading thread waits for free space in {@link #recordsQueue}
   * before re-checking whether the pool is still open. Keeps {@link #stop()} from waiting on
   * reader threads that are blocked on a full buffer nobody drains anymore.
   */
  private static final long QUEUE_OFFER_TIMEOUT_MILLIS = 100;

  /**
   * Executor service for running the threads that read records from shards handled by this pool.
   * Each thread runs the {@link ShardReadersPool#readLoop(ShardRecordsIterator)} method and handles
   * exactly one shard.
   */
  private final ExecutorService executorService;

  /**
   * A Bounded buffer for read records. Records are added to this buffer by the reading threads
   * running {@link ShardReadersPool#readLoop(ShardRecordsIterator)} and removed in {@link
   * ShardReadersPool#nextRecord()}.
   */
  private BlockingQueue<KinesisRecord> recordsQueue;

  /**
   * A reference to an immutable mapping of {@link ShardRecordsIterator} instances to shard ids.
   * This map is replaced with a new one when resharding operation on any handled shard occurs.
   */
  private final AtomicReference<ImmutableMap<String, ShardRecordsIterator>> shardIteratorsMap;

  /** A map for keeping the current number of records stored in a buffer per shard. */
  private final ConcurrentMap<String, AtomicInteger> numberOfRecordsInAQueueByShard;

  private final SimplifiedKinesisClient kinesis;
  private final WatermarkPolicyFactory watermarkPolicyFactory;
  private final KinesisReaderCheckpoint initialCheckpoint;
  private final int queueCapacityPerShard;
  private final AtomicBoolean poolOpened = new AtomicBoolean(true);

  ShardReadersPool(
      SimplifiedKinesisClient kinesis,
      KinesisReaderCheckpoint initialCheckpoint,
      WatermarkPolicyFactory watermarkPolicyFactory) {
    this(kinesis, initialCheckpoint, watermarkPolicyFactory, DEFAULT_CAPACITY_PER_SHARD);
  }

  ShardReadersPool(
      SimplifiedKinesisClient kinesis,
      KinesisReaderCheckpoint initialCheckpoint,
      WatermarkPolicyFactory watermarkPolicyFactory,
      int queueCapacityPerShard) {
    this.kinesis = kinesis;
    this.initialCheckpoint = initialCheckpoint;
    this.watermarkPolicyFactory = watermarkPolicyFactory;
    this.queueCapacityPerShard = queueCapacityPerShard;
    this.executorService = Executors.newCachedThreadPool();
    this.numberOfRecordsInAQueueByShard = new ConcurrentHashMap<>();
    this.shardIteratorsMap = new AtomicReference<>();
  }

  /**
   * Creates a shard iterator for every shard in the initial checkpoint, sizes the buffer
   * accordingly and starts one reading thread per shard.
   *
   * @throws TransientKinesisException if creating any shard iterator fails
   */
  void start() throws TransientKinesisException {
    ImmutableMap.Builder<String, ShardRecordsIterator> shardsMap = ImmutableMap.builder();
    for (ShardCheckpoint checkpoint : initialCheckpoint) {
      shardsMap.put(checkpoint.getShardId(), createShardIterator(kinesis, checkpoint));
    }
    shardIteratorsMap.set(shardsMap.build());
    if (!shardIteratorsMap.get().isEmpty()) {
      recordsQueue =
          new ArrayBlockingQueue<>(queueCapacityPerShard * shardIteratorsMap.get().size());
      startReadingShards(shardIteratorsMap.get().values());
    } else {
      // There are no shards to handle when restoring from an empty checkpoint. Empty checkpoints
      // are generated when the last shard handled by this pool was closed
      recordsQueue = new ArrayBlockingQueue<>(1);
    }
  }

  // Note: readLoop() will log any Throwable raised so opt to ignore the future result
  @SuppressWarnings("FutureReturnValueIgnored")
  void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
    for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
      numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
      executorService.submit(() -> readLoop(recordsIterator));
    }
  }

  /**
   * Body of a reading thread: repeatedly reads batches from one shard and pushes them into
   * {@link #recordsQueue} until the pool is closed, the thread is interrupted, or the shard is
   * closed (in which case reading continues on the successor shards in new threads).
   */
  private void readLoop(ShardRecordsIterator shardRecordsIterator) {
    while (poolOpened.get()) {
      try {
        List<KinesisRecord> kinesisRecords;
        try {
          kinesisRecords = shardRecordsIterator.readNextBatch();
        } catch (KinesisShardClosedException e) {
          LOG.info(
              "Shard iterator for {} shard is closed, finishing the read loop",
              shardRecordsIterator.getShardId(),
              e);
          // Wait until all records from already closed shard are taken from the buffer and only
          // then start reading successive shards. This guarantees that checkpoints will contain
          // either parent or child shard and never both. Such approach allows for more
          // straightforward checkpoint restoration than in a case when new shards are read
          // immediately. If the pool is closed while waiting, successors are not started.
          if (waitUntilAllShardRecordsRead(shardRecordsIterator)) {
            readFromSuccessiveShards(shardRecordsIterator);
          }
          break;
        }
        for (KinesisRecord kinesisRecord : kinesisRecords) {
          if (!putRecord(kinesisRecord)) {
            // The pool was closed while waiting for space in the buffer; the outer loop exits.
            break;
          }
        }
      } catch (TransientKinesisException e) {
        LOG.warn("Transient exception occurred.", e);
      } catch (InterruptedException e) {
        LOG.warn("Thread was interrupted, finishing the read loop", e);
        // Preserve the interrupt status for the executor that owns this thread.
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable e) {
        LOG.error("Unexpected exception occurred", e);
      }
    }
    LOG.info("Kinesis Shard read loop has finished");
  }

  /**
   * Adds a record to {@link #recordsQueue}, waiting for free space only while the pool is open,
   * and bumps the per-shard counter on success.
   *
   * @return true if the record was enqueued, false if the pool was closed first
   * @throws InterruptedException if the thread is interrupted while waiting for space
   */
  private boolean putRecord(KinesisRecord kinesisRecord) throws InterruptedException {
    while (poolOpened.get()) {
      if (recordsQueue.offer(kinesisRecord, QUEUE_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
        return true;
      }
    }
    return false;
  }

  /**
   * Takes the next buffered record, waiting up to one second, and acks it on its shard iterator.
   *
   * @return the record, or absent if none arrived in time or the thread was interrupted
   */
  CustomOptional<KinesisRecord> nextRecord() {
    try {
      KinesisRecord record = recordsQueue.poll(1, TimeUnit.SECONDS);
      if (record == null) {
        return CustomOptional.absent();
      }
      shardIteratorsMap.get().get(record.getShardId()).ackRecord(record);
      // numberOfRecordsInAQueueByShard contains the counter for a given shard until the shard is
      // closed and then its counter reaches 0. Thus the access here is safe
      numberOfRecordsInAQueueByShard.get(record.getShardId()).decrementAndGet();
      return CustomOptional.of(record);
    } catch (InterruptedException e) {
      // Restore the interrupt status so the caller can observe it.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
      return CustomOptional.absent();
    }
  }

  /**
   * Closes the pool: signals reading threads to stop, then shuts the executor down, escalating to
   * {@link ExecutorService#shutdownNow()} if it does not terminate after the graceful attempts.
   */
  void stop() {
    LOG.info("Closing shard iterators pool");
    poolOpened.set(false);
    executorService.shutdown();
    awaitTermination();
    if (!executorService.isTerminated()) {
      LOG.warn(
          "Executor service was not completely terminated after {} attempts, trying to forcibly stop it.",
          ATTEMPTS_TO_SHUTDOWN);
      executorService.shutdownNow();
      awaitTermination();
    }
  }

  /**
   * Waits for the executor to terminate, up to {@link #ATTEMPTS_TO_SHUTDOWN} attempts of 10 seconds
   * each.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted while waiting
   */
  private void awaitTermination() {
    int attemptsLeft = ATTEMPTS_TO_SHUTDOWN;
    boolean isTerminated = executorService.isTerminated();
    while (!isTerminated && attemptsLeft-- > 0) {
      try {
        isTerminated = executorService.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for the executor service to shutdown", e);
        // Keep the interrupt status visible to callers up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      if (!isTerminated && attemptsLeft > 0) {
        LOG.warn(
            "Executor service is taking long time to shutdown, will retry. {} attempts left",
            attemptsLeft);
      }
    }
  }

  /**
   * Returns the minimum watermark across all currently handled shards, or {@link
   * BoundedWindow#TIMESTAMP_MAX_VALUE} when no shards are handled.
   */
  Instant getWatermark() {
    return shardIteratorsMap.get().values().stream()
        .map(ShardRecordsIterator::getShardWatermark)
        .min(Comparator.naturalOrder())
        .orElse(BoundedWindow.TIMESTAMP_MAX_VALUE);
  }

  /** Builds a checkpoint from the current checkpoint of every handled shard iterator. */
  KinesisReaderCheckpoint getCheckpointMark() {
    ImmutableMap<String, ShardRecordsIterator> currentShardIterators = shardIteratorsMap.get();
    return new KinesisReaderCheckpoint(
        currentShardIterators.values().stream()
            .map(
                shardRecordsIterator -> {
                  checkArgument(
                      shardRecordsIterator != null, "shardRecordsIterator can not be null");
                  return shardRecordsIterator.getCheckpoint();
                })
            .collect(Collectors.toList()));
  }

  ShardRecordsIterator createShardIterator(
      SimplifiedKinesisClient kinesis, ShardCheckpoint checkpoint)
      throws TransientKinesisException {
    return new ShardRecordsIterator(checkpoint, kinesis, watermarkPolicyFactory);
  }

  /**
   * Waits until all records read from given shardRecordsIterator are taken from {@link
   * #recordsQueue} and acked. Uses {@link #numberOfRecordsInAQueueByShard} map to track the amount
   * of remaining events. Stops waiting early if the pool is closed, because records are no longer
   * consumed after {@link #stop()} and the counter would never reach 0.
   *
   * @return true if all records of the shard were consumed, false if the pool was closed first
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  private boolean waitUntilAllShardRecordsRead(ShardRecordsIterator shardRecordsIterator)
      throws InterruptedException {
    // Given shard is already closed so no more records will be read from it. Thus the counter for
    // that shard will be strictly decreasing to 0.
    AtomicInteger numberOfShardRecordsInAQueue =
        numberOfRecordsInAQueueByShard.get(shardRecordsIterator.getShardId());
    while (poolOpened.get()) {
      if (numberOfShardRecordsInAQueue.get() == 0) {
        return true;
      }
      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    }
    return false;
  }

  /**
   * Tries to find successors of a given shard and start reading them. Each closed shard can have 0,
   * 1 or 2 successors
   *
   * <ul>
   *   <li>0 successors - when shard was merged with another shard and this one is considered
   *       adjacent by merge operation
   *   <li>1 successor - when shard was merged with another shard and this one is considered a
   *       parent by merge operation
   *   <li>2 successors - when shard was split into two shards
   * </ul>
   *
   * <p>Once shard successors are established, the transition to reading new shards can begin.
   * During this operation, the immutable {@link ShardReadersPool#shardIteratorsMap} is replaced
   * with a new one holding references to {@link ShardRecordsIterator} instances for open shards
   * only. Potentially there might be more shard iterators closing at the same time so {@link
   * ShardReadersPool#shardIteratorsMap} is updated in a loop using CAS pattern to keep all the
   * updates. Then, the counter for already closed shard is removed from {@link
   * ShardReadersPool#numberOfRecordsInAQueueByShard} map.
   *
   * <p>Finally when update is finished, new threads are spawned for reading the successive shards.
   * The thread that handled reading from already closed shard can finally complete.
   */
  private void readFromSuccessiveShards(final ShardRecordsIterator closedShardIterator)
      throws TransientKinesisException {
    List<ShardRecordsIterator> successiveShardRecordIterators =
        closedShardIterator.findSuccessiveShardRecordIterators();
    ImmutableMap<String, ShardRecordsIterator> current;
    ImmutableMap<String, ShardRecordsIterator> updated;
    do {
      current = shardIteratorsMap.get();
      updated =
          createMapWithSuccessiveShards(
              current, closedShardIterator, successiveShardRecordIterators);
    } while (!shardIteratorsMap.compareAndSet(current, updated));
    numberOfRecordsInAQueueByShard.remove(closedShardIterator.getShardId());
    startReadingShards(successiveShardRecordIterators);
  }

  /**
   * Builds a new map containing every iterator in {@code current} plus the successors, excluding
   * the closed shard.
   */
  private ImmutableMap<String, ShardRecordsIterator> createMapWithSuccessiveShards(
      ImmutableMap<String, ShardRecordsIterator> current,
      ShardRecordsIterator closedShardIterator,
      List<ShardRecordsIterator> successiveShardRecordIterators)
      throws TransientKinesisException {
    ImmutableMap.Builder<String, ShardRecordsIterator> shardsMap = ImmutableMap.builder();
    Iterable<ShardRecordsIterator> allShards =
        Iterables.concat(current.values(), successiveShardRecordIterators);
    for (ShardRecordsIterator iterator : allShards) {
      if (!closedShardIterator.getShardId().equals(iterator.getShardId())) {
        shardsMap.put(iterator.getShardId(), iterator);
      }
    }
    return shardsMap.build();
  }
}