| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.catalog; |
| |
| import static java.util.concurrent.CompletableFuture.allOf; |
| import static java.util.concurrent.CompletableFuture.completedFuture; |
| import static java.util.concurrent.CompletableFuture.failedFuture; |
| import static java.util.stream.Collectors.joining; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor.CatalogIndexDescriptorType.HASH; |
| import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN; |
| import static org.apache.ignite.internal.type.NativeTypes.INT32; |
| import static org.apache.ignite.internal.type.NativeTypes.STRING; |
| import static org.apache.ignite.internal.type.NativeTypes.stringOf; |
| import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; |
| import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.NavigableMap; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.Flow.Publisher; |
| import java.util.function.LongSupplier; |
| import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand; |
| import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand; |
| import org.apache.ignite.internal.catalog.commands.CreateZoneCommand; |
| import org.apache.ignite.internal.catalog.commands.StorageProfileParams; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; |
| import org.apache.ignite.internal.catalog.events.CatalogEvent; |
| import org.apache.ignite.internal.catalog.events.CatalogEventParameters; |
| import org.apache.ignite.internal.catalog.storage.Fireable; |
| import org.apache.ignite.internal.catalog.storage.SnapshotEntry; |
| import org.apache.ignite.internal.catalog.storage.UpdateEntry; |
| import org.apache.ignite.internal.catalog.storage.UpdateLog; |
| import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler; |
| import org.apache.ignite.internal.catalog.storage.UpdateLogEvent; |
| import org.apache.ignite.internal.catalog.storage.VersionedUpdate; |
| import org.apache.ignite.internal.event.AbstractEventProducer; |
| import org.apache.ignite.internal.hlc.ClockService; |
| import org.apache.ignite.internal.hlc.HybridTimestamp; |
| import org.apache.ignite.internal.lang.IgniteInternalException; |
| import org.apache.ignite.internal.lang.NodeStoppingException; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.systemview.api.SystemView; |
| import org.apache.ignite.internal.systemview.api.SystemViewProvider; |
| import org.apache.ignite.internal.systemview.api.SystemViews; |
| import org.apache.ignite.internal.util.ExceptionUtils; |
| import org.apache.ignite.internal.util.IgniteSpinBusyLock; |
| import org.apache.ignite.internal.util.PendingComparableValuesTracker; |
| import org.apache.ignite.internal.util.SubscriptionUtils; |
| import org.apache.ignite.internal.util.TransformingIterator; |
| import org.apache.ignite.lang.ErrorGroups.Common; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Catalog service implementation. |
| */ |
| public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, CatalogEventParameters> |
| implements CatalogManager, SystemViewProvider { |
| static String DEFAULT_ZONE_NAME = "Default"; |
| |
| private static final int MAX_RETRY_COUNT = 10; |
| |
| private static final int SYSTEM_VIEW_STRING_COLUMN_LENGTH = Short.MAX_VALUE; |
| |
| /** Safe time to wait before new Catalog version activation. */ |
| static final int DEFAULT_DELAY_DURATION = 0; |
| |
| static final int DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD = 0; |
| |
| /** |
| * Initial update token for a catalog descriptor, this token is valid only before the first call of |
| * {@link UpdateEntry#applyUpdate(Catalog, long)}. |
| * |
| * <p>After that {@link CatalogObjectDescriptor#updateToken()} will be initialised with a causality token from |
| * {@link UpdateEntry#applyUpdate(Catalog, long)} |
| */ |
| public static final long INITIAL_CAUSALITY_TOKEN = 0L; |
| |
| /** The logger. */ |
| private static final IgniteLogger LOG = Loggers.forClass(CatalogManagerImpl.class); |
| |
| /** Versioned catalog descriptors. */ |
| private final NavigableMap<Integer, Catalog> catalogByVer = new ConcurrentSkipListMap<>(); |
| |
| /** Versioned catalog descriptors sorted in chronological order. */ |
| private final NavigableMap<Long, Catalog> catalogByTs = new ConcurrentSkipListMap<>(); |
| |
| /** A future that completes when an empty catalog is initialised. If catalog is not empty this future when this completes starts. */ |
| private final CompletableFuture<Void> catalogInitializationFuture = new CompletableFuture<>(); |
| |
| private final UpdateLog updateLog; |
| |
| private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0); |
| |
| private final ClockService clockService; |
| |
| private final LongSupplier delayDurationMsSupplier; |
| |
| private final LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier; |
| |
| /** Busy lock to stop synchronously. */ |
| private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); |
| |
| /** |
| * Constructor. |
| */ |
| public CatalogManagerImpl(UpdateLog updateLog, ClockService clockService) { |
| this(updateLog, clockService, () -> DEFAULT_DELAY_DURATION, () -> DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD); |
| } |
| |
| /** |
| * Constructor. |
| */ |
| public CatalogManagerImpl( |
| UpdateLog updateLog, |
| ClockService clockService, |
| LongSupplier delayDurationMsSupplier, |
| LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier |
| ) { |
| this.updateLog = updateLog; |
| this.clockService = clockService; |
| this.delayDurationMsSupplier = delayDurationMsSupplier; |
| this.partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier; |
| } |
| |
| @Override |
| public CompletableFuture<Void> startAsync() { |
| int objectIdGen = 0; |
| |
| Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(), null); |
| |
| registerCatalog(emptyCatalog); |
| |
| updateLog.registerUpdateHandler(new OnUpdateHandlerImpl()); |
| |
| return updateLog.startAsync() |
| .thenCompose(none -> { |
| if (latestCatalogVersion() == emptyCatalog.version()) { |
| int initializedCatalogVersion = emptyCatalog.version() + 1; |
| |
| this.catalogReadyFuture(initializedCatalogVersion) |
| .thenCompose(ignored -> awaitVersionActivation(initializedCatalogVersion)) |
| .handle((r, e) -> catalogInitializationFuture.complete(null)); |
| |
| return initCatalog(emptyCatalog); |
| } else { |
| catalogInitializationFuture.complete(null); |
| return nullCompletedFuture(); |
| } |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Void> stopAsync() { |
| busyLock.block(); |
| versionTracker.close(); |
| return updateLog.stopAsync(); |
| } |
| |
| @Override |
| public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) { |
| CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME); |
| if (schema == null) { |
| return null; |
| } |
| return schema.table(tableName); |
| } |
| |
| @Override |
| public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) { |
| return catalogAt(timestamp).table(tableId); |
| } |
| |
| @Override |
| public @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion) { |
| return catalog(catalogVersion).table(tableId); |
| } |
| |
| @Override |
| public Collection<CatalogTableDescriptor> tables(int catalogVersion) { |
| return catalog(catalogVersion).tables(); |
| } |
| |
| @Override |
| public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp) { |
| CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME); |
| if (schema == null) { |
| return null; |
| } |
| return schema.aliveIndex(indexName); |
| } |
| |
| @Override |
| public @Nullable CatalogIndexDescriptor index(int indexId, long timestamp) { |
| return catalogAt(timestamp).index(indexId); |
| } |
| |
| @Override |
| public @Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion) { |
| return catalog(catalogVersion).index(indexId); |
| } |
| |
| @Override |
| public Collection<CatalogIndexDescriptor> indexes(int catalogVersion) { |
| return catalog(catalogVersion).indexes(); |
| } |
| |
| @Override |
| public List<CatalogIndexDescriptor> indexes(int catalogVersion, int tableId) { |
| return catalog(catalogVersion).indexes(tableId); |
| } |
| |
| @Override |
| public @Nullable CatalogSchemaDescriptor schema(int catalogVersion) { |
| return schema(DEFAULT_SCHEMA_NAME, catalogVersion); |
| } |
| |
| @Override |
| public @Nullable CatalogSchemaDescriptor schema(String schemaName, int catalogVersion) { |
| Catalog catalog = catalog(catalogVersion); |
| |
| if (catalog == null) { |
| return null; |
| } |
| |
| return catalog.schema(schemaName == null ? DEFAULT_SCHEMA_NAME : schemaName); |
| } |
| |
| @Override |
| public @Nullable CatalogSchemaDescriptor schema(int schemaId, int catalogVersion) { |
| Catalog catalog = catalog(catalogVersion); |
| |
| return catalog == null ? null : catalog.schema(schemaId); |
| } |
| |
| @Override |
| public @Nullable CatalogZoneDescriptor zone(String zoneName, long timestamp) { |
| return catalogAt(timestamp).zone(zoneName); |
| } |
| |
| @Override |
| public @Nullable CatalogZoneDescriptor zone(int zoneId, long timestamp) { |
| return catalogAt(timestamp).zone(zoneId); |
| } |
| |
| @Override |
| public @Nullable CatalogZoneDescriptor zone(int zoneId, int catalogVersion) { |
| return catalog(catalogVersion).zone(zoneId); |
| } |
| |
| @Override |
| public Collection<CatalogZoneDescriptor> zones(int catalogVersion) { |
| return catalog(catalogVersion).zones(); |
| } |
| |
| @Override |
| public @Nullable CatalogSchemaDescriptor activeSchema(long timestamp) { |
| return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME); |
| } |
| |
| @Override |
| public @Nullable CatalogSchemaDescriptor activeSchema(String schemaName, long timestamp) { |
| return catalogAt(timestamp).schema(schemaName == null ? DEFAULT_SCHEMA_NAME : schemaName); |
| } |
| |
| @Override |
| public int activeCatalogVersion(long timestamp) { |
| return catalogAt(timestamp).version(); |
| } |
| |
| @Override |
| public int earliestCatalogVersion() { |
| return catalogByVer.firstEntry().getKey(); |
| } |
| |
| @Override |
| public int latestCatalogVersion() { |
| return catalogByVer.lastEntry().getKey(); |
| } |
| |
| @Override |
| public CompletableFuture<Void> catalogReadyFuture(int version) { |
| return versionTracker.waitFor(version); |
| } |
| |
| @Override |
| public CompletableFuture<Void> catalogInitializationFuture() { |
| return catalogInitializationFuture; |
| } |
| |
| @Override |
| public @Nullable Catalog catalog(int catalogVersion) { |
| return catalogByVer.get(catalogVersion); |
| } |
| |
| private Catalog catalogAt(long timestamp) { |
| Entry<Long, Catalog> entry = catalogByTs.floorEntry(timestamp); |
| |
| if (entry == null) { |
| throw new IllegalStateException("No valid schema found for given timestamp: " + timestamp); |
| } |
| |
| return entry.getValue(); |
| } |
| |
| @Override |
| public CompletableFuture<Integer> execute(CatalogCommand command) { |
| return saveUpdateAndWaitForActivation(command); |
| } |
| |
| @Override |
| public CompletableFuture<Integer> execute(List<CatalogCommand> commands) { |
| if (nullOrEmpty(commands)) { |
| return nullCompletedFuture(); |
| } |
| |
| return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands))); |
| } |
| |
| /** |
| * Cleanup outdated catalog versions, which can't be observed after given timestamp (inclusively), and compact underlying update log. |
| * |
| * @param timestamp Earliest observable timestamp. |
| * @return Operation future, which is completing with {@code true} if a new snapshot has been successfully written, {@code false} |
| * otherwise if a snapshot with the same or greater version already exists. |
| */ |
| public CompletableFuture<Boolean> compactCatalog(long timestamp) { |
| Catalog catalog = catalogAt(timestamp); |
| |
| return updateLog.saveSnapshot(new SnapshotEntry(catalog)); |
| } |
| |
| private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) { |
| List<CatalogCommand> initCommands = List.of( |
| // Init default zone |
| CreateZoneCommand.builder() |
| .zoneName(DEFAULT_ZONE_NAME) |
| .partitions(DEFAULT_PARTITION_COUNT) |
| .replicas(DEFAULT_REPLICA_COUNT) |
| .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE) |
| .dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE) |
| .filter(DEFAULT_FILTER) |
| .storageProfilesParams( |
| List.of(StorageProfileParams.builder().storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE).build()) |
| ) |
| .build(), |
| AlterZoneSetDefaultCommand.builder() |
| .zoneName(DEFAULT_ZONE_NAME) |
| .build(), |
| // Add schemas |
| CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(), |
| CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build() |
| ); |
| |
| List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog); |
| |
| return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries)) |
| .handle((result, error) -> { |
| if (error != null) { |
| LOG.warn("Unable to create default zone.", error); |
| } |
| |
| return null; |
| }); |
| } |
| |
| private void registerCatalog(Catalog newCatalog) { |
| catalogByVer.put(newCatalog.version(), newCatalog); |
| catalogByTs.put(newCatalog.time(), newCatalog); |
| } |
| |
| private void truncateUpTo(Catalog catalog) { |
| catalogByVer.headMap(catalog.version(), false).clear(); |
| catalogByTs.headMap(catalog.time(), false).clear(); |
| |
| LOG.info("Catalog history was truncated up to version=" + catalog.version()); |
| } |
| |
| private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) { |
| CompletableFuture<Integer> resultFuture = new CompletableFuture<>(); |
| |
| saveUpdate(updateProducer, 0) |
| .thenCompose(this::awaitVersionActivation) |
| .whenComplete((newVersion, err) -> { |
| if (err != null) { |
| Throwable errUnwrapped = ExceptionUtils.unwrapCause(err); |
| |
| if (errUnwrapped instanceof CatalogVersionAwareValidationException) { |
| CatalogVersionAwareValidationException err0 = (CatalogVersionAwareValidationException) errUnwrapped; |
| Catalog catalog = catalogByVer.get(err0.version()); |
| Throwable error = err0.initial(); |
| |
| if (catalog.version() == 0) { |
| resultFuture.completeExceptionally(error); |
| } else { |
| HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog); |
| |
| clockService.waitFor(tsSafeForRoReadingInPastOptimization) |
| .whenComplete((ver, err1) -> { |
| if (err1 != null) { |
| error.addSuppressed(err1); |
| } |
| |
| resultFuture.completeExceptionally(error); |
| }); |
| } |
| } else { |
| resultFuture.completeExceptionally(err); |
| } |
| } else { |
| resultFuture.complete(newVersion); |
| } |
| }); |
| |
| return resultFuture; |
| } |
| |
| private CompletableFuture<Integer> awaitVersionActivation(int version) { |
| Catalog catalog = catalogByVer.get(version); |
| |
| HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog); |
| |
| return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> version); |
| } |
| |
| private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) { |
| return clusterWideEnsuredActivationTsSafeForRoReads( |
| catalog, |
| partitionIdleSafeTimePropagationPeriodMsSupplier, |
| clockService.maxClockSkewMillis()); |
| } |
| |
| /** |
| * Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts |
| * until the max retry count is reached. |
| * |
| * @param updateProducer Supplies simple updates to include into a versioned update to install. |
| * @param attemptNo Ordinal number of an attempt. |
| * @return Future that completes with the new Catalog version (if update was saved successfully) or an exception, otherwise. |
| */ |
| private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int attemptNo) { |
| if (!busyLock.enterBusy()) { |
| return failedFuture(new NodeStoppingException()); |
| } |
| |
| try { |
| if (attemptNo >= MAX_RETRY_COUNT) { |
| return failedFuture(new IgniteInternalException(Common.INTERNAL_ERR, "Max retry limit exceeded: " + attemptNo)); |
| } |
| |
| Catalog catalog = catalogByVer.lastEntry().getValue(); |
| |
| List<UpdateEntry> updates; |
| try { |
| updates = updateProducer.get(catalog); |
| } catch (CatalogValidationException ex) { |
| return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version())); |
| } catch (Exception ex) { |
| return failedFuture(ex); |
| } |
| |
| if (updates.isEmpty()) { |
| return completedFuture(catalog.version()); |
| } |
| |
| int newVersion = catalog.version() + 1; |
| |
| // It is quite important to preserve such behavior: we wait here for versionTracker to be updated. It is updated when all events |
| // that were triggered by this change will be completed. That means that any Catalog update will be completed only |
| // after all reactions to that event will be completed through the catalog event notifications mechanism. |
| // This is important for the distribution zones recovery purposes: |
| // we guarantee recovery for a zones' catalog actions only if that actions were completed. |
| return updateLog.append(new VersionedUpdate(newVersion, delayDurationMsSupplier.getAsLong(), updates)) |
| .thenCompose(result -> versionTracker.waitFor(newVersion).thenApply(none -> result)) |
| .thenCompose(result -> { |
| if (result) { |
| return completedFuture(newVersion); |
| } |
| |
| return saveUpdate(updateProducer, attemptNo + 1); |
| }); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| @Override |
| public List<SystemView<?>> systemViews() { |
| return List.of( |
| createSystemViewsView(), |
| createSystemViewColumnsView(), |
| createZonesView(), |
| createIndexesView() |
| ); |
| } |
| |
| class OnUpdateHandlerImpl implements OnUpdateHandler { |
| @Override |
| public CompletableFuture<Void> handle(UpdateLogEvent event, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { |
| if (event instanceof SnapshotEntry) { |
| return handle((SnapshotEntry) event); |
| } |
| |
| return handle((VersionedUpdate) event, metaStorageUpdateTimestamp, causalityToken); |
| } |
| |
| private CompletableFuture<Void> handle(SnapshotEntry event) { |
| Catalog catalog = event.snapshot(); |
| // On recovery phase, we must register catalog from the snapshot. |
| // In other cases, it is ok to rewrite an existed version, because it's exactly the same. |
| registerCatalog(catalog); |
| truncateUpTo(catalog); |
| |
| return nullCompletedFuture(); |
| } |
| |
| private CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { |
| int version = update.version(); |
| Catalog catalog = catalogByVer.get(version - 1); |
| |
| assert catalog != null : version - 1; |
| |
| for (UpdateEntry entry : update.entries()) { |
| catalog = entry.applyUpdate(catalog, causalityToken); |
| } |
| |
| catalog = applyUpdateFinal(catalog, update, metaStorageUpdateTimestamp); |
| |
| registerCatalog(catalog); |
| |
| List<CompletableFuture<?>> eventFutures = new ArrayList<>(update.entries().size()); |
| |
| for (UpdateEntry entry : update.entries()) { |
| if (entry instanceof Fireable) { |
| Fireable fireEvent = (Fireable) entry; |
| |
| eventFutures.add(fireEvent( |
| fireEvent.eventType(), |
| fireEvent.createEventParameters(causalityToken, version) |
| )); |
| } |
| } |
| |
| // It is quite important to preserve such behavior: we wait for all events to be completed and only after that we complete |
| // versionTracker, which is used for saving any update to Catalog. That means that any Catalog update will be completed only |
| // after all reactions to that event will be completed through the catalog event notifications mechanism. |
| // This is important for the distribution zones recovery purposes: |
| // we guarantee recovery for a zones' catalog actions only if that actions were completed. |
| return allOf(eventFutures.toArray(CompletableFuture[]::new)) |
| .whenComplete((ignore, err) -> { |
| if (err != null) { |
| LOG.warn("Failed to apply catalog update.", err); |
| // TODO: IGNITE-14611 Pass exception to an error handler because catalog got into inconsistent state. |
| } |
| |
| versionTracker.update(version, null); |
| }); |
| } |
| } |
| |
| private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp) { |
| long activationTimestamp = metaStorageUpdateTimestamp.addPhysicalTime(update.delayDurationMs()).longValue(); |
| |
| assert activationTimestamp > catalog.time() |
| : "Activation timestamp " + activationTimestamp + " must be greater than previous catalog version activation timestamp " |
| + catalog.time(); |
| |
| return new Catalog( |
| update.version(), |
| activationTimestamp, |
| catalog.objectIdGenState(), |
| catalog.zones(), |
| catalog.schemas(), |
| defaultZoneIdOpt(catalog) |
| ); |
| } |
| |
| private static class BulkUpdateProducer implements UpdateProducer { |
| private final List<? extends UpdateProducer> commands; |
| |
| BulkUpdateProducer(List<? extends UpdateProducer> producers) { |
| this.commands = producers; |
| } |
| |
| @Override |
| public List<UpdateEntry> get(Catalog catalog) { |
| List<UpdateEntry> bulkUpdateEntries = new ArrayList<>(); |
| |
| for (UpdateProducer producer : commands) { |
| List<UpdateEntry> entries = producer.get(catalog); |
| |
| for (UpdateEntry entry : entries) { |
| catalog = entry.applyUpdate(catalog, INITIAL_CAUSALITY_TOKEN); |
| } |
| |
| bulkUpdateEntries.addAll(entries); |
| } |
| |
| return bulkUpdateEntries; |
| } |
| } |
| |
| private SystemView<?> createSystemViewsView() { |
| Iterable<SchemaAwareDescriptor<CatalogSystemViewDescriptor>> viewData = () -> { |
| Catalog catalog = catalogAt(clockService.nowLong()); |
| |
| return catalog.schemas().stream() |
| .flatMap(schema -> Arrays.stream(schema.systemViews()) |
| .map(viewDescriptor -> new SchemaAwareDescriptor<>(viewDescriptor, schema.name())) |
| ) |
| .iterator(); |
| }; |
| |
| Publisher<SchemaAwareDescriptor<CatalogSystemViewDescriptor>> viewDataPublisher = SubscriptionUtils.fromIterable(viewData); |
| |
| return SystemViews.<SchemaAwareDescriptor<CatalogSystemViewDescriptor>>clusterViewBuilder() |
| .name("SYSTEM_VIEWS") |
| .addColumn("ID", INT32, entry -> entry.descriptor.id()) |
| .addColumn("SCHEMA", stringOf(SYSTEM_VIEW_STRING_COLUMN_LENGTH), |
| entry -> entry.schema) |
| .addColumn("NAME", stringOf(SYSTEM_VIEW_STRING_COLUMN_LENGTH), |
| entry -> entry.descriptor.name()) |
| .addColumn("TYPE", stringOf(SYSTEM_VIEW_STRING_COLUMN_LENGTH), |
| entry -> entry.descriptor.systemViewType().name()) |
| .dataProvider(viewDataPublisher) |
| .build(); |
| } |
| |
| private SystemView<?> createSystemViewColumnsView() { |
| Iterable<ParentIdAwareDescriptor<CatalogTableColumnDescriptor>> viewData = () -> { |
| Catalog catalog = catalogAt(clockService.nowLong()); |
| |
| return catalog.schemas().stream() |
| .flatMap(schema -> Arrays.stream(schema.systemViews())) |
| .flatMap(viewDescriptor -> viewDescriptor.columns().stream() |
| .map(columnDescriptor -> new ParentIdAwareDescriptor<>(columnDescriptor, viewDescriptor.id())) |
| ) |
| .iterator(); |
| }; |
| |
| Publisher<ParentIdAwareDescriptor<CatalogTableColumnDescriptor>> viewDataPublisher = SubscriptionUtils.fromIterable(viewData); |
| |
| return SystemViews.<ParentIdAwareDescriptor<CatalogTableColumnDescriptor>>clusterViewBuilder() |
| .name("SYSTEM_VIEW_COLUMNS") |
| .addColumn("VIEW_ID", INT32, entry -> entry.id) |
| .addColumn("NAME", stringOf(SYSTEM_VIEW_STRING_COLUMN_LENGTH), entry -> entry.descriptor.name()) |
| .addColumn("TYPE", stringOf(SYSTEM_VIEW_STRING_COLUMN_LENGTH), entry -> entry.descriptor.type().name()) |
| .addColumn("NULLABLE", BOOLEAN, entry -> entry.descriptor.nullable()) |
| .addColumn("PRECISION", INT32, entry -> entry.descriptor.precision()) |
| .addColumn("SCALE", INT32, entry -> entry.descriptor.scale()) |
| .addColumn("LENGTH", INT32, entry -> entry.descriptor.length()) |
| .dataProvider(viewDataPublisher) |
| .build(); |
| } |
| |
| private SystemView<?> createZonesView() { |
| return SystemViews.<ZoneWithDefaultMarker>clusterViewBuilder() |
| .name("ZONES") |
| .<String>addColumn("NAME", STRING, wrapper -> wrapper.zone.name()) |
| .<Integer>addColumn("PARTITIONS", INT32, wrapper -> wrapper.zone.partitions()) |
| .<Integer>addColumn("REPLICAS", INT32, wrapper -> wrapper.zone.replicas()) |
| .<Integer>addColumn("DATA_NODES_AUTO_ADJUST_SCALE_UP", INT32, wrapper -> wrapper.zone.dataNodesAutoAdjustScaleUp()) |
| .<Integer>addColumn("DATA_NODES_AUTO_ADJUST_SCALE_DOWN", INT32, wrapper -> wrapper.zone.dataNodesAutoAdjustScaleDown()) |
| .<String>addColumn("DATA_NODES_FILTER", STRING, wrapper -> wrapper.zone.filter()) |
| .<Boolean>addColumn("IS_DEFAULT_ZONE", BOOLEAN, wrapper -> wrapper.isDefault) |
| .dataProvider(SubscriptionUtils.fromIterable(() -> { |
| Catalog catalog = catalogAt(clockService.nowLong()); |
| CatalogZoneDescriptor defaultZone = catalog.defaultZone(); |
| return new TransformingIterator<>(catalog.zones().iterator(), |
| (zone) -> new ZoneWithDefaultMarker(zone, defaultZone != null && defaultZone.id() == zone.id())); |
| } |
| )) |
| .build(); |
| } |
| |
| private SystemView<?> createIndexesView() { |
| Iterable<CatalogAwareDescriptor<CatalogIndexDescriptor>> viewData = () -> { |
| Catalog catalog = catalogAt(clockService.nowLong()); |
| |
| return catalog.indexes().stream() |
| .filter(index -> index.status().isAlive()) |
| .map(index -> new CatalogAwareDescriptor<>(index, catalog)) |
| .iterator(); |
| }; |
| |
| return SystemViews.<CatalogAwareDescriptor<CatalogIndexDescriptor>>clusterViewBuilder() |
| .name("INDEXES") |
| .addColumn("INDEX_ID", INT32, entry -> entry.descriptor.id()) |
| .addColumn("INDEX_NAME", STRING, entry -> entry.descriptor.name()) |
| .addColumn("TABLE_ID", INT32, entry -> entry.descriptor.tableId()) |
| .addColumn("TABLE_NAME", STRING, entry -> getTableDescriptor(entry).name()) |
| .addColumn("SCHEMA_ID", INT32, CatalogManagerImpl::getSchemaId) |
| .addColumn("SCHEMA_NAME", STRING, entry -> entry.catalog.schema(getSchemaId(entry)).name()) |
| .addColumn("TYPE", STRING, entry -> entry.descriptor.indexType().name()) |
| .addColumn("IS_UNIQUE", BOOLEAN, entry -> entry.descriptor.unique()) |
| .addColumn("COLUMNS", STRING, CatalogManagerImpl::getColumnsString) |
| .addColumn("STATUS", STRING, entry -> entry.descriptor.status().name()) |
| .dataProvider(SubscriptionUtils.fromIterable(viewData)) |
| .build(); |
| } |
| |
| private static int getSchemaId(CatalogAwareDescriptor<CatalogIndexDescriptor> entry) { |
| return getTableDescriptor(entry).schemaId(); |
| } |
| |
| private static CatalogTableDescriptor getTableDescriptor(CatalogAwareDescriptor<CatalogIndexDescriptor> entry) { |
| return entry.catalog.table(entry.descriptor.tableId()); |
| } |
| |
| private static String getColumnsString(CatalogAwareDescriptor<CatalogIndexDescriptor> entry) { |
| return entry.descriptor.indexType() == HASH |
| ? String.join(", ", ((CatalogHashIndexDescriptor) entry.descriptor).columns()) |
| : ((CatalogSortedIndexDescriptor) entry.descriptor) |
| .columns() |
| .stream() |
| .map(column -> column.name() + (column.collation().asc() ? " ASC" : " DESC")) |
| .collect(joining(", ")); |
| } |
| |
| /** Wraps a CatalogZoneDescriptor and a flag indicating whether this zone is the default zone. */ |
| static class ZoneWithDefaultMarker { |
| private final CatalogZoneDescriptor zone; |
| private final boolean isDefault; |
| |
| ZoneWithDefaultMarker(CatalogZoneDescriptor zone, boolean isDefault) { |
| this.zone = zone; |
| this.isDefault = isDefault; |
| } |
| } |
| |
| /** |
| * A container that keeps given descriptor along with name of the schema this |
| * descriptor belongs to. |
| */ |
| private static class SchemaAwareDescriptor<T> { |
| private final T descriptor; |
| private final String schema; |
| |
| SchemaAwareDescriptor(T descriptor, String schema) { |
| this.descriptor = descriptor; |
| this.schema = schema; |
| } |
| } |
| |
| /** |
| * A container that keeps given descriptor along with its parent's id. |
| */ |
| private static class ParentIdAwareDescriptor<T> { |
| private final T descriptor; |
| private final int id; |
| |
| ParentIdAwareDescriptor(T descriptor, int id) { |
| this.descriptor = descriptor; |
| this.id = id; |
| } |
| } |
| |
| /** |
| * A container that keeps given descriptor along with the catalog it belongs to. |
| */ |
| private static class CatalogAwareDescriptor<T> { |
| private final T descriptor; |
| private final Catalog catalog; |
| |
| CatalogAwareDescriptor(T descriptor, Catalog catalog) { |
| this.descriptor = descriptor; |
| this.catalog = catalog; |
| } |
| } |
| } |