blob: 380a269101a24e1920db6e927cb97138e816b666 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.table.ColumnView;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaModificationException;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.schema.PrimaryIndex;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Table manager.
*/
public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
/** Prefix of the internal metastorage keys; a table id is appended to it to build the key of a table. */
private static final String INTERNAL_PREFIX = "internal.tables.";
/** Prefix of the metastorage keys that are scanned and checked to find out which tables are configured. */
private static final String PUBLIC_PREFIX = "dst-cfg.table.tables.";
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
/** Configuration manager. */
private final ConfigurationManager configurationMgr;
/** Raft manager. */
private final Loza raftMgr;
/** Schema manager. */
private final SchemaManager schemaMgr;
/** Affinity manager. */
private final AffinityManager affMgr;
/** Tables created locally, keyed by table name. */
private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
/**
* Creates a new table manager.
*
* @param configurationMgr Configuration manager.
* @param metaStorageMgr Meta storage manager.
* @param schemaMgr Schema manager.
* @param affMgr Affinity manager.
* @param raftMgr Raft manager.
* @param vaultManager Vault manager.
*/
public TableManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
SchemaManager schemaMgr,
AffinityManager affMgr,
Loza raftMgr,
VaultManager vaultManager
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
this.affMgr = affMgr;
this.raftMgr = raftMgr;
this.schemaMgr = schemaMgr;
//TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
});
}
/**
 * Creates local structures for a table: prepares one RAFT group per partition, builds the table object,
 * registers it in {@link #tables} and fires {@link TableEvent#CREATE}.
 *
 * @param name Table name.
 * @param tblId Table id.
 * @param assignment Affinity assignment; one node list per partition.
 * @param schemaReg Schema registry for the table.
 */
private void createTableLocally(
    String name,
    UUID tblId,
    List<List<ClusterNode>> assignment,
    SchemaRegistry schemaReg
) {
    int partCnt = assignment.size();

    HashMap<Integer, RaftGroupService> raftGroups = new HashMap<>(partCnt);

    int partId = 0;

    for (List<ClusterNode> partNodes : assignment) {
        raftGroups.put(partId, raftMgr.prepareRaftGroup(
            raftGroupName(tblId, partId),
            partNodes,
            new PartitionListener()
        ));

        partId++;
    }

    TableImpl created = new TableImpl(
        new InternalTableImpl(name, tblId, raftGroups, partCnt),
        schemaReg,
        this,
        null
    );

    tables.put(name, created);

    onEvent(TableEvent.CREATE, new TableEventParameters(created), null);
}
/**
 * Drops local structures for a table: stops the RAFT group of every partition and fires
 * {@link TableEvent#DROP} for the table registered under the given name.
 * The table is not removed from {@link #tables} by this method.
 *
 * @param name Table name.
 * @param tblId Table id.
 * @param assignment Affinity assignment; one node list per partition.
 */
private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
    int partId = 0;

    for (List<ClusterNode> partNodes : assignment) {
        raftMgr.stopRaftGroup(raftGroupName(tblId, partId), partNodes);

        partId++;
    }

    TableImpl dropped = tables.get(name);

    assert dropped != null : "There is no table with the name specified [name=" + name + ']';

    onEvent(TableEvent.DROP, new TableEventParameters(dropped), null);
}
/**
 * Builds the name of the RAFT group that serves one partition of a table.
 * The name has the form {@code <tableId>_part_<partition>}.
 *
 * @param tableId Table identifier.
 * @param partition Partition index within the table.
 * @return A RAFT group name.
 */
@NotNull private String raftGroupName(UUID tableId, int partition) {
    String tablePart = String.valueOf(tableId);

    return tablePart + "_part_" + partition;
}
/**
 * Table configuration changed callback.
 * Compares the old and the new table lists and starts the tables that appeared, updates the schema of
 * the tables whose columns changed and stops the tables that disappeared.
 * A {@code null} configuration (or a {@code null} key list) is treated as an empty table list.
 *
 * @param rev Storage revision.
 * @param oldCfg Old configuration.
 * @param newCfg New configuration.
 * @return Operation future.
 * @throws SchemaModificationException If a column change that is not supported is detected.
 */
@NotNull private CompletableFuture<?> onConfigurationChanged(
    long rev,
    @Nullable NamedListView<TableView> oldCfg,
    @Nullable NamedListView<TableView> newCfg
) {
    // Both sides are normalized first, so that neither filter below dereferences a missing configuration.
    Set<String> oldNames = tableNamesOf(oldCfg);
    Set<String> newNames = tableNamesOf(newCfg);

    Set<String> tablesToStart = newNames.stream()
        .filter(tblName -> !oldNames.contains(tblName))
        .collect(Collectors.toSet());

    Set<String> tablesToStop = oldNames.stream()
        .filter(tblName -> !newNames.contains(tblName))
        .collect(Collectors.toSet());

    // Only names present on both sides get here, hence both configurations are non-null inside the filter.
    Set<String> schemaChanged = oldNames.stream()
        .filter(newNames::contains)
        .filter(tblName -> isSchemaChanged(oldCfg.get(tblName), newCfg.get(tblName)))
        .collect(Collectors.toSet());

    List<CompletableFuture<Boolean>> futs = new ArrayList<>();

    if (!tablesToStart.isEmpty())
        futs.addAll(startTables(tablesToStart, rev, newCfg));

    if (!schemaChanged.isEmpty())
        futs.addAll(changeSchema(schemaChanged, oldCfg, newCfg));

    if (!tablesToStop.isEmpty())
        futs.addAll(stopTables(tablesToStop));

    return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
}

/**
 * Returns the table names of a configuration, or an empty set if the configuration or its key list is absent.
 *
 * @param cfg Tables configuration, may be {@code null}.
 * @return Table names, never {@code null}.
 */
private static Set<String> tableNamesOf(@Nullable NamedListView<TableView> cfg) {
    if (cfg == null || cfg.namedListKeys() == null)
        return Collections.emptySet();

    return new HashSet<>(cfg.namedListKeys());
}

/**
 * Checks whether the columns of a table differ between two versions of its configuration.
 *
 * @param oldTbl Old table configuration.
 * @param newTbl New table configuration.
 * @return {@code true} if the column key lists differ or any column was changed.
 * @throws SchemaModificationException If a column change that is not supported is detected.
 */
private static boolean isSchemaChanged(TableView oldTbl, TableView newTbl) {
    assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;

    // A different list of column keys is a schema change by itself.
    if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
        return true;

    return newTbl.columns().namedListKeys().stream()
        .anyMatch(colKey -> isColumnChanged(oldTbl, oldTbl.columns().get(colKey), newTbl.columns().get(colKey)));
}

/**
 * Checks whether a single column was renamed or got another default value, rejecting unsupported changes.
 *
 * @param oldTbl Old table configuration; its indices are used to detect a key column rename.
 * @param oldCol Old column configuration.
 * @param newCol New column configuration.
 * @return {@code true} if the column name or the default value differs.
 * @throws SchemaModificationException If the type or the nullability changed, or a key column was renamed.
 */
private static boolean isColumnChanged(TableView oldTbl, ColumnView oldCol, ColumnView newCol) {
    assert oldCol != null;

    if (!Objects.equals(newCol.type(), oldCol.type()))
        throw new SchemaModificationException("Columns type change is not supported.");

    if (!Objects.equals(newCol.nullable(), oldCol.nullable()))
        throw new SchemaModificationException("Column nullability change is not supported");

    boolean renamed = !Objects.equals(newCol.name(), oldCol.name());

    // A rename is rejected when the old column name is listed in the primary key index.
    if (renamed &&
        oldTbl.indices().namedListKeys().stream()
            .map(n -> oldTbl.indices().get(n))
            .filter(idx -> PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(idx.name()))
            .anyMatch(idx -> idx.columns().namedListKeys().stream()
                .anyMatch(c -> idx.columns().get(c).name().equals(oldCol.name()))
            ))
        throw new SchemaModificationException("Key column rename is not supported");

    return renamed || !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
}
/**
 * Start tables routine.
 * For every table the method derives the table id from the revision, launches (only on a node that has
 * the metastorage locally) the chain that writes the internal table key, initializes the schema and
 * calculates the assignments, and registers listeners for {@link AffinityEvent#CALCULATED} and
 * {@link SchemaEvent#INITIALIZED}. When both events have arrived the table is created locally; any
 * failure is logged and reported through {@link TableEvent#CREATE} with the error.
 *
 * @param tbls Tables to start.
 * @param rev Metastore revision.
 * @param cfgs Table configurations.
 * @return Table creation futures; empty when the node has no metastorage locally.
 */
private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, long rev, NamedListView<TableView> cfgs) {
    boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr);

    List<CompletableFuture<Boolean>> futs = new ArrayList<>();

    for (String tblName : tbls) {
        TableView tableView = cfgs.get(tblName);

        // NOTE(review): the id depends on the revision only, so all tables started by one configuration
        // change share the same id - confirm that a single change never creates several tables.
        UUID tblId = new UUID(rev, 0L);

        if (hasMetastorageLocally) {
            var key = new ByteArray(INTERNAL_PREFIX + tblId);

            futs.add(metaStorageMgr.invoke(
                Conditions.notExists(key),
                Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
                Operations.noop())
                .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
                .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name())));
        }

        final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
        final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();

        // The creation stage runs only when both events succeeded; the error handler goes last, so it
        // also reports a failure of the local creation itself instead of dropping it silently.
        CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
            .thenRun(() -> createTableLocally(
                tblName,
                tblId,
                affinityReadyFut.join().assignment(),
                schemaReadyFut.join().schemaRegistry()
            ))
            .exceptionally(e -> {
                LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);

                onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e);

                return null;
            });

        affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() {
            @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
                if (!tblId.equals(parameters.tableId()))
                    return false;

                if (e == null)
                    affinityReadyFut.complete(parameters);
                else
                    affinityReadyFut.completeExceptionally(e);

                return true;
            }

            @Override public void remove(@NotNull Throwable e) {
                affinityReadyFut.completeExceptionally(e);
            }
        });

        schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() {
            @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
                // An event of another table must never complete the future of this table.
                if (!tblId.equals(parameters.tableId()))
                    return false;

                if (e == null)
                    schemaReadyFut.complete(parameters);
                else
                    schemaReadyFut.completeExceptionally(e);

                return true;
            }

            @Override public void remove(@NotNull Throwable e) {
                schemaReadyFut.completeExceptionally(e);
            }
        });
    }

    return futs;
}
/**
 * Drop tables routine.
 * For every table the method launches (only on a node that has the metastorage locally) the chain that
 * removes the assignment, unregisters the schemas and removes the internal table key, and registers a
 * listener for {@link AffinityEvent#REMOVED} that drops the table locally or reports the failure through
 * {@link TableEvent#DROP}.
 *
 * @param tbls Tables to drop.
 * @return Table drop futures; empty when the node has no metastorage locally.
 */
private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) {
    final boolean metastorageIsLocal = metaStorageMgr.hasMetastorageLocally(configurationMgr);

    List<CompletableFuture<Boolean>> dropFuts = new ArrayList<>();

    for (String tblName : tbls) {
        UUID tblId = tables.get(tblName).tableId();

        if (metastorageIsLocal) {
            var tblKey = new ByteArray(INTERNAL_PREFIX + tblId);

            dropFuts.add(affMgr.removeAssignment(tblId)
                .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
                .thenCompose(res -> metaStorageMgr.invoke(
                    Conditions.exists(tblKey),
                    Operations.remove(tblKey),
                    Operations.noop()
                )));
        }

        affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() {
            @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
                boolean sameTable = tblId.equals(parameters.tableId());

                if (!sameTable)
                    return false;

                if (e != null)
                    onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
                else
                    dropTableLocally(tblName, tblId, parameters.assignment());

                return true;
            }

            @Override public void remove(@NotNull Throwable e) {
                onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
            }
        });
    }

    return dropFuts;
}
/**
 * Change tables schemas.
 * For every table the method asks the schema manager (only on a node that has the metastorage locally)
 * to update the schema and registers a listener for {@link SchemaEvent#CHANGED}. Exactly one
 * {@link TableEvent#ALTER} is fired per table: without an error when the schema event succeeded,
 * with the error otherwise.
 *
 * @param tbls Tables.
 * @param oldCfg Old configuration.
 * @param newCfg New configuration.
 * @return Schema change futures; empty when the node has no metastorage locally.
 */
private List<CompletableFuture<Boolean>> changeSchema(
    Set<String> tbls,
    @NotNull NamedListView<TableView> oldCfg,
    @NotNull NamedListView<TableView> newCfg
) {
    boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr);

    List<CompletableFuture<Boolean>> futs = new ArrayList<>();

    for (String tblName : tbls) {
        TableImpl tbl = tables.get(tblName);

        UUID tblId = tbl.tableId();

        if (hasMetastorageLocally)
            futs.add(schemaMgr.updateSchemaForTable(tblId, oldCfg.get(tblName), newCfg.get(tblName)));

        final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();

        // A single terminal stage decides between the success and the failure notification, so a failed
        // schema change is never followed by a successful ALTER event.
        CompletableFuture.allOf(schemaReadyFut)
            .whenComplete((res, e) -> {
                if (e != null) {
                    LOG.error("Failed to upgrade schema for a table [name=" + tblName + ", id=" + tblId + ']', e);

                    onEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), e);
                }
                else
                    onEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), null);
            });

        schemaMgr.listen(SchemaEvent.CHANGED, new EventListener<>() {
            @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
                // An event of another table must never complete the future of this table.
                if (!tblId.equals(parameters.tableId()))
                    return false;

                if (e == null)
                    schemaReadyFut.complete(parameters);
                else
                    schemaReadyFut.completeExceptionally(e);

                return true;
            }

            @Override public void remove(@NotNull Throwable e) {
                schemaReadyFut.completeExceptionally(e);
            }
        });
    }

    return futs;
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@link #createTable(String, Consumer, boolean)} with {@code exceptionWhenExist} set to {@code true}.
 */
@Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
    final boolean failIfExists = true;

    return createTable(name, tableInitChange, failIfExists);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@link #createTable(String, Consumer, boolean)} with {@code exceptionWhenExist} set to {@code false}.
 */
@Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
    final boolean failIfExists = false;

    return createTable(name, tableInitChange, failIfExists);
}
/**
 * Creates a new table with the specified name or returns an existing table with the same name.
 * The method registers a {@link TableEvent#CREATE} listener, requests the configuration change when the
 * table does not exist yet and then blocks until the result future is completed.
 *
 * @param name Table name.
 * @param tableInitChange Table configuration.
 * @param exceptionWhenExist If the value is {@code true}, an exception will be thrown when the table already exists,
 *      {@code false} means the existing table will be returned.
 * @return A table instance.
 */
public Table createTable(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
    CompletableFuture<Table> tblFut = new CompletableFuture<>();

    EventListener<TableEventParameters> clo = new EventListener<>() {
        @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
            String tableName = parameters.tableName();

            if (!name.equals(tableName))
                return false;

            if (e == null)
                tblFut.complete(parameters.table());
            else
                tblFut.completeExceptionally(e);

            return true;
        }

        @Override public void remove(@NotNull Throwable e) {
            tblFut.completeExceptionally(e);
        }
    };

    // The listener is registered before the existence check, so a concurrent creation is not missed.
    listen(TableEvent.CREATE, clo);

    Table tbl = table(name, true);

    if (tbl != null) {
        if (exceptionWhenExist) {
            removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
                LoggerMessageHelper.format("Table already exists [name={}]", name)));
        }
        else if (tblFut.complete(tbl))
            removeListener(TableEvent.CREATE, clo);
    }
    else {
        try {
            configurationMgr
                .configurationRegistry()
                .getConfiguration(TablesConfiguration.KEY)
                .tables()
                .change(change -> change.create(name, tableInitChange))
                .get();
        }
        catch (InterruptedException | ExecutionException e) {
            // Restore the interruption status that was cleared when the exception was thrown.
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();

            LOG.error("Table wasn't created [name=" + name + ']', e);

            removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
        }
    }

    return tblFut.join();
}
/**
 * {@inheritDoc}
 * <p>
 * Registers a {@link TableEvent#ALTER} listener, requests the configuration change and then blocks until
 * the event for the given table name arrives or the configuration change fails.
 */
@Override public void alterTable(String name, Consumer<TableChange> tableChange) {
    CompletableFuture<Void> tblFut = new CompletableFuture<>();

    EventListener<TableEventParameters> clo = new EventListener<>() {
        @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
            String tableName = parameters.tableName();

            if (!name.equals(tableName))
                return false;

            if (e == null)
                tblFut.complete(null);
            else
                tblFut.completeExceptionally(e);

            return true;
        }

        @Override public void remove(@NotNull Throwable e) {
            tblFut.completeExceptionally(e);
        }
    };

    listen(TableEvent.ALTER, clo);

    try {
        configurationMgr.configurationRegistry()
            .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
            change.createOrUpdate(name, tableChange)).get();
    }
    catch (InterruptedException | ExecutionException e) {
        // Restore the interruption status that was cleared when the exception was thrown.
        if (e instanceof InterruptedException)
            Thread.currentThread().interrupt();

        LOG.error("Table wasn't altered [name=" + name + ']', e);

        // The future is completed with the original exception first, so the caller observes the same cause;
        // the listener is then unregistered, because no ALTER event is expected after a failed change.
        tblFut.completeExceptionally(e);

        removeListener(TableEvent.ALTER, clo, new IgniteInternalCheckedException(e));
    }

    // NOTE(review): presumably no ALTER event is fired when the change does not modify the columns,
    // in which case this call would never return - confirm against the configuration listener.
    tblFut.join();
}
/**
 * {@inheritDoc}
 * <p>
 * Registers a {@link TableEvent#DROP} listener that removes the table from {@link #tables}, requests the
 * configuration change when the table is configured and then blocks until the result future is completed.
 * Dropping a table that is not configured completes immediately.
 */
@Override public void dropTable(String name) {
    CompletableFuture<Void> dropTblFut = new CompletableFuture<>();

    EventListener<TableEventParameters> clo = new EventListener<>() {
        @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
            String tableName = parameters.tableName();

            if (!name.equals(tableName))
                return false;

            if (e == null) {
                Table droppedTable = tables.remove(tableName);

                assert droppedTable != null;

                dropTblFut.complete(null);
            }
            else
                dropTblFut.completeExceptionally(e);

            return true;
        }

        @Override public void remove(@NotNull Throwable e) {
            dropTblFut.completeExceptionally(e);
        }
    };

    // The listener is registered before the configuration check, so a concurrent drop is not missed.
    listen(TableEvent.DROP, clo);

    if (!isTableConfigured(name)) {
        if (dropTblFut.complete(null))
            removeListener(TableEvent.DROP, clo, null);
    }
    else {
        try {
            configurationMgr
                .configurationRegistry()
                .getConfiguration(TablesConfiguration.KEY)
                .tables()
                .change(change -> change.delete(name))
                .get();
        }
        catch (InterruptedException | ExecutionException e) {
            // Restore the interruption status that was cleared when the exception was thrown.
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();

            LOG.error("Table wasn't dropped [name=" + name + ']', e);

            removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(e));
        }
    }

    dropTblFut.join();
}
/**
 * {@inheritDoc}
 * <p>
 * The result is built from the configured table names; names that do not resolve to a table are skipped.
 */
@Override public List<Table> tables() {
    return tableNamesConfigured().stream()
        .map(tblName -> table(tblName, false))
        .filter(Objects::nonNull)
        .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Collects a set of table names from the distributed configuration storage.
 * Every key found under {@link #PUBLIC_PREFIX} is cut at the first dot that is not preceded by a
 * backslash; the part before that dot is unescaped and taken as a table name.
 * Any exception raised during the scan is logged and the names collected so far are returned.
 *
 * @return A set of table names.
 */
private HashSet<String> tableNamesConfigured() {
    IgniteBiTuple<ByteArray, ByteArray> bounds = toRange(new ByteArray(PUBLIC_PREFIX));

    HashSet<String> names = new HashSet<>();

    try (Cursor<Entry> cur = metaStorageMgr.range(bounds.get1(), bounds.get2())) {
        while (cur.hasNext()) {
            String keyTail = cur.next().key().toString().substring(PUBLIC_PREFIX.length());

            // Look for the first dot that is not preceded by a backslash.
            int dotIdx = keyTail.indexOf('.');

            while (dotIdx > 0 && keyTail.charAt(dotIdx - 1) == '\\')
                dotIdx = keyTail.indexOf('.', dotIdx + 1);

            // NOTE(review): when no such dot exists dotIdx is -1 and substring throws, which ends the scan
            // in the catch block below - confirm that every key under the prefix contains a delimiter.
            String escapedName = keyTail.substring(0, dotIdx);

            names.add(ConfigurationUtil.unescape(escapedName));
        }
    }
    catch (Exception e) {
        LOG.error("Can't get table names.", e);
    }

    return names;
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@link #table(String, boolean)} with the configuration check enabled.
 */
@Override public Table table(String name) {
    final boolean checkConfiguration = true;

    return table(name, checkConfiguration);
}
/**
 * Gets a table if it exists or {@code null} if it was not created or was removed before.
 * When the table is not in {@link #tables} yet, the method registers a {@link TableEvent#CREATE} listener
 * and blocks until the table is created, unless the table turns out not to be configured.
 *
 * @param name Table name.
 * @param checkConfiguration True when the method checks a configuration before tries to get a table,
 *      false otherwise.
 * @return A table or {@code null} if table does not exist.
 */
private Table table(String name, boolean checkConfiguration) {
    if (checkConfiguration && !isTableConfigured(name))
        return null;

    Table started = tables.get(name);

    if (started != null)
        return started;

    CompletableFuture<Table> tblFut = new CompletableFuture<>();

    EventListener<TableEventParameters> lsnr = new EventListener<>() {
        @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
            if (!name.equals(parameters.tableName()))
                return false;

            if (e != null)
                tblFut.completeExceptionally(e);
            else
                tblFut.complete(parameters.table());

            return true;
        }

        @Override public void remove(@NotNull Throwable e) {
            tblFut.completeExceptionally(e);
        }
    };

    listen(TableEvent.CREATE, lsnr);

    // Second look-up: the table may have been created between the first look-up and the registration.
    started = tables.get(name);

    boolean resolvedHere = started != null && tblFut.complete(started);

    if (!resolvedHere)
        resolvedHere = !isTableConfigured(name) && tblFut.complete(null);

    // The listener is removed only when this method itself completed the future.
    if (resolvedHere)
        removeListener(TableEvent.CREATE, lsnr, null);

    return tblFut.join();
}
/**
 * Checks that the table is configured, i.e. that the metastorage key
 * {@code PUBLIC_PREFIX + <escaped name> + ".name"} exists. The call blocks until the metastorage answers.
 *
 * @param name Table name.
 * @return True if table configured, false otherwise.
 */
private boolean isTableConfigured(String name) {
    ByteArray nameKey = new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name");

    // Both branches are no-ops: only the outcome of the condition is of interest.
    return metaStorageMgr
        .invoke(Conditions.exists(nameKey), Operations.noop(), Operations.noop())
        .join();
}
/**
 * Transforms a prefix bytes to range.
 * The left border is the prefix itself; the right border is a copy of the prefix whose last byte is
 * incremented, or a copy extended by one zero byte when the last byte equals {@link Byte#MAX_VALUE}.
 * This method should be replaced to direct call of range by prefix
 * in Meta storage manager when it will be implemented.
 * TODO: IGNITE-14799
 * <p>
 * NOTE(review): in the {@code Byte.MAX_VALUE} case the right border is the prefix followed by a zero byte,
 * which looks too narrow for a prefix scan - confirm against the key ordering of the metastorage.
 * An empty prefix is not supported (the last byte is read unconditionally).
 *
 * @param prefixKey Prefix bytes.
 * @return Tuple with left and right borders for range.
 */
private IgniteBiTuple<ByteArray, ByteArray> toRange(ByteArray prefixKey) {
    byte[] upper = prefixKey.bytes().clone();

    int last = upper.length - 1;

    if (upper[last] == Byte.MAX_VALUE)
        upper = Arrays.copyOf(upper, upper.length + 1);
    else
        upper[last]++;

    return new IgniteBiTuple<>(prefixKey, new ByteArray(upper));
}
}