blob: e788d59bb441d540c23edac09f4406024711f3e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec.ddl;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.DistributionZoneExistsValidationException;
import org.apache.ignite.internal.catalog.DistributionZoneNotFoundValidationException;
import org.apache.ignite.internal.catalog.IndexExistsValidationException;
import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
import org.apache.ignite.internal.catalog.TableExistsValidationException;
import org.apache.ignite.internal.catalog.TableNotFoundValidationException;
import org.apache.ignite.internal.catalog.commands.AbstractCreateIndexCommand;
import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
import org.apache.ignite.internal.catalog.commands.AlterTableAlterColumnCommand;
import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
import org.apache.ignite.internal.catalog.commands.DropTableCommand;
import org.apache.ignite.internal.catalog.commands.DropZoneCommand;
import org.apache.ignite.internal.catalog.commands.RenameZoneCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.sql.SqlException;
/** DDL commands handler. */
public class DdlCommandHandler implements LifecycleAware {
private final CatalogManager catalogManager;
private final ClockService clockService;
private final LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier;
private final InFlightFutures inFlightFutures = new InFlightFutures();
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/**
* Constructor.
*/
public DdlCommandHandler(
CatalogManager catalogManager,
ClockService clockService,
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
) {
this.catalogManager = catalogManager;
this.clockService = clockService;
this.partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier;
}
/** Handles ddl commands. */
public CompletableFuture<Boolean> handle(CatalogCommand cmd) {
if (cmd instanceof CreateTableCommand) {
return handleCreateTable((CreateTableCommand) cmd);
} else if (cmd instanceof DropTableCommand) {
return handleDropTable((DropTableCommand) cmd);
} else if (cmd instanceof AlterTableAddColumnCommand) {
return handleAlterAddColumn((AlterTableAddColumnCommand) cmd);
} else if (cmd instanceof AlterTableDropColumnCommand) {
return handleAlterDropColumn((AlterTableDropColumnCommand) cmd);
} else if (cmd instanceof AlterTableAlterColumnCommand) {
return handleAlterColumn((AlterTableAlterColumnCommand) cmd);
} else if (cmd instanceof AbstractCreateIndexCommand) {
return handleCreateIndex((AbstractCreateIndexCommand) cmd);
} else if (cmd instanceof DropIndexCommand) {
return handleDropIndex((DropIndexCommand) cmd);
} else if (cmd instanceof CreateZoneCommand) {
return handleCreateZone((CreateZoneCommand) cmd);
} else if (cmd instanceof RenameZoneCommand) {
return handleRenameZone((RenameZoneCommand) cmd);
} else if (cmd instanceof AlterZoneCommand) {
return handleAlterZone((AlterZoneCommand) cmd);
} else if (cmd instanceof AlterZoneSetDefaultCommand) {
return handleAlterZoneSetDefault((AlterZoneSetDefaultCommand) cmd);
} else if (cmd instanceof DropZoneCommand) {
return handleDropZone((DropZoneCommand) cmd);
} else {
return failedFuture(new SqlException(STMT_VALIDATION_ERR, "Unsupported DDL operation ["
+ "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; "
+ "cmd=\"" + cmd + "\"]"));
}
}
/** Handles create distribution zone command. */
private CompletableFuture<Boolean> handleCreateZone(CreateZoneCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifNotExists(), DistributionZoneExistsValidationException.class));
}
/** Handles rename zone command. */
private CompletableFuture<Boolean> handleRenameZone(RenameZoneCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifExists(), DistributionZoneNotFoundValidationException.class));
}
/** Handles alter zone command. */
private CompletableFuture<Boolean> handleAlterZone(AlterZoneCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifExists(), DistributionZoneNotFoundValidationException.class));
}
/** Handles alter zone set default command. */
private CompletableFuture<Boolean> handleAlterZoneSetDefault(AlterZoneSetDefaultCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifExists(), DistributionZoneNotFoundValidationException.class));
}
/** Handles drop distribution zone command. */
private CompletableFuture<Boolean> handleDropZone(DropZoneCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifExists(), DistributionZoneNotFoundValidationException.class));
}
/** Handles create table command. */
private CompletableFuture<Boolean> handleCreateTable(CreateTableCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifTableExists(), TableExistsValidationException.class));
}
/** Handles drop table command. */
private CompletableFuture<Boolean> handleDropTable(DropTableCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifTableExists(), TableNotFoundValidationException.class));
}
/** Handles add column command. */
private CompletableFuture<Boolean> handleAlterAddColumn(AlterTableAddColumnCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifTableExists(), TableNotFoundValidationException.class));
}
/** Handles drop column command. */
private CompletableFuture<Boolean> handleAlterDropColumn(AlterTableDropColumnCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifTableExists(), TableNotFoundValidationException.class));
}
/** Handles drop column command. */
private CompletableFuture<Boolean> handleAlterColumn(AlterTableAlterColumnCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifTableExists(), TableNotFoundValidationException.class));
}
private static BiFunction<Object, Throwable, Boolean> handleModificationResult(boolean ignoreExpectedError, Class<?> expErrCls) {
return (val, err) -> {
if (err == null) {
return Boolean.TRUE;
} else if (ignoreExpectedError) {
Throwable err0 = err instanceof CompletionException ? err.getCause() : err;
if (expErrCls.isAssignableFrom(err0.getClass())) {
return Boolean.FALSE;
}
}
throw (err instanceof RuntimeException) ? (RuntimeException) err : new CompletionException(err);
};
}
/** Handles create index command. */
private CompletableFuture<Boolean> handleCreateIndex(AbstractCreateIndexCommand cmd) {
return catalogManager.execute(cmd)
.thenCompose(catalogVersion -> inBusyLock(busyLock, () -> waitTillIndexBecomesAvailableOrRemoved(cmd, catalogVersion)))
.handle(handleModificationResult(cmd.ifNotExists(), IndexExistsValidationException.class));
}
private CompletionStage<Void> waitTillIndexBecomesAvailableOrRemoved(AbstractCreateIndexCommand cmd, Integer creationCatalogVersion) {
CompletableFuture<Void> future = inFlightFutures.registerFuture(new CompletableFuture<>());
Catalog catalog = catalogManager.catalog(creationCatalogVersion);
assert catalog != null : creationCatalogVersion;
CatalogSchemaDescriptor schema = catalog.schema(cmd.schemaName());
assert schema != null : "Did not find schema " + cmd.schemaName() + " in version " + creationCatalogVersion;
CatalogIndexDescriptor index = schema.aliveIndex(cmd.indexName());
assert index != null
: "Did not find index " + cmd.indexName() + " in schema " + cmd.schemaName() + " in version " + creationCatalogVersion;
EventListener<CatalogEventParameters> availabilityListener = EventListener.fromConsumer(event -> {
if (((MakeIndexAvailableEventParameters) event).indexId() == index.id()) {
completeFutureWhenEventVersionActivates(future, event);
}
});
catalogManager.listen(CatalogEvent.INDEX_AVAILABLE, availabilityListener);
EventListener<CatalogEventParameters> removalListener = EventListener.fromConsumer(event -> {
if (((RemoveIndexEventParameters) event).indexId() == index.id()) {
future.complete(null);
}
});
catalogManager.listen(CatalogEvent.INDEX_REMOVED, removalListener);
// We added listeners, but the index could switch to a state of interest before we added them, so check
// explicitly.
int latestVersion = catalogManager.latestCatalogVersion();
for (int version = creationCatalogVersion + 1; version <= latestVersion; version++) {
CatalogIndexDescriptor indexAtVersion = catalogManager.index(index.id(), version);
if (indexAtVersion == null) {
// It's already removed.
future.complete(null);
break;
} else if (indexAtVersion.status().isAvailableOrLater()) {
// It was already made available.
completeFutureWhenEventVersionActivates(future, version);
break;
}
}
return future.whenComplete((res, ex) -> {
catalogManager.removeListener(CatalogEvent.INDEX_AVAILABLE, availabilityListener);
catalogManager.removeListener(CatalogEvent.INDEX_REMOVED, removalListener);
});
}
private void completeFutureWhenEventVersionActivates(CompletableFuture<Void> future, CatalogEventParameters event) {
completeFutureWhenEventVersionActivates(future, event.catalogVersion());
}
private void completeFutureWhenEventVersionActivates(CompletableFuture<Void> future, int catalogVersion) {
Catalog catalog = catalogManager.catalog(catalogVersion);
assert catalog != null;
HybridTimestamp tsToWait = clusterWideEnsuredActivationTsSafeForRoReads(
catalog,
partitionIdleSafeTimePropagationPeriodMsSupplier,
clockService.maxClockSkewMillis()
);
clockService.waitFor(tsToWait)
.whenComplete((res, ex) -> future.complete(null));
}
/** Handles drop index command. */
private CompletableFuture<Boolean> handleDropIndex(DropIndexCommand cmd) {
return catalogManager.execute(cmd)
.handle(handleModificationResult(cmd.ifExists(), IndexNotFoundValidationException.class));
}
@Override
public void start() {
// No-op.
}
@Override
public void stop() throws Exception {
busyLock.block();
inFlightFutures.failInFlightFutures(new NodeStoppingException());
}
}