// blob: e0698933e1046a71cc1d2c69b3f171da49a56b4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.schema;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.causality.VersionedValue;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.DefaultValueProvider;
import org.apache.ignite.internal.schema.DefaultValueProvider.Type;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * Holds actual schema and mutates it on schema change, requested by Ignite.
 *
 * <p>The state is kept in three {@link VersionedValue}s: Ignite schemas keyed by name, SQL tables keyed by id, and the
 * Calcite root schema. The Calcite root schema is rebuilt and completed from a callback that runs every time the schemas
 * value completes for a causality token.
 */
public class SqlSchemaManagerImpl implements SqlSchemaManager {
    /** Name of the schema that is always registered in the Calcite root schema and is used when no schema name is given. */
    private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";

    /** Versioned map of Ignite schemas keyed by schema name. */
    private final VersionedValue<Map<String, IgniteSchema>> schemasVv;

    /** Versioned map of SQL tables keyed by table id. */
    private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;

    /** Table manager, used to look a table up by id when it is not known locally or its known version is too old. */
    private final TableManager tableManager;

    /** Schema manager, used to obtain the schema registry of a table. */
    private final SchemaManager schemaManager;

    /** Versioned Calcite root schema; completed explicitly by the callback registered on {@link #schemasVv}. */
    private final VersionedValue<SchemaPlus> calciteSchemaVv;

    /** Listeners that are notified each time a new Calcite root schema has been built. */
    private final Set<SchemaUpdateListener> listeners = new CopyOnWriteArraySet<>();

    /** Busy lock for stop synchronisation. */
    private final IgniteSpinBusyLock busyLock;

    /**
     * Constructor.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * @param tableManager Table manager.
     * @param schemaManager Schema manager.
     * @param registry Registry passed to the versioned values of schemas and tables.
     * @param busyLock Busy lock for stop synchronisation.
     */
    public SqlSchemaManagerImpl(
            TableManager tableManager,
            SchemaManager schemaManager,
            Consumer<Function<Long, CompletableFuture<?>>> registry,
            IgniteSpinBusyLock busyLock
    ) {
        this.tableManager = tableManager;
        this.schemaManager = schemaManager;

        schemasVv = new VersionedValue<>(registry, HashMap::new);
        tablesVv = new VersionedValue<>(registry, HashMap::new);

        this.busyLock = busyLock;

        // The initial Calcite root schema contains only an empty default schema.
        calciteSchemaVv = new VersionedValue<>(null, () -> {
            SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);

            newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new IgniteSchema(DEFAULT_SCHEMA_NAME));

            return newCalciteSchema;
        });

        schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
            if (!busyLock.enterBusy()) {
                calciteSchemaVv.completeExceptionally(token, new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));

                return;
            }

            try {
                if (throwable != null) {
                    calciteSchemaVv.completeExceptionally(
                            token,
                            new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
                    );

                    return;
                }

                SchemaPlus newCalciteSchema;

                try {
                    newCalciteSchema = rebuild(stringIgniteSchemaMap);

                    // Listeners are notified before the new schema is published, same order as before.
                    listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
                } catch (RuntimeException | Error e) {
                    // Nothing else in this class completes calciteSchemaVv, so without this a failing rebuild or listener
                    // would leave the value for this token uncompleted and everyone waiting on it would wait forever.
                    calciteSchemaVv.completeExceptionally(
                            token,
                            new IgniteInternalException(
                                    "Couldn't rebuild sql schema or notify listeners for causality token: " + token,
                                    e
                            )
                    );

                    throw e;
                }

                calciteSchemaVv.complete(token, newCalciteSchema);
            } finally {
                busyLock.leaveBusy();
            }
        });
    }

    /** {@inheritDoc} */
    @Override
    public SchemaPlus schema(@Nullable String schema) {
        SchemaPlus schemaPlus = calciteSchemaVv.latest();

        // A null name resolves to the default schema.
        return schema != null ? schemaPlus.getSubSchema(schema) : schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
    }

    /** {@inheritDoc} */
    @Override
    @NotNull
    public IgniteTable tableById(UUID id, int ver) {
        if (!busyLock.enterBusy()) {
            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
        }

        try {
            IgniteTable table = tablesVv.latest().get(id);

            // there is a chance that someone tries to resolve table before
            // the distributed event of that table creation has been processed
            // by TableManager, so we need to get in sync with the TableManager
            if (table == null || ver > table.version()) {
                table = awaitLatestTableSchema(id);
            }

            if (table == null) {
                throw new IgniteInternalException(
                        IgniteStringFormatter.format("Table not found [tableId={}]", id));
            }

            if (table.version() < ver) {
                throw new IgniteInternalException(
                        IgniteStringFormatter.format("Table version not found [tableId={}, requiredVer={}, latestKnownVer={}]",
                                id, ver, table.version()));
            }

            return table;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Registers a listener that is invoked each time a new Calcite root schema has been built.
     *
     * @param listener Listener to add.
     */
    public void registerListener(SchemaUpdateListener listener) {
        listeners.add(listener);
    }

    /**
     * Looks the table up in the table manager, waits for its latest schema and converts it to an SQL table.
     *
     * @param tableId Table id.
     * @return Converted table, or {@code null} if the table manager does not know the table.
     * @throws IgniteInternalException If the table manager reports that the node is stopping.
     */
    private @Nullable IgniteTable awaitLatestTableSchema(UUID tableId) {
        try {
            TableImpl table = tableManager.table(tableId);

            if (table == null) {
                return null;
            }

            table.schemaView().waitLatestSchema();

            return convert(table);
        } catch (NodeStoppingException e) {
            throw new IgniteInternalException(NODE_STOPPING_ERR, e);
        }
    }

    /**
     * Schedules registration of a table in the given schema for the given causality token.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * <p>The table is added both to the schemas value (under its name with the schema prefix removed) and to the tables
     * value (under its id).
     *
     * @param schemaName Name of the schema the table belongs to.
     * @param table Table to register.
     * @param causalityToken Causality token of the update.
     * @return Future obtained from the Calcite schema versioned value for the token, or a failed future if the node is
     *         stopping.
     */
    public synchronized CompletableFuture<?> onTableCreated(
            String schemaName,
            TableImpl table,
            long causalityToken
    ) {
        if (!busyLock.enterBusy()) {
            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
        }

        try {
            schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
                if (e != null) {
                    return failedFuture(e);
                }

                // Shallow copy: the map is new, the IgniteSchema instances are the ones from the previous version.
                Map<String, IgniteSchema> res = new HashMap<>(schemas);

                // NOTE(review): when the schema already exists this is the same instance earlier versions hold, and it
                // is mutated in place by addTable below - confirm that sharing is intended.
                IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);

                CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);

                return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
                    if (ex != null) {
                        return failedFuture(ex);
                    }

                    Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);

                    return igniteTableFuture.thenApply(igniteTable -> inBusyLock(busyLock, () -> {
                        resTbls.put(igniteTable.id(), igniteTable);

                        return resTbls;
                    }));
                })).thenCombine(igniteTableFuture, (v, igniteTable) -> inBusyLock(busyLock, () -> {
                    schema.addTable(removeSchema(schemaName, table.name()), igniteTable);

                    return null;
                })).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
            }));

            return calciteSchemaVv.get(causalityToken);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Schedules an update of a table; handled exactly like {@link #onTableCreated(String, TableImpl, long)}.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * @param schemaName Name of the schema the table belongs to.
     * @param table Updated table.
     * @param causalityToken Causality token of the update.
     * @return Future returned by {@link #onTableCreated(String, TableImpl, long)}.
     */
    public CompletableFuture<?> onTableUpdated(
            String schemaName,
            TableImpl table,
            long causalityToken
    ) {
        return onTableCreated(schemaName, table, causalityToken);
    }

    /**
     * Schedules removal of a table from the given schema for the given causality token.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * <p>If the schema does not contain the table, the tables value is not touched.
     *
     * @param schemaName Name of the schema the table belongs to.
     * @param tableName Table name including the schema prefix.
     * @param causalityToken Causality token of the update.
     * @return Future obtained from the Calcite schema versioned value for the token, or a failed future if the node is
     *         stopping.
     */
    public synchronized CompletableFuture<?> onTableDropped(
            String schemaName,
            String tableName,
            long causalityToken
    ) {
        if (!busyLock.enterBusy()) {
            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
        }

        try {
            schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
                if (e != null) {
                    return failedFuture(e);
                }

                // Shallow copy: the map is new, the IgniteSchema instances are the ones from the previous version.
                Map<String, IgniteSchema> res = new HashMap<>(schemas);

                // An unknown schema name results in a new empty schema being put into the map.
                IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);

                String calciteTableName = removeSchema(schemaName, tableName);

                InternalIgniteTable table = (InternalIgniteTable) schema.getTable(calciteTableName);

                if (table != null) {
                    // NOTE(review): removes from an instance that earlier versions may also hold - confirm intended.
                    schema.removeTable(calciteTableName);

                    return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
                        if (ex != null) {
                            return failedFuture(ex);
                        }

                        Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);

                        resTbls.remove(table.id());

                        return completedFuture(resTbls);
                    })).thenCompose(tables -> inBusyLock(busyLock, () -> completedFuture(res)));
                }

                return completedFuture(res);
            }));

            return calciteSchemaVv.get(causalityToken);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Rebuilds Calcite schemas.
     *
     * <p>The result always contains the default schema; an entry of {@code schemas} with the same name is added after the
     * empty default one.
     *
     * @param schemas Ignite schemas.
     * @return New Calcite root schema.
     */
    private SchemaPlus rebuild(Map<String, IgniteSchema> schemas) {
        SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);

        newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new IgniteSchema(DEFAULT_SCHEMA_NAME));

        schemas.forEach(newCalciteSchema::add);

        return newCalciteSchema;
    }

    /**
     * Converts a table using the schema registry requested from the schema manager for the given causality token.
     *
     * @param causalityToken Causality token.
     * @param table Table to convert.
     * @return Future of the converted table.
     */
    private CompletableFuture<IgniteTableImpl> convert(long causalityToken, TableImpl table) {
        return schemaManager.schemaRegistry(causalityToken, table.tableId())
                .thenApply(schemaRegistry -> inBusyLock(busyLock, () -> convert(table, schemaRegistry)));
    }

    /**
     * Converts a table using the schema registry the schema manager returns for the table id.
     *
     * @param table Table to convert.
     * @return Converted table.
     */
    private IgniteTableImpl convert(TableImpl table) {
        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(table.tableId());

        return convert(table, schemaRegistry);
    }

    /**
     * Builds an SQL table from the schema descriptor currently returned by the registry.
     *
     * @param table Table to convert.
     * @param schemaRegistry Schema registry of the table.
     * @return Converted table whose column descriptors are sorted by column order.
     */
    private IgniteTableImpl convert(TableImpl table, SchemaRegistry schemaRegistry) {
        SchemaDescriptor descriptor = schemaRegistry.schema();

        List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream()
                .map(descriptor::column)
                .sorted(Comparator.comparingInt(Column::columnOrder))
                .map(col -> new ColumnDescriptorImpl(
                        col.name(),
                        descriptor.isKeyColumn(col.schemaIndex()),
                        col.nullable(),
                        col.columnOrder(),
                        col.schemaIndex(),
                        col.type(),
                        convertDefaultValueProvider(col.defaultValueProvider()),
                        col::defaultValue
                ))
                .collect(Collectors.toList());

        return new IgniteTableImpl(
                new TableDescriptorImpl(colDescriptors),
                table.internalTable(),
                schemaRegistry
        );
    }

    /**
     * Maps the type of a default value provider to a default value strategy.
     *
     * @param defaultValueProvider Default value provider of a column.
     * @return {@link DefaultValueStrategy#DEFAULT_CONSTANT} for a provider of type {@link Type#CONSTANT},
     *         {@link DefaultValueStrategy#DEFAULT_COMPUTED} for any other type.
     */
    private static DefaultValueStrategy convertDefaultValueProvider(DefaultValueProvider defaultValueProvider) {
        return defaultValueProvider.type() == Type.CONSTANT
                ? DefaultValueStrategy.DEFAULT_CONSTANT
                : DefaultValueStrategy.DEFAULT_COMPUTED;
    }

    /**
     * Cuts the schema prefix off a table name.
     *
     * <p>The prefix is not checked: the first {@code schemaName.length() + 1} characters are dropped, so the name is
     * expected to have the form {@code <schemaName><one separator character><table name>}.
     *
     * @param schemaName Schema name.
     * @param canonicalName Table name that starts with the schema name followed by one separator character.
     * @return The rest of the name after the prefix.
     */
    private static String removeSchema(String schemaName, String canonicalName) {
        return canonicalName.substring(schemaName.length() + 1);
    }
}