blob: d9e00a8bb2631946946436c71b3e052f54529b21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Works out which shards of a stream have to be read in order to start consuming from a given
 * starting point.
 */
class StartingPointShardsFinder implements Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(StartingPointShardsFinder.class);

  /**
   * Returns the set of shards that are valid at the given {@code startingPoint}.
   *
   * <p>The search begins with the oldest shards of the stream, i.e. those whose parent is not
   * present in the shard listing. Every candidate is validated by obtaining an iterator at the
   * starting point and probing it for records. A candidate that passes is added to the result; a
   * candidate that fails is regarded as expired and is replaced by its successors, which become
   * the candidates of the next round. The rounds continue until no candidate is expired.
   *
   * <p>The diagram below shows a stream that starts with three shards and then goes through a
   * series of splits and merges. Consider what is returned for the timestamps T1 to T4:
   *
   * <ul>
   *   <li>T1 (or the TRIM_HORIZON marker) - the oldest shards 0000, 0001 and 0002 form the initial
   *       candidate set. All three are valid at T1, so all three are returned.
   *   <li>T2 - the initial candidates are again 0000, 0001 and 0002.
   *       <ul>
   *         <li>0000 is valid at T2 and goes straight to the result.
   *         <li>0001 is already closed at T2 and fails the validation, so its successors 0003 and
   *             0004 are examined. Both are valid at T2 and are added to the result.
   *         <li>0002 fails the validation as well; its successors 0005 and 0006 are examined and
   *             both are valid.
   *       </ul>
   *       The result is 0000, 0003, 0004, 0005 and 0006.
   *   <li>T3 - the first round is the same as for T2.
   *       <ul>
   *         <li>0000 is valid.
   *         <li>0001 is closed, so 0003 and 0004 are examined. 0003 is valid, 0004 is already
   *             closed at T3. The only successor of 0004 is 0007, the product of merging 0004 and
   *             0005. A merged shard has two parents, exposed through {@link
   *             Shard#getParentShardId()} and {@link Shard#getAdjacentParentShardId()}. Only one
   *             of the parents may lead to the merged shard, and it is always the one referenced
   *             as the parent shard id. Assuming that is 0004, it is 0004 that contributes 0007,
   *             which is valid at T3 and is added to the result.
   *         <li>0002 is closed, so 0005 and 0006 are examined. 0005 is closed because it was
   *             merged with 0004; their common successor 0007 has already been contributed by
   *             0004, so nothing is added here. 0006 is valid.
   *       </ul>
   *   <li>T4 (or the LATEST marker) - the same reasoning ends up with 0000, 0003, 0008 and 0010.
   * </ul>
   *
   * <pre>
   *     T1                T2          T3                      T4
   *     |                 |           |                       |
   * 0000-----------------------------------------------------------
   *
   *
   *             0003-----------------------------------------------
   *            /
   * 0001------+
   *            \
   *             0004-----------+             0008------------------
   *                             \           /
   *                              0007------+
   *                             /           \
   *                  0005------+             0009------+
   *                 /                                   \
   * 0002-----------+                                     0010------
   *                 \                                   /
   *                  0006------------------------------+
   * </pre>
   *
   * @param kinesis client used to list the shards and to probe them
   * @param streamName name of the stream whose shards are examined
   * @param startingPoint position in the stream at which the shards must be valid
   * @return the shards that passed the validation at {@code startingPoint}
   * @throws TransientKinesisException propagated from the calls made through {@code kinesis}
   * @throws IllegalStateException if an expired shard has no successor in the shard listing
   */
  Set<Shard> findShardsAtStartingPoint(
      SimplifiedKinesisClient kinesis, String streamName, StartingPoint startingPoint)
      throws TransientKinesisException {
    List<Shard> allShards = kinesis.listShards(streamName);
    Set<Shard> candidates = findInitialShardsWithoutParents(streamName, allShards);
    Set<Shard> result = new HashSet<>();

    while (true) {
      Set<Shard> accepted = validateShards(kinesis, candidates, streamName, startingPoint);
      result.addAll(accepted);

      // Whatever was a candidate in this round but did not validate is expired.
      Set<Shard> rejected = Sets.difference(candidates, accepted);
      if (rejected.isEmpty()) {
        return result;
      }

      LOGGER.info(
          "Following shards expired for {} stream at '{}' starting point: {}",
          streamName,
          startingPoint,
          rejected);
      // The next round examines the successors of the expired shards.
      candidates = findNextShards(allShards, rejected);
    }
  }

  /**
   * Collects the successors of the given expired shards.
   *
   * <p>A shard from {@code allShards} is collected when its parent shard id equals the id of an
   * expired shard. A shard that references the expired shard only as its adjacent parent is not
   * collected, but it still counts as evidence that the expired shard has a successor.
   *
   * @throws IllegalStateException if some expired shard is referenced neither as a parent nor as
   *     an adjacent parent by any shard in {@code allShards}
   */
  private Set<Shard> findNextShards(List<Shard> allShards, Set<Shard> expiredShards) {
    Set<Shard> successors = new HashSet<>();
    for (Shard expired : expiredShards) {
      String expiredId = expired.getShardId();
      boolean hasSuccessor = false;

      for (Shard candidate : allShards) {
        boolean isPrimaryChild = Objects.equals(expiredId, candidate.getParentShardId());
        if (isPrimaryChild) {
          successors.add(candidate);
        }
        if (isPrimaryChild || Objects.equals(expiredId, candidate.getAdjacentParentShardId())) {
          hasSuccessor = true;
        }
      }

      if (!hasSuccessor) {
        // A split/merge operation that is in progress may cause this: the new shards might be
        // missing from allShards while their predecessor is already regarded as expired.
        // Retrying should solve the issue.
        throw new IllegalStateException("No successors were found for shard: " + expired);
      }
    }
    return successors;
  }

  /**
   * Selects the oldest shards of the stream: those whose parent shard id does not match the id of
   * any shard in {@code allShards}. The ids of all listed shards are logged along the way.
   */
  private Set<Shard> findInitialShardsWithoutParents(String streamName, List<Shard> allShards) {
    Set<String> knownIds = new HashSet<>();
    allShards.forEach(shard -> knownIds.add(shard.getShardId()));
    LOGGER.info("Stream {} has following shards: {}", streamName, knownIds);

    Set<Shard> roots = new HashSet<>();
    for (Shard shard : allShards) {
      if (knownIds.contains(shard.getParentShardId())) {
        // The parent is still listed, so this shard is not one of the oldest.
        continue;
      }
      roots.add(shard);
    }
    return roots;
  }

  /**
   * Filters the given shards down to those that are valid at the starting point.
   *
   * <p>Each shard is checked by requesting an iterator positioned at the starting point and
   * fetching records through it. The fetched records are discarded here; the intent is that a
   * valid shard is later read from the same position, so those records are read again.
   *
   * @return the subset of {@code rootShards} that passed the check
   */
  private Set<Shard> validateShards(
      SimplifiedKinesisClient kinesis,
      Iterable<Shard> rootShards,
      String streamName,
      StartingPoint startingPoint)
      throws TransientKinesisException {
    Set<Shard> readable = new HashSet<>();
    ShardIteratorType iteratorType = ShardIteratorType.fromValue(startingPoint.getPositionName());
    for (Shard candidate : rootShards) {
      if (isReadableAt(kinesis, candidate, streamName, iteratorType, startingPoint)) {
        readable.add(candidate);
      }
    }
    return readable;
  }

  /**
   * Probes a single shard at the starting point. The shard counts as readable when the fetch
   * yields a next shard iterator or at least one record.
   */
  private boolean isReadableAt(
      SimplifiedKinesisClient kinesis,
      Shard shard,
      String streamName,
      ShardIteratorType iteratorType,
      StartingPoint startingPoint)
      throws TransientKinesisException {
    String shardId = shard.getShardId();
    String iterator =
        kinesis.getShardIterator(
            streamName, shardId, iteratorType, null, startingPoint.getTimestamp());
    GetKinesisRecordsResult probe = kinesis.getRecords(iterator, streamName, shardId);

    boolean exhausted = probe.getNextShardIterator() == null && probe.getRecords().isEmpty();
    return !exhausted;
  }
}