blob: bec7879d05864b30947ac3581b5e700b114159ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.index;
import static org.apache.ignite.internal.schema.definition.TableDefinitionImpl.canonicalName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.configuration.schemas.table.HashIndexView;
import org.apache.ignite.configuration.schemas.table.IndexColumnView;
import org.apache.ignite.configuration.schemas.table.SortedIndexView;
import org.apache.ignite.configuration.schemas.table.TableIndexChange;
import org.apache.ignite.configuration.schemas.table.TableIndexView;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
import org.apache.ignite.internal.index.event.IndexEvent;
import org.apache.ignite.internal.index.event.IndexEventParameters;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.StringUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.ErrorGroups.Table;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableNotFoundException;
import org.jetbrains.annotations.NotNull;
/**
* An Ignite component that is responsible for handling index-related commands like CREATE or DROP
* as well as managing indexes lifecycle.
*/
public class IndexManager extends Producer<IndexEvent, IndexEventParameters> implements IgniteComponent {
private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
private final TableManager tableManager;
private final Consumer<ConfigurationNamedListListener<TableIndexView>> indicesConfigurationChangeSubscription;
private final Map<UUID, Index<?>> indexById = new ConcurrentHashMap<>();
private final Map<String, Index<?>> indexByName = new ConcurrentHashMap<>();
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
/**
* Constructor.
*
* @param tableManager Table manager.
*/
public IndexManager(
TableManager tableManager,
Consumer<ConfigurationNamedListListener<TableIndexView>> indicesConfigurationChangeSubscription
) {
this.tableManager = Objects.requireNonNull(tableManager, "tableManager");
this.indicesConfigurationChangeSubscription = Objects.requireNonNull(indicesConfigurationChangeSubscription, "tablesConfiguration");
}
/** {@inheritDoc} */
@Override
public void start() {
LOG.debug("Index manager is about to start");
indicesConfigurationChangeSubscription.accept(new ConfigurationListener());
LOG.info("Index manager started");
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
LOG.debug("Index manager is about to stop");
if (!stopGuard.compareAndSet(false, true)) {
LOG.debug("Index manager already stopped");
return;
}
busyLock.block();
indexById.clear();
indexByName.clear();
LOG.info("Index manager stopped");
}
/**
* Creates index from provided configuration changer.
*
* @param schemaName A name of the schema to create index in.
* @param indexName A name of the index to create.
* @param tableName A name of the table to create index for.
* @param indexChange A consumer that suppose to change the configuration in order to provide description of an index.
* @return A future represented the result of creation.
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-17474
// Validation of the index name uniqueness is not implemented, because with given
// configuration hierarchy this is a bit tricky exercise. Given that this hierarchy
// is subject to change in the future, seems to be more rational just to omit this
// part for now
public CompletableFuture<Index<?>> createIndexAsync(
String schemaName,
String indexName,
String tableName,
Consumer<TableIndexChange> indexChange
) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
LOG.debug("Going to create index [schema={}, table={}, index={}]", schemaName, tableName, indexName);
try {
validateName(indexName);
CompletableFuture<Index<?>> future = new CompletableFuture<>();
var canonicalName = canonicalName(schemaName, tableName);
tableManager.tableAsyncInternal(canonicalName).thenAccept((table) -> {
if (table == null) {
var exception = new TableNotFoundException(canonicalName);
LOG.info("Unable to create index [schema={}, table={}, index={}]",
exception, schemaName, tableName, indexName);
future.completeExceptionally(exception);
return;
}
tableManager.alterTableAsync(table.name(), tableChange -> tableChange.changeIndices(indexListChange -> {
if (indexListChange.get(indexName) != null) {
var exception = new IndexAlreadyExistsException(indexName);
LOG.info("Unable to create index [schema={}, table={}, index={}]",
exception, schemaName, tableName, indexName);
future.completeExceptionally(exception);
return;
}
indexListChange.create(indexName, indexChange);
TableIndexView indexView = indexListChange.get(indexName);
Set<String> columnNames = Set.copyOf(tableChange.columns().namedListKeys());
validateColumns(indexView, columnNames);
})).whenComplete((index, th) -> {
if (th != null) {
LOG.info("Unable to create index [schema={}, table={}, index={}]",
th, schemaName, tableName, indexName);
future.completeExceptionally(th);
} else if (!future.isDone()) {
String canonicalIndexName = canonicalName(schemaName, indexName);
Index<?> createdIndex = indexByName.get(canonicalIndexName);
if (createdIndex != null) {
LOG.info("Index created [schema={}, table={}, index={}, indexId={}]",
schemaName, tableName, indexName, createdIndex.id());
future.complete(createdIndex);
} else {
var exception = new IgniteInternalException(
Common.UNEXPECTED_ERR, "Looks like the index was concurrently deleted");
LOG.info("Unable to create index [schema={}, table={}, index={}]",
exception, schemaName, tableName, indexName);
future.completeExceptionally(exception);
}
}
});
});
return future;
} catch (Exception ex) {
return CompletableFuture.failedFuture(ex);
} finally {
busyLock.leaveBusy();
}
}
/**
* Drops the index with a given name.
*
* @param schemaName A name of the schema the index belong to.
* @param indexName A name of the index to drop.
* @return A future representing the result of the operation.
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-17474
// For now it is impossible to locate the index neither by id nor name.
public CompletableFuture<Void> dropIndexAsync(
String schemaName,
String indexName
) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
LOG.debug("Going to drop index [schema={}, index={}]", schemaName, indexName);
try {
CompletableFuture<Void> future = new CompletableFuture<>();
String canonicalName = canonicalName(schemaName, indexName);
Index<?> index = indexByName.get(canonicalName);
if (index == null) {
return CompletableFuture.failedFuture(new IndexNotFoundException(canonicalName));
}
tableManager.tableAsyncInternal(index.tableId(), false).thenAccept((table) -> {
if (table == null) {
var exception = new IndexNotFoundException(canonicalName);
LOG.info("Unable to drop index [schema={}, index={}]",
exception, schemaName, indexName);
future.completeExceptionally(exception);
return;
}
tableManager.alterTableAsync(table.name(), tableChange -> tableChange.changeIndices(indexListChange -> {
if (indexListChange.get(indexName) == null) {
var exception = new IndexNotFoundException(canonicalName);
LOG.info("Unable to drop index [schema={}, index={}]",
exception, schemaName, indexName);
future.completeExceptionally(exception);
return;
}
indexListChange.delete(indexName);
})).whenComplete((ignored, th) -> {
if (th != null) {
LOG.info("Unable to drop index [schema={}, index={}]",
th, schemaName, indexName);
future.completeExceptionally(th);
} else if (!future.isDone()) {
LOG.info("Index dropped [schema={}, index={}]", schemaName, indexName);
future.complete(null);
}
});
});
return future;
} finally {
busyLock.leaveBusy();
}
}
private void validateName(String indexName) {
if (StringUtils.nullOrEmpty(indexName)) {
throw new IgniteInternalException(
ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
"Index name should be at least 1 character long"
);
}
}
private void validateColumns(TableIndexView indexView, Set<String> tableColumns) {
if (indexView instanceof SortedIndexView) {
var sortedIndexView = (SortedIndexView) indexView;
validateColumns(sortedIndexView.columns().namedListKeys(), tableColumns);
} else if (indexView instanceof HashIndexView) {
validateColumns(Arrays.asList(((HashIndexView) indexView).columnNames()), tableColumns);
} else {
throw new AssertionError("Unknown index type [type=" + (indexView != null ? indexView.getClass() : null) + ']');
}
}
private void validateColumns(Iterable<String> indexedColumns, Set<String> tableColumns) {
if (CollectionUtils.nullOrEmpty(indexedColumns)) {
throw new IgniteInternalException(
ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
"At least one column should be specified by index definition"
);
}
for (var columnName : indexedColumns) {
if (!tableColumns.contains(columnName)) {
throw new IgniteInternalException(
Table.COLUMN_NOT_FOUND_ERR,
"Column not found [name=" + columnName + ']'
);
}
}
}
private void onIndexDrop(ConfigurationNotificationEvent<TableIndexView> ctx) {
Index<?> index = indexById.remove(ctx.oldValue().id());
indexByName.remove(index.name(), index);
fireEvent(IndexEvent.DROP, new IndexEventParameters(ctx.storageRevision(), index.id()), null);
}
private void onIndexCreate(ConfigurationNotificationEvent<TableIndexView> ctx) {
Index<?> index = createIndex(
ctx.config(ExtendedTableConfiguration.class).id().value(),
ctx.newValue()
);
Index<?> prev = indexById.putIfAbsent(index.id(), index);
assert prev == null;
prev = indexByName.putIfAbsent(index.name(), index);
assert prev == null;
fireEvent(IndexEvent.CREATE, new IndexEventParameters(ctx.storageRevision(), index), null);
}
private Index<?> createIndex(UUID tableId, TableIndexView indexView) {
if (indexView instanceof SortedIndexView) {
return new SortedIndexImpl(
indexView.id(),
tableId,
convert((SortedIndexView) indexView)
);
} else if (indexView instanceof HashIndexView) {
return new HashIndex(
indexView.id(),
tableId,
convert((HashIndexView) indexView)
);
}
throw new AssertionError("Unknown index type [type=" + (indexView != null ? indexView.getClass() : null) + ']');
}
private IndexDescriptor convert(HashIndexView indexView) {
return new IndexDescriptor(
canonicalName("PUBLIC", indexView.name()),
Arrays.asList(indexView.columnNames())
);
}
private SortedIndexDescriptor convert(SortedIndexView indexView) {
var indexedColumns = new ArrayList<String>();
var collations = new ArrayList<ColumnCollation>();
for (var columnName : indexView.columns().namedListKeys()) {
IndexColumnView columnView = indexView.columns().get(columnName);
indexedColumns.add(columnName);
collations.add(ColumnCollation.get(columnView.asc(), false));
}
return new SortedIndexDescriptor(
canonicalName("PUBLIC", indexView.name()),
indexedColumns,
collations
);
}
private class ConfigurationListener implements ConfigurationNamedListListener<TableIndexView> {
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
try {
onIndexCreate(ctx);
} catch (Exception e) {
return CompletableFuture.completedFuture(e);
}
return CompletableFuture.completedFuture(null);
}
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onRename(
String oldName,
String newName,
ConfigurationNotificationEvent<TableIndexView> ctx
) {
return CompletableFuture.failedFuture(new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196"));
}
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onDelete(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
if (!busyLock.enterBusy()) {
return CompletableFuture.completedFuture(new NodeStoppingException());
}
try {
onIndexDrop(ctx);
} finally {
busyLock.leaveBusy();
}
return CompletableFuture.completedFuture(null);
}
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
return CompletableFuture.failedFuture(new IllegalStateException("Should not be called"));
}
}
}