blob: 47be8726c5516d57010b25436ac95c2478d0736d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
/**
* An Ignite component that is responsible for handling index-related commands like CREATE or DROP
* as well as managing indexes' lifecycle.
*
* <p>To avoid errors when using indexes while applying replication log during node recovery, the registration of indexes was moved to the
* start of the tables.</p>
*/
public class IndexManager implements IgniteComponent {
private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
/** Schema manager. */
private final SchemaManager schemaManager;
/** Table manager. */
private final TableManager tableManager;
/** Catalog service. */
private final CatalogService catalogService;
/** Separate executor for IO operations like storage initialization. */
private final ExecutorService ioExecutor;
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
/** Versioned value to prevent races when registering/unregistering indexes when processing metastore or catalog events. */
private final IncrementalVersionedValue<Void> handleMetastoreEventVv;
/** Low watermark. */
private final LowWatermark lowWatermark;
/** A queue for deferred index destruction events. */
private final LongPriorityQueue<DestroyIndexEvent> destructionEventsQueue =
new LongPriorityQueue<>(DestroyIndexEvent::catalogVersion);
/**
* Constructor.
*
* @param schemaManager Schema manager.
* @param tableManager Table manager.
* @param catalogService Catalog service.
* @param ioExecutor Separate executor for IO operations like storage initialization.
*/
public IndexManager(
SchemaManager schemaManager,
TableManager tableManager,
CatalogService catalogService,
ExecutorService ioExecutor,
Consumer<LongFunction<CompletableFuture<?>>> registry,
LowWatermark lowWatermark
) {
this.schemaManager = schemaManager;
this.tableManager = tableManager;
this.catalogService = catalogService;
this.ioExecutor = ioExecutor;
this.lowWatermark = lowWatermark;
handleMetastoreEventVv = new IncrementalVersionedValue<>(registry);
}
@Override
public CompletableFuture<Void> startAsync() {
LOG.debug("Index manager is about to start");
recoverDestructionQueue();
catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters parameters) -> onIndexCreate(parameters));
catalogService.listen(INDEX_REMOVED, fromConsumer(this::onIndexRemoved));
lowWatermark.listen(LOW_WATERMARK_CHANGED, parameters -> onLwmChanged((ChangeLowWatermarkEventParameters) parameters));
LOG.info("Index manager started");
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> stopAsync() {
LOG.debug("Index manager is about to stop");
if (!stopGuard.compareAndSet(false, true)) {
LOG.debug("Index manager already stopped");
return nullCompletedFuture();
}
busyLock.block();
LOG.info("Index manager stopped");
return nullCompletedFuture();
}
/**
* Returns a multi-version table storage with created index storages by passed parameters.
*
* <p>Example: when we start building an index, we will need {@link IndexStorage} (as well as storage {@link MvPartitionStorage}) to
* build it and we can get them in {@link CatalogEvent#INDEX_CREATE} using this method.</p>
*
* <p>During recovery, it is important to wait until the local node becomes a primary replica so that all index building commands are
* applied from the replication log.</p>
*
* @param causalityToken Causality token.
* @param tableId Table ID.
* @return Future with multi-version table storage, completes with {@code null} if the table does not exist according to the passed
* parameters.
*/
CompletableFuture<@Nullable MvTableStorage> getMvTableStorage(long causalityToken, int tableId) {
return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table == null ? null : table.internalTable().storage());
}
private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
CatalogIndexDescriptor index = parameters.indexDescriptor();
int indexId = index.id();
int tableId = index.tableId();
long causalityToken = parameters.causalityToken();
int catalogVersion = parameters.catalogVersion();
CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion);
assert table != null : "tableId=" + tableId + ", indexId=" + indexId;
if (LOG.isInfoEnabled()) {
LOG.info(
"Creating local index: name={}, id={}, tableId={}, token={}",
index.name(), indexId, tableId, causalityToken
);
}
return startIndexAsync(table, index, causalityToken).thenApply(unused -> false);
});
}
private void onIndexRemoved(RemoveIndexEventParameters parameters) {
inBusyLock(busyLock, () -> {
int indexId = parameters.indexId();
int catalogVersion = parameters.catalogVersion();
int previousCatalogVersion = catalogVersion - 1;
// Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted.
CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, previousCatalogVersion);
assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + previousCatalogVersion;
int tableId = indexDescriptor.tableId();
if (catalogService.table(tableId, catalogVersion) == null) {
// Nothing to do. Index will be destroyed along with the table.
return;
}
destructionEventsQueue.enqueue(new DestroyIndexEvent(catalogVersion, indexId, tableId));
});
}
private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
int newEarliestCatalogVersion = catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
List<DestroyIndexEvent> events = destructionEventsQueue.drainUpTo(newEarliestCatalogVersion);
return runAsync(
() -> events.forEach(event -> destroyIndex(event.indexId(), event.tableId())),
ioExecutor
).thenApply(unused -> false);
});
}
private void destroyIndex(int indexId, int tableId) {
TableViewInternal table = tableManager.cachedTable(tableId);
if (table != null) {
// In case of DROP TABLE the table will be removed with all its indexes.
table.unregisterIndex(indexId);
}
}
private CompletableFuture<?> startIndexAsync(
CatalogTableDescriptor table,
CatalogIndexDescriptor index,
long causalityToken
) {
int tableId = index.tableId();
CompletableFuture<PartitionSet> tablePartitionFuture = tableManager.localPartitionSetAsync(causalityToken, tableId);
CompletableFuture<SchemaRegistry> schemaRegistryFuture = schemaManager.schemaRegistry(causalityToken, tableId);
return handleMetastoreEventVv.update(
causalityToken,
updater(mvTableStorageById -> tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
(partitionSet, schemaRegistry) -> inBusyLock(busyLock, () -> {
registerIndex(table, index, partitionSet, schemaRegistry);
return null;
}), ioExecutor))
);
}
private void registerIndex(
CatalogTableDescriptor table,
CatalogIndexDescriptor index,
PartitionSet partitionSet,
SchemaRegistry schemaRegistry
) {
TableViewInternal tableView = getTableViewStrict(table.id());
registerIndexToTable(tableView, table, index, partitionSet, schemaRegistry);
}
private TableViewInternal getTableViewStrict(int tableId) {
TableViewInternal table = tableManager.cachedTable(tableId);
assert table != null : tableId;
return table;
}
private static <T> BiFunction<T, Throwable, CompletableFuture<T>> updater(Function<T, CompletableFuture<T>> updateFunction) {
return (t, throwable) -> {
if (throwable != null) {
return failedFuture(throwable);
}
return updateFunction.apply(t);
};
}
/** Recover deferred destroy events. */
private void recoverDestructionQueue() {
// LWM starts updating only after the node is restored.
HybridTimestamp lwm = lowWatermark.getLowWatermark();
int earliestCatalogVersion = catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
int latestCatalogVersion = catalogService.latestCatalogVersion();
for (int catalogVersion = latestCatalogVersion - 1; catalogVersion >= earliestCatalogVersion; catalogVersion--) {
int nextVersion = catalogVersion + 1;
catalogService.indexes(catalogVersion).stream()
.filter(idx -> catalogService.index(idx.id(), nextVersion) == null)
.forEach(idx -> destructionEventsQueue.enqueue(new DestroyIndexEvent(nextVersion, idx.id(), idx.tableId())));
}
}
/** Internal event. */
private static class DestroyIndexEvent {
final int catalogVersion;
final int indexId;
final int tableId;
DestroyIndexEvent(int catalogVersion, int indexId, int tableId) {
this.catalogVersion = catalogVersion;
this.indexId = indexId;
this.tableId = tableId;
}
public int catalogVersion() {
return catalogVersion;
}
public int indexId() {
return indexId;
}
public int tableId() {
return tableId;
}
}
}