blob: 934ce5511ab17f060b461aa86d42e705dd28a27c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
import static org.apache.ignite.internal.index.IndexManagementUtils.AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC;
import static org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
import static org.apache.ignite.internal.index.IndexManagementUtils.isPrimaryReplica;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
* Component is responsible for starting and stopping the building of indexes on primary replicas.
*
* <p>Component handles the following events (indexes are started and stopped by {@link CatalogIndexDescriptor#tableId()} ==
* {@link TablePartitionId#tableId()}): </p>
* <ul>
* <li>{@link CatalogEvent#INDEX_BUILDING} - starts building indexes for the corresponding local primary replicas.</li>
* <li>{@link CatalogEvent#INDEX_REMOVED} - stops building indexes for the corresponding local primary replicas.</li>
* <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - for a new local primary replica, starts the building of all corresponding
* indexes, for an expired primary replica, stops the building of all corresponding indexes.</li>
* </ul>
*
* <p>A few words about restoring the building of indexes on node recovery: the building of indexes will be resumed by
* {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in the {@link ReplicaMeta#getLeaseholderId()} on
* node restart but after {@link ReplicaMeta#getExpirationTime()}.</p>
*/
class IndexBuildController implements ManuallyCloseable {
private final IndexBuilder indexBuilder;
private final IndexManager indexManager;
private final CatalogService catalogService;
private final ClusterService clusterService;
private final PlacementDriver placementDriver;
private final ClockService clockService;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean closeGuard = new AtomicBoolean();
private final Set<TablePartitionId> primaryReplicaIds = ConcurrentHashMap.newKeySet();
/** Constructor. */
IndexBuildController(
IndexBuilder indexBuilder,
IndexManager indexManager,
CatalogService catalogService,
ClusterService clusterService,
PlacementDriver placementDriver,
ClockService clockService
) {
this.indexBuilder = indexBuilder;
this.indexManager = indexManager;
this.catalogService = catalogService;
this.clusterService = clusterService;
this.placementDriver = placementDriver;
this.clockService = clockService;
addListeners();
}
@Override
public void close() {
if (!closeGuard.compareAndSet(false, true)) {
return;
}
busyLock.block();
indexBuilder.close();
}
private void addListeners() {
catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> {
return onIndexBuilding(parameters).thenApply(unused -> false);
});
catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> {
return onIndexRemoved(parameters).thenApply(unused -> false);
});
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> {
return onPrimaryReplicaElected(parameters).thenApply(unused -> false);
});
}
private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
CatalogIndexDescriptor indexDescriptor = catalogService.index(parameters.indexId(), parameters.catalogVersion());
var startBuildIndexFutures = new ArrayList<CompletableFuture<?>>();
for (TablePartitionId primaryReplicaId : primaryReplicaIds) {
if (primaryReplicaId.tableId() == indexDescriptor.tableId()) {
CompletableFuture<?> startBuildIndexFuture = getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
.thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, clockService.now())
.thenAccept(replicaMeta -> tryScheduleBuildIndex(
primaryReplicaId,
indexDescriptor,
mvTableStorage,
replicaMeta
))
);
startBuildIndexFutures.add(startBuildIndexFuture);
}
}
return allOf(startBuildIndexFutures.toArray(CompletableFuture[]::new));
});
}
private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
indexBuilder.stopBuildingIndexes(parameters.indexId());
return nullCompletedFuture();
});
}
private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
if (isLocalNode(clusterService, parameters.leaseholderId())) {
primaryReplicaIds.add(primaryReplicaId);
// It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the
// metastore thread.
int catalogVersion = catalogService.latestCatalogVersion();
return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
.thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
.thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
catalogVersion,
primaryReplicaId,
mvTableStorage,
replicaMeta
))
);
} else {
stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
return nullCompletedFuture();
}
});
}
private void tryScheduleBuildIndexesForNewPrimaryReplica(
int catalogVersion,
TablePartitionId primaryReplicaId,
MvTableStorage mvTableStorage,
ReplicaMeta replicaMeta
) {
inBusyLock(busyLock, () -> {
if (isLeaseExpire(replicaMeta)) {
stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
return;
}
for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion, primaryReplicaId.tableId())) {
if (indexDescriptor.status() == BUILDING) {
scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage, enlistmentConsistencyToken(replicaMeta));
} else if (indexDescriptor.status() == AVAILABLE) {
scheduleBuildIndexAfterDisasterRecovery(
primaryReplicaId,
indexDescriptor,
mvTableStorage,
enlistmentConsistencyToken(replicaMeta)
);
}
}
});
}
private void tryScheduleBuildIndex(
TablePartitionId primaryReplicaId,
CatalogIndexDescriptor indexDescriptor,
MvTableStorage mvTableStorage,
ReplicaMeta replicaMeta
) {
inBusyLock(busyLock, () -> {
if (isLeaseExpire(replicaMeta)) {
stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
return;
}
scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage, enlistmentConsistencyToken(replicaMeta));
});
}
/**
* Stops building indexes for a replica that has expired, if it has not been done before.
*
* <p>We need to stop building indexes at the event of a change of primary replica or after executing asynchronous code, we understand
* that the {@link ReplicaMeta#getExpirationTime() expiration time} has come.</p>
*
* @param replicaId Replica ID.
*/
private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) {
if (primaryReplicaIds.remove(replicaId)) {
// Primary replica is no longer current, we need to stop building indexes for it.
indexBuilder.stopBuildingIndexes(replicaId.tableId(), replicaId.partitionId());
}
}
private CompletableFuture<MvTableStorage> getMvTableStorageFuture(long causalityToken, TablePartitionId replicaId) {
return indexManager.getMvTableStorage(causalityToken, replicaId.tableId())
.thenApply(mvTableStorage -> requireMvTableStorageNonNull(mvTableStorage, replicaId.tableId()));
}
private static MvTableStorage requireMvTableStorageNonNull(@Nullable MvTableStorage mvTableStorage, int tableId) {
if (mvTableStorage == null) {
throw new IgniteInternalException(
INTERNAL_ERR,
"Table storage for the specified table cannot be null [tableId = {}]",
tableId
);
}
return mvTableStorage;
}
private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(TablePartitionId replicaId, HybridTimestamp timestamp) {
return placementDriver
.awaitPrimaryReplica(replicaId, timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS)
.handle((replicaMeta, throwable) -> {
if (throwable != null) {
Throwable unwrapThrowable = ExceptionUtils.unwrapCause(throwable);
if (unwrapThrowable instanceof PrimaryReplicaAwaitTimeoutException) {
return awaitPrimaryReplica(replicaId, timestamp);
} else {
return CompletableFuture.<ReplicaMeta>failedFuture(unwrapThrowable);
}
}
return completedFuture(replicaMeta);
}).thenCompose(Function.identity());
}
/** Shortcut to schedule index building. */
private void scheduleBuildIndex(
TablePartitionId replicaId,
CatalogIndexDescriptor indexDescriptor,
MvTableStorage mvTableStorage,
long enlistmentConsistencyToken
) {
MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
// TODO: IGNITE-22202 Deal with this situation
if (mvPartition == null) {
return;
}
IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
indexBuilder.scheduleBuildIndex(
replicaId.tableId(),
replicaId.partitionId(),
indexDescriptor.id(),
indexStorage,
mvPartition,
localNode(),
enlistmentConsistencyToken,
indexDescriptor.txWaitCatalogVersion()
);
}
/** Shortcut to schedule {@link CatalogIndexStatus#AVAILABLE available} index building after disaster recovery. */
private void scheduleBuildIndexAfterDisasterRecovery(
TablePartitionId replicaId,
CatalogIndexDescriptor indexDescriptor,
MvTableStorage mvTableStorage,
long enlistmentConsistencyToken
) {
MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
// TODO: IGNITE-22202 Deal with this situation
if (mvPartition == null) {
return;
}
IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
indexBuilder.scheduleBuildIndexAfterDisasterRecovery(
replicaId.tableId(),
replicaId.partitionId(),
indexDescriptor.id(),
indexStorage,
mvPartition,
localNode(),
enlistmentConsistencyToken,
indexDescriptor.txWaitCatalogVersion()
);
}
private ClusterNode localNode() {
return IndexManagementUtils.localNode(clusterService);
}
private boolean isLeaseExpire(ReplicaMeta replicaMeta) {
return !isPrimaryReplica(replicaMeta, localNode(), clockService.now());
}
private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
return replicaMeta.getStartTime().longValue();
}
private static @Nullable MvPartitionStorage mvPartitionStorage(MvTableStorage mvTableStorage, TablePartitionId replicaId) {
MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(replicaId.partitionId());
// TODO: IGNITE-22202 Deal with this situation
// assert mvPartition != null : replicaId;
return mvPartition;
}
private static IndexStorage indexStorage(
MvTableStorage mvTableStorage,
TablePartitionId replicaId,
CatalogIndexDescriptor indexDescriptor
) {
IndexStorage indexStorage = mvTableStorage.getIndex(replicaId.partitionId(), indexDescriptor.id());
assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexDescriptor.id();
return indexStorage;
}
}