IGNITE-21859 Causality token stays 0 for default zone (#3653)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index 40e35b4..301a451 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -64,7 +64,7 @@
private final long activationTimestamp;
private final Map<String, CatalogSchemaDescriptor> schemasByName;
private final Map<String, CatalogZoneDescriptor> zonesByName;
- private final CatalogZoneDescriptor defaultZone;
+ private final @Nullable CatalogZoneDescriptor defaultZone;
@IgniteToStringExclude
private final Int2ObjectMap<CatalogSchemaDescriptor> schemasById;
@@ -98,7 +98,7 @@
int objectIdGen,
Collection<CatalogZoneDescriptor> zones,
Collection<CatalogSchemaDescriptor> schemas,
- int defaultZoneId
+ @Nullable Integer defaultZoneId
) {
this.version = version;
this.activationTimestamp = activationTimestamp;
@@ -115,10 +115,15 @@
indexesById = schemas.stream().flatMap(s -> Arrays.stream(s.indexes())).collect(toMapById());
indexesByTableId = unmodifiable(toIndexesByTableId(schemas));
zonesById = zones.stream().collect(toMapById());
- defaultZone = zonesById.get(defaultZoneId);
- if (defaultZone == null) {
- throw new IllegalStateException("The default zone was not found among the provided zones [id=" + defaultZoneId + ']');
+ if (defaultZoneId != null) {
+ defaultZone = zonesById.get((int) defaultZoneId);
+
+ if (defaultZone == null) {
+ throw new IllegalStateException("The default zone was not found among the provided zones [id=" + defaultZoneId + ']');
+ }
+ } else {
+ defaultZone = null;
}
}
@@ -178,7 +183,7 @@
return zonesByName.values();
}
- public CatalogZoneDescriptor defaultZone() {
+ public @Nullable CatalogZoneDescriptor defaultZone() {
return defaultZone;
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index e0dcf51..f9523f4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -21,8 +21,13 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads;
-import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor.CatalogIndexDescriptorType.HASH;
import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN;
import static org.apache.ignite.internal.type.NativeTypes.INT32;
@@ -41,6 +46,9 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCatalogCommand;
+import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
+import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
@@ -128,19 +136,7 @@
* Constructor.
*/
public CatalogManagerImpl(UpdateLog updateLog, ClockService clockService) {
- this(updateLog, clockService, DEFAULT_DELAY_DURATION, DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD);
- }
-
- /**
- * Constructor.
- */
- CatalogManagerImpl(
- UpdateLog updateLog,
- ClockService clockService,
- long delayDurationMs,
- long partitionIdleSafeTimePropagationPeriod
- ) {
- this(updateLog, clockService, () -> delayDurationMs, () -> partitionIdleSafeTimePropagationPeriod);
+ this(updateLog, clockService, () -> DEFAULT_DELAY_DURATION, () -> DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD);
}
/**
@@ -182,16 +178,22 @@
INITIAL_CAUSALITY_TOKEN
);
- CatalogZoneDescriptor defaultZone = fromParams(
- objectIdGen++,
- DEFAULT_ZONE_NAME
- );
+ Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(publicSchema, systemSchema), null);
- registerCatalog(new Catalog(0, 0L, objectIdGen, List.of(defaultZone), List.of(publicSchema, systemSchema), defaultZone.id()));
+ registerCatalog(emptyCatalog);
updateLog.registerUpdateHandler(new OnUpdateHandlerImpl());
- return updateLog.startAsync();
+ return updateLog.startAsync()
+ .thenCompose(none -> {
+ if (latestCatalogVersion() == emptyCatalog.version()) {
+ // node has not seen any updates yet, let's try to initialise
+ // catalog with default zone
+ return createDefaultZone(emptyCatalog);
+ }
+
+ return nullCompletedFuture();
+ });
}
@Override
@@ -361,6 +363,34 @@
return updateLog.saveSnapshot(new SnapshotEntry(catalog));
}
+ private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
+ List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
+ CreateZoneCommand.builder()
+ .zoneName(DEFAULT_ZONE_NAME)
+ .partitions(DEFAULT_PARTITION_COUNT)
+ .replicas(DEFAULT_REPLICA_COUNT)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .filter(DEFAULT_FILTER)
+ .storageProfilesParams(
+ List.of(StorageProfileParams.builder().storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE).build())
+ )
+ .build(),
+ AlterZoneSetDefaultCatalogCommand.builder()
+ .zoneName(DEFAULT_ZONE_NAME)
+ .build()
+ )).get(emptyCatalog);
+
+ return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, createZoneEntries))
+ .handle((result, error) -> {
+ if (error != null) {
+ LOG.warn("Unable to create default zone.", error);
+ }
+
+ return null;
+ });
+ }
+
private void registerCatalog(Catalog newCatalog) {
catalogByVer.put(newCatalog.version(), newCatalog);
catalogByTs.put(newCatalog.time(), newCatalog);
@@ -566,7 +596,7 @@
catalog.objectIdGenState(),
catalog.zones(),
catalog.schemas(),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
@@ -660,8 +690,9 @@
.<Boolean>addColumn("IS_DEFAULT_ZONE", BOOLEAN, wrapper -> wrapper.isDefault)
.dataProvider(SubscriptionUtils.fromIterable(() -> {
Catalog catalog = catalogAt(clockService.nowLong());
+ CatalogZoneDescriptor defaultZone = catalog.defaultZone();
return new TransformingIterator<>(catalog.zones().iterator(),
- (zone) -> new ZoneWithDefaultMarker(zone, catalog.defaultZone().id() == zone.id()));
+ (zone) -> new ZoneWithDefaultMarker(zone, defaultZone != null && defaultZone.id() == zone.id()));
}
))
.build();
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneSetDefaultCatalogCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneSetDefaultCatalogCommand.java
index 473940a..81015cf 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneSetDefaultCatalogCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterZoneSetDefaultCatalogCommand.java
@@ -51,7 +51,8 @@
public List<UpdateEntry> get(Catalog catalog) {
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);
- if (zone.id() == catalog.defaultZone().id()) {
+ CatalogZoneDescriptor defaultZone = catalog.defaultZone();
+ if (defaultZone != null && zone.id() == defaultZone.id()) {
// Specified zone already marked as default.
return Collections.emptyList();
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index 7c6bc8e..d785e7b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -141,30 +141,6 @@
}
/**
- * Returns zone descriptor with default parameters.
- *
- * @param id Distribution zone ID.
- * @param zoneName Zone name.
- * @return Distribution zone descriptor.
- */
- public static CatalogZoneDescriptor fromParams(int id, String zoneName) {
- List<StorageProfileParams> storageProfiles =
- List.of(StorageProfileParams.builder().storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE).build());
-
- return new CatalogZoneDescriptor(
- id,
- zoneName,
- DEFAULT_PARTITION_COUNT,
- DEFAULT_REPLICA_COUNT,
- INFINITE_TIMER_VALUE,
- IMMEDIATE_TIMER_VALUE,
- INFINITE_TIMER_VALUE,
- DEFAULT_FILTER,
- fromParams(storageProfiles)
- );
- }
-
- /**
* Converts StorageProfileParams to descriptor.
*
* @param params Parameters.
@@ -598,4 +574,11 @@
partitionIdleSafeTimePropagationPeriodMsSupplier.getAsLong() + maxClockSkewMillis
);
}
+
+ /** Returns id of the default zone from given catalog, or {@code null} if default zone is not exist. */
+ public static @Nullable Integer defaultZoneIdOpt(Catalog catalog) {
+ CatalogZoneDescriptor defaultZone = catalog.defaultZone();
+
+ return defaultZone != null ? defaultZone.id() : null;
+ }
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index 3de1ec8..565cdce 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -109,9 +109,16 @@
ensureNoTableIndexOrSysViewExistsWithGivenName(schema, tableName);
- CatalogZoneDescriptor zone = zoneName == null
- ? catalog.defaultZone()
- : zoneOrThrow(catalog, zoneName);
+ CatalogZoneDescriptor zone;
+ if (zoneName == null) {
+ if (catalog.defaultZone() == null) {
+ throw new CatalogValidationException("The zone is not specified. Please specify zone explicitly or set default one.");
+ }
+
+ zone = catalog.defaultZone();
+ } else {
+ zone = zoneOrThrow(catalog, zoneName);
+ }
if (storageProfile == null) {
storageProfile = zone.storageProfiles().defaultProfile().storageProfile();
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropZoneCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropZoneCommand.java
index 6e19e35..6ae466c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropZoneCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropZoneCommand.java
@@ -51,8 +51,9 @@
@Override
public List<UpdateEntry> get(Catalog catalog) {
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);
+ CatalogZoneDescriptor defaultZone = catalog.defaultZone();
- if (zone.id() == catalog.defaultZone().id()) {
+ if (defaultZone != null && zone.id() == defaultZone.id()) {
throw new DistributionZoneCantBeDroppedValidationException("Default distribution zone can't be dropped: zoneName={}", zoneName);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
index 0e75390..6c6baac 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceIndex;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
@@ -55,7 +56,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceIndex(schema, newIndexDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index 338aae2..f7cefa5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -19,6 +19,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
@@ -109,7 +110,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index 039218b..c9bbf22 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -80,7 +81,7 @@
.map(z -> z.id() == descriptor.id() ? descriptor : z)
.collect(toList()),
catalog.schemas(),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index f3ae741..c3cd9f8 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -19,6 +19,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
@@ -111,7 +112,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index dc20ff2..509d284 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@@ -91,7 +93,7 @@
schema.systemViews(),
causalityToken
), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index 6afbebc..eaeec85 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -75,7 +76,7 @@
catalog.objectIdGenState(),
catalog.zones().stream().filter(z -> z.id() != zoneId).collect(toList()),
catalog.schemas(),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 0d2047d..de24c10 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
@@ -107,7 +108,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 4110b86..59f294c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
@@ -94,7 +96,7 @@
schema.systemViews(),
causalityToken
), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
index 104e131..03d130a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@@ -100,7 +102,7 @@
catalog.objectIdGenState(),
catalog.zones(),
CatalogUtils.replaceSchema(newSystemSchema, catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 18507ab..563bd5e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.List;
import java.util.Objects;
@@ -96,7 +98,7 @@
catalog.objectIdGenState(),
catalog.zones(),
schemas,
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index 6a00124..84c89d6 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
@@ -78,7 +80,7 @@
catalog.objectIdGenState(),
CollectionUtils.concat(catalog.zones(), List.of(descriptor)),
catalog.schemas(),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
index 66fe037..7fe128d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
@@ -55,7 +57,7 @@
catalog.objectIdGenState() + delta,
catalog.zones(),
catalog.schemas(),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
index da155f2..a660e69 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.storage.AbstractChangeIndexStatusEntry.schemaByIndexId;
import java.io.IOException;
@@ -83,7 +84,7 @@
schema.systemViews(),
causalityToken
), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameIndexEntry.java
index 550e418..1b2278f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameIndexEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceIndex;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
@@ -72,7 +73,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceIndex(schemaDescriptor, newIndexDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
index 99b1ec6..420fc4a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
@@ -81,7 +82,7 @@
catalog.objectIdGenState(),
catalog.zones(),
replaceSchema(replaceTable(schemaDescriptor, newTableDescriptor), catalog.schemas()),
- catalog.defaultZone().id()
+ defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/SnapshotEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/SnapshotEntry.java
index 29f2980..f9134be 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/SnapshotEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/SnapshotEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -30,6 +32,7 @@
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.jetbrains.annotations.Nullable;
/**
* A catalog snapshot entry.
@@ -42,7 +45,7 @@
private final int objectIdGenState;
private final CatalogZoneDescriptor[] zones;
private final CatalogSchemaDescriptor[] schemas;
- private final int defaultZoneId;
+ private final @Nullable Integer defaultZoneId;
/**
* Constructs the object.
@@ -51,7 +54,7 @@
*/
public SnapshotEntry(Catalog catalog) {
this(catalog.version(), catalog.time(), catalog.objectIdGenState(), catalog.zones().toArray(CatalogZoneDescriptor[]::new),
- catalog.schemas().toArray(CatalogSchemaDescriptor[]::new), catalog.defaultZone().id());
+ catalog.schemas().toArray(CatalogSchemaDescriptor[]::new), defaultZoneIdOpt(catalog));
}
/**
@@ -63,7 +66,7 @@
int objectIdGenState,
CatalogZoneDescriptor[] zones,
CatalogSchemaDescriptor[] schemas,
- int defaultZoneId
+ @Nullable Integer defaultZoneId
) {
this.version = version;
this.activationTime = activationTime;
@@ -129,7 +132,7 @@
public SnapshotEntry readFrom(IgniteDataInput input) throws IOException {
int catalogVersion = input.readInt();
long activationTime = input.readLong();
- int objectIdGenState = input.readInt();
+ int objectIdGenState = input.readInt();
CatalogZoneDescriptor[] zones =
CatalogSerializationUtils.readArray(CatalogZoneDescriptor.SERIALIZER, input, CatalogZoneDescriptor.class);
@@ -137,7 +140,10 @@
CatalogSchemaDescriptor[] schemas =
CatalogSerializationUtils.readArray(CatalogSchemaDescriptor.SERIALIZER, input, CatalogSchemaDescriptor.class);
- int defaultZoneId = input.readInt();
+ Integer defaultZoneId = null;
+ if (input.readBoolean()) {
+ defaultZoneId = input.readInt();
+ }
return new SnapshotEntry(catalogVersion, activationTime, objectIdGenState, zones, schemas, defaultZoneId);
}
@@ -151,7 +157,10 @@
CatalogSerializationUtils.writeArray(entry.zones, CatalogZoneDescriptor.SERIALIZER, output);
CatalogSerializationUtils.writeArray(entry.schemas, CatalogSchemaDescriptor.SERIALIZER, output);
- output.writeInt(entry.defaultZoneId);
+ output.writeBoolean(entry.defaultZoneId != null);
+ if (entry.defaultZoneId != null) {
+ output.writeInt(entry.defaultZoneId);
+ }
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index fa7e3ae..242dc3b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -144,7 +144,11 @@
}
@Override
- public void registerUpdateHandler(OnUpdateHandler handler) {
+ public synchronized void registerUpdateHandler(OnUpdateHandler handler) {
+ if (onUpdateHandler != null) {
+ throw new IllegalStateException("onUpdateHandler handler already registered");
+ }
+
onUpdateHandler = handler;
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
index caff7da..5d0eb96 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
@@ -27,6 +27,7 @@
import static org.apache.ignite.internal.catalog.commands.DefaultValue.constant;
import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.DESC_NULLS_FIRST;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.apache.ignite.sql.ColumnType.STRING;
@@ -68,38 +69,42 @@
CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 0);
assertNotNull(defaultSchema);
+ assertNull(manager.catalog(0).defaultZone());
assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong()));
assertSame(defaultSchema, manager.schema(0));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
- assertNull(manager.schema(1));
+ Catalog catalogWithDefaultZone = manager.catalog(1);
+ assertNotNull(catalogWithDefaultZone);
+
+ CatalogZoneDescriptor defaultZone = catalogWithDefaultZone.defaultZone();
+ assertNotNull(defaultZone);
+ assertTrue(
+ defaultZone.updateToken() > INITIAL_CAUSALITY_TOKEN,
+ "Non default token was expected"
+ );
+ assertNotNull(Objects.requireNonNull(manager.catalog(manager.activeCatalogVersion(clock.nowLong()))).defaultZone());
+
+ assertNull(manager.schema(2));
assertThrows(IllegalStateException.class, () -> manager.activeSchema(-1L));
// Validate default schema.
assertEquals(INITIAL_CAUSALITY_TOKEN, defaultSchema.updateToken());
-
- // Default distribution zone must exists.
- CatalogZoneDescriptor zone = Objects.requireNonNull(manager.catalog(manager.activeCatalogVersion(clock.nowLong()))).defaultZone();
-
- assertNotNull(zone);
-
- assertEquals(INITIAL_CAUSALITY_TOKEN, zone.updateToken());
}
@Test
public void testCreateTable() {
- assertThat(
+ int tableCreationVersion = await(
manager.execute(createTableCommand(
TABLE_NAME,
List.of(columnParams("key1", INT32), columnParams("key2", INT32), columnParams("val", INT32, true)),
List.of("key1", "key2"),
List.of("key2")
- )),
- willCompleteSuccessfully()
+ ))
);
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(0);
+ CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion - 1);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
@@ -110,7 +115,7 @@
assertNull(manager.table(TABLE_NAME, 123L));
// Validate actual catalog.
- schema = manager.schema(SCHEMA_NAME, 1);
+ schema = manager.schema(SCHEMA_NAME, tableCreationVersion);
CatalogTableDescriptor table = schema.table(TABLE_NAME);
assertNotNull(schema);
@@ -126,10 +131,12 @@
assertEquals(table.updateToken(), schema.updateToken());
// Validate another table creation.
- assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully());
+ int secondTableCreationVersion = await(
+ manager.execute(simpleTable(TABLE_NAME_2))
+ );
// Validate actual catalog. has both tables.
- schema = manager.schema(2);
+ schema = manager.schema(secondTableCreationVersion);
table = schema.table(TABLE_NAME);
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
@@ -154,14 +161,14 @@
@Test
public void testDropTable() {
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
- assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully());
+ int secondTableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME_2)));
long beforeDropTimestamp = clock.nowLong();
- assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully());
+ int tableDropVersion = await(manager.execute(dropTableCommand(TABLE_NAME)));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(2);
+ CatalogSchemaDescriptor schema = manager.schema(secondTableCreationVersion);
CatalogTableDescriptor table1 = schema.table(TABLE_NAME);
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
@@ -181,7 +188,7 @@
assertSame(table2, manager.table(table2.id(), beforeDropTimestamp));
// Validate actual catalog.
- schema = manager.schema(3);
+ schema = manager.schema(tableDropVersion);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
@@ -270,12 +277,12 @@
@Test
public void testCreateHashIndex() {
- assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
+ int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME)));
- assertThat(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL", "ID"))), willCompleteSuccessfully());
+ int indexCreationVersion = await(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL", "ID"))));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion);
assertNotNull(schema);
assertNull(schema.aliveIndex(INDEX_NAME));
@@ -286,7 +293,7 @@
assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
// Validate actual catalog.
- schema = manager.schema(2);
+ schema = manager.schema(indexCreationVersion);
CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) schema.aliveIndex(INDEX_NAME);
@@ -303,7 +310,7 @@
@Test
public void testCreateSortedIndex() {
- assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
+ int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME)));
CatalogCommand command = createSortedIndexCommand(
INDEX_NAME,
@@ -312,10 +319,10 @@
List.of(DESC_NULLS_FIRST, ASC_NULLS_LAST)
);
- assertThat(manager.execute(command), willCompleteSuccessfully());
+ int indexCreationVersion = await(manager.execute(command));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion);
assertNotNull(schema);
assertNull(schema.aliveIndex(INDEX_NAME));
@@ -325,7 +332,7 @@
assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
// Validate actual catalog.
- schema = manager.schema(2);
+ schema = manager.schema(indexCreationVersion);
CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) schema.aliveIndex(INDEX_NAME);
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
index 03f96ad..4104863 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
@@ -27,6 +27,7 @@
import static org.apache.ignite.internal.catalog.BaseCatalogManagerTest.simpleIndex;
import static org.apache.ignite.internal.catalog.BaseCatalogManagerTest.simpleTable;
import static org.apache.ignite.internal.catalog.BaseCatalogManagerTest.startBuildingIndexCommand;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
@@ -39,6 +40,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
@@ -86,8 +88,12 @@
}
@Test
- void testRecoveryCatalogVersionTimestamps() {
+ void testRecoveryCatalogVersionTimestamps() throws InterruptedException {
createAndStartComponents();
+ awaitDefaultZoneCreation();
+
+ // on the first start default zone must be created
+ verify(metaStorageManager).invoke(any());
// Let's create a couple of versions of the catalog.
assertThat(catalogManager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
@@ -114,11 +120,16 @@
HybridTimestamp newNow = clock.now();
assertThat(newNow, greaterThan(updateNow));
+ reset(metaStorageManager);
+
createAndStartComponents();
// Check recovery events.
- verify(interceptor, times(2)).handle(any(VersionedUpdate.class), any(), anyLong());
+ verify(interceptor, times(3)).handle(any(VersionedUpdate.class), any(), anyLong());
verify(interceptor, Mockito.never()).handle(any(SnapshotEntry.class), any(), anyLong());
+ // on recovery no additional invocation should happen
+ verify(metaStorageManager, Mockito.never()).invoke(any());
+
// Let's check that the versions for the points in time at which they were created are in place.
assertThat(catalogManager.activeCatalogVersion(time0), equalTo(catalogVersion0));
@@ -126,8 +137,9 @@
}
@Test
- void testRecoveryCatalogAfterCompaction() {
+ void testRecoveryCatalogAfterCompaction() throws InterruptedException {
createAndStartComponents();
+ awaitDefaultZoneCreation();
// Let's create a couple of versions of the catalog.
assertThat(catalogManager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
@@ -175,8 +187,9 @@
}
@Test
- void testRecoveryIndexCreationCatalogVersion() {
+ void testRecoveryIndexCreationCatalogVersion() throws InterruptedException {
createAndStartComponents();
+ awaitDefaultZoneCreation();
assertThat(catalogManager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
assertThat(catalogManager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), willCompleteSuccessfully());
@@ -204,13 +217,17 @@
private void createComponents() {
KeyValueStorage keyValueStorage = new TestRocksDbKeyValueStorage(NODE_NAME, workDir);
- metaStorageManager = StandaloneMetaStorageManager.create(keyValueStorage);
+ metaStorageManager = spy(StandaloneMetaStorageManager.create(keyValueStorage));
interceptor = spy(new TestUpdateHandlerInterceptor());
catalogManager = CatalogTestUtils.createTestCatalogManagerWithInterceptor(NODE_NAME, clock, metaStorageManager, interceptor);
}
+ private void awaitDefaultZoneCreation() throws InterruptedException {
+ waitForCondition(() -> catalogManager.latestCatalogVersion() > 0, 5_000);
+ }
+
private void startComponentsAndDeployWatches() {
assertThat(
startAsync(metaStorageManager, catalogManager)
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 53b7191..072a6d6 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -46,6 +46,7 @@
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -82,6 +83,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
@@ -179,8 +181,10 @@
assertSame(defaultSchema, manager.schema(defaultSchema.id(), 0));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
- assertNull(manager.schema(1));
- assertNull(manager.schema(defaultSchema.id(), 1));
+ int nonExistingVersion = manager.latestCatalogVersion() + 1;
+
+ assertNull(manager.schema(nonExistingVersion));
+ assertNull(manager.schema(defaultSchema.id(), nonExistingVersion));
assertThrows(IllegalStateException.class, () -> manager.activeSchema(-1L));
// Validate default schema.
@@ -214,7 +218,7 @@
assertEquals(0, systemSchema.tables().length);
assertEquals(0, systemSchema.indexes().length);
- assertThat(manager.latestCatalogVersion(), is(0));
+ assertThat(manager.latestCatalogVersion(), is(1));
}
@Test
@@ -228,16 +232,18 @@
CompletableFuture<Integer> version3Future = manager.execute(simpleTable(TABLE_NAME_2));
assertThat(version3Future, willCompleteSuccessfully());
- assertThat(version1Future.join(), is(1));
- assertThat(version2Future.join(), is(2));
- assertThat(version3Future.join(), is(3));
+ int firstVersion = version1Future.join();
+ assertThat(version2Future.join(), is(firstVersion + 1));
+ assertThat(version3Future.join(), is(firstVersion + 2));
}
@Test
public void testNoInteractionsAfterStop() {
clearInvocations(updateLog);
- CompletableFuture<Void> readyFuture = manager.catalogReadyFuture(1);
+ int futureVersion = manager.latestCatalogVersion() + 1;
+
+ CompletableFuture<Void> readyFuture = manager.catalogReadyFuture(futureVersion);
assertFalse(readyFuture.isDone());
assertThat(manager.stopAsync(), willCompleteSuccessfully());
@@ -254,18 +260,17 @@
@Test
public void testCreateTable() {
- assertThat(
+ int tableCreationVersion = await(
manager.execute(createTableCommand(
TABLE_NAME,
List.of(columnParams("key1", INT32), columnParams("key2", INT32), columnParams("val", INT32, true)),
List.of("key1", "key2"),
List.of("key2")
- )),
- willCompleteSuccessfully()
+ ))
);
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(0);
+ CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion - 1);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
@@ -277,7 +282,7 @@
assertNull(manager.aliveIndex(pkIndexName(TABLE_NAME), 123L));
// Validate actual catalog
- schema = manager.schema(SCHEMA_NAME, 1);
+ schema = manager.schema(SCHEMA_NAME, tableCreationVersion);
CatalogTableDescriptor table = schema.table(TABLE_NAME);
CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) schema.aliveIndex(pkIndexName(TABLE_NAME));
@@ -312,10 +317,10 @@
assertThat(desc.precision(), is(DEFAULT_PRECISION));
// Validate another table creation.
- assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully());
+ int secondTableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME_2)));
// Validate actual catalog has both tables.
- schema = manager.schema(2);
+ schema = manager.schema(secondTableCreationVersion);
table = schema.table(TABLE_NAME);
pkIndex = (CatalogHashIndexDescriptor) schema.aliveIndex(pkIndexName(TABLE_NAME));
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
@@ -357,10 +362,10 @@
long beforeDropTimestamp = clock.nowLong();
- assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully());
+ int tableDropVersion = await(manager.execute(dropTableCommand(TABLE_NAME)));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(2);
+ CatalogSchemaDescriptor schema = manager.schema(tableDropVersion - 1);
CatalogTableDescriptor table1 = schema.table(TABLE_NAME);
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
CatalogIndexDescriptor pkIndex1 = schema.aliveIndex(pkIndexName(TABLE_NAME));
@@ -386,7 +391,7 @@
assertSame(pkIndex2, manager.index(pkIndex2.id(), beforeDropTimestamp));
// Validate actual catalog
- schema = manager.schema(3);
+ schema = manager.schema(tableDropVersion);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
@@ -546,7 +551,7 @@
public void testAlterColumnDefault() {
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -588,7 +593,7 @@
public void testAlterColumnNotNull() {
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -631,7 +636,7 @@
assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col1, col2))), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -699,7 +704,7 @@
simpleTable(TABLE_NAME, List.of(pkCol, colWithPrecision))), willCompleteSuccessfully()
);
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -735,7 +740,7 @@
assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col))), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -805,7 +810,7 @@
willCompleteSuccessfully()
);
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -849,7 +854,7 @@
assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col))), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -900,7 +905,7 @@
assertThat(manager.execute(simpleTable(TABLE_NAME, tableColumns)), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -940,7 +945,7 @@
public void testAlterColumnMultipleChanges() {
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
- int schemaVer = 1;
+ int schemaVer = manager.latestCatalogVersion();
assertNotNull(manager.schema(schemaVer));
assertNull(manager.schema(schemaVer + 1));
@@ -974,13 +979,13 @@
@Test
public void testAlterColumnForNonExistingTableRejected() {
- assertNotNull(manager.schema(0));
- assertNull(manager.schema(1));
+ int versionBefore = manager.latestCatalogVersion();
assertThat(changeColumn(TABLE_NAME, "ID", null, null, null), willThrowFast(TableNotFoundValidationException.class));
- assertNotNull(manager.schema(0));
- assertNull(manager.schema(1));
+ int versionAfter = manager.latestCatalogVersion();
+
+ assertEquals(versionBefore, versionAfter);
}
@Test
@@ -1028,19 +1033,19 @@
@Test
public void testCreateHashIndex() {
- assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
+ int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME)));
- assertThat(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL", "ID"))), willCompleteSuccessfully());
+ int indexCreationVersion = await(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL", "ID"))));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion);
assertNotNull(schema);
assertNull(schema.aliveIndex(INDEX_NAME));
assertNull(manager.aliveIndex(INDEX_NAME, 123L));
// Validate actual catalog
- schema = manager.schema(2);
+ schema = manager.schema(indexCreationVersion);
CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) schema.aliveIndex(INDEX_NAME);
@@ -1068,10 +1073,10 @@
List.of(DESC_NULLS_FIRST, ASC_NULLS_LAST)
);
- assertThat(manager.execute(command), willCompleteSuccessfully());
+ int indexCreationVersion = await(manager.execute(command));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = manager.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(indexCreationVersion - 1);
assertNotNull(schema);
assertNull(schema.aliveIndex(INDEX_NAME));
@@ -1079,7 +1084,7 @@
assertNull(manager.index(4, 123L));
// Validate actual catalog
- schema = manager.schema(2);
+ schema = manager.schema(indexCreationVersion);
CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) schema.aliveIndex(INDEX_NAME);
@@ -1107,10 +1112,13 @@
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
when(updateLogMock.startAsync()).thenReturn(nullCompletedFuture());
+ when(updateLogMock.append(any())).thenReturn(CompletableFuture.completedFuture(true));
CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockService);
assertThat(manager.startAsync(), willCompleteSuccessfully());
+ reset(updateLogMock);
+
when(updateLogMock.append(any())).thenAnswer(invocation -> {
// here we emulate concurrent updates. First of all, we return a future completed with "false"
// as if someone has concurrently appended an update. Besides, in order to unblock manager and allow to
@@ -1128,7 +1136,16 @@
return falseCompletedFuture();
});
- CompletableFuture<?> createTableFut = manager.execute(simpleTable("T"));
+ CompletableFuture<?> createTableFut = manager.execute(List.of(
+ CreateZoneCommand.builder()
+ .zoneName("TEST_ZONE")
+ .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()))
+ .build(),
+ AlterZoneSetDefaultCatalogCommand.builder()
+ .zoneName("TEST_ZONE")
+ .build(),
+ simpleTable("T")
+ ));
assertThat(createTableFut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
@@ -1138,84 +1155,70 @@
@Test
public void catalogActivationTime() {
- long delayDuration = TimeUnit.DAYS.toMillis(365);
+ delayDuration.set(TimeUnit.DAYS.toMillis(365));
+ reset(updateLog, clockWaiter);
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockService, delayDuration, 0);
+ CompletableFuture<Integer> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
- assertThat(manager.startAsync(), willCompleteSuccessfully());
+ assertFalse(createTableFuture.isDone());
- try {
- CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
+ verify(updateLog).append(any());
+ // TODO IGNITE-19400: recheck createTable future completion guarantees
- assertFalse(createTableFuture.isDone());
+ // This waits till the new Catalog version lands in the internal structures.
+ verify(clockWaiter, timeout(10_000)).waitFor(any());
- verify(updateLog).append(any());
- // TODO IGNITE-19400: recheck createTable future completion guarantees
+ int latestVersion = manager.latestCatalogVersion();
- // This waits till the new Catalog version lands in the internal structures.
- verify(clockWaiter, timeout(10_000)).waitFor(any());
+ assertSame(manager.schema(latestVersion - 1), manager.activeSchema(clock.nowLong()));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
- assertSame(manager.schema(0), manager.activeSchema(clock.nowLong()));
- assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+ clock.update(clock.now().addPhysicalTime(delayDuration.get()));
- clock.update(clock.now().addPhysicalTime(delayDuration));
-
- assertSame(manager.schema(1), manager.activeSchema(clock.nowLong()));
- assertNotNull(manager.table(TABLE_NAME, clock.nowLong()));
- } finally {
- assertThat(manager.stopAsync(), willCompleteSuccessfully());
- }
+ assertSame(manager.schema(latestVersion), manager.activeSchema(clock.nowLong()));
+ assertNotNull(manager.table(TABLE_NAME, clock.nowLong()));
}
@Test
public void createTableIfNotExistWaitsActivationEvenIfTableExists() throws Exception {
- long delayDuration = TimeUnit.DAYS.toMillis(365);
+ delayDuration.set(TimeUnit.DAYS.toMillis(365));
+ partitionIdleSafeTimePropagationPeriod.set(0);
+ reset(updateLog);
- int partitionIdleSafeTimePropagationPeriod = 0;
+ CatalogCommand createTableCommand = spy(simpleTable(TABLE_NAME));
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockService, delayDuration,
- partitionIdleSafeTimePropagationPeriod);
+ CompletableFuture<Integer> createTableFuture1 = manager.execute(createTableCommand);
- assertThat(manager.startAsync(), willCompleteSuccessfully());
+ assertFalse(createTableFuture1.isDone());
- try {
- CatalogCommand createTableCommand = spy(simpleTable(TABLE_NAME));
+ ArgumentCaptor<VersionedUpdate> appendCapture = ArgumentCaptor.forClass(VersionedUpdate.class);
- CompletableFuture<Integer> createTableFuture1 = manager.execute(createTableCommand);
+ verify(updateLog).append(appendCapture.capture());
- assertFalse(createTableFuture1.isDone());
+ int catalogVerAfterTableCreate = appendCapture.getValue().version();
- ArgumentCaptor<VersionedUpdate> appendCapture = ArgumentCaptor.forClass(VersionedUpdate.class);
+ CompletableFuture<Integer> createTableFuture2 = manager.execute(createTableCommand);
- verify(updateLog).append(appendCapture.capture());
+ verify(createTableCommand, times(2)).get(any());
- int catalogVerAfterTableCreate = appendCapture.getValue().version();
+ assertFalse(createTableFuture2.isDone());
- CompletableFuture<Integer> createTableFuture2 = manager.execute(createTableCommand);
+ verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
- verify(createTableCommand, times(2)).get(any());
+ Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
- assertFalse(createTableFuture2.isDone());
+ assertNotNull(catalog0);
- verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
+ HybridTimestamp activationSkew = CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads(
+ catalog0,
+ () -> partitionIdleSafeTimePropagationPeriod.get(), clockService.maxClockSkewMillis());
- Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
+ clock.update(activationSkew);
- assertNotNull(catalog0);
+ assertTrue(waitForCondition(createTableFuture1::isDone, 2_000));
+ assertTrue(waitForCondition(createTableFuture2::isDone, 2_000));
- HybridTimestamp activationSkew = CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads(
- catalog0,
- () -> partitionIdleSafeTimePropagationPeriod, clockService.maxClockSkewMillis());
-
- clock.update(activationSkew);
-
- assertTrue(waitForCondition(createTableFuture1::isDone, 2_000));
- assertTrue(waitForCondition(createTableFuture2::isDone, 2_000));
-
- assertSame(manager.schema(catalogVerAfterTableCreate), manager.activeSchema(clock.nowLong()));
- } finally {
- assertThat(manager.stopAsync(), willCompleteSuccessfully());
- }
+ assertSame(manager.schema(catalogVerAfterTableCreate), manager.activeSchema(clock.nowLong()));
}
@Test
@@ -1223,6 +1226,7 @@
UpdateLog updateLogMock = mock(UpdateLog.class);
when(updateLogMock.startAsync()).thenReturn(nullCompletedFuture());
when(updateLogMock.stopAsync()).thenReturn(nullCompletedFuture());
+ when(updateLogMock.append(any())).thenReturn(CompletableFuture.completedFuture(true));
CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockService);
@@ -1723,68 +1727,50 @@
@Test
public void userFutureCompletesAfterClusterWideActivationHappens() {
- long delayDuration = TimeUnit.DAYS.toMillis(365);
+ delayDuration.set(TimeUnit.DAYS.toMillis(365));
+ reset(clockWaiter);
HybridTimestamp startTs = clock.now();
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockService, delayDuration, 0);
+ CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
- assertThat(manager.startAsync(), willCompleteSuccessfully());
+ assertFalse(createTableFuture.isDone());
- try {
- CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
+ ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class);
- assertFalse(createTableFuture.isDone());
-
- ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class);
-
- verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
- HybridTimestamp userWaitTs = tsCaptor.getValue();
- assertThat(
- userWaitTs.getPhysical() - startTs.getPhysical(),
- greaterThanOrEqualTo(delayDuration + clockService.maxClockSkewMillis())
- );
- } finally {
- assertThat(manager.stopAsync(), willCompleteSuccessfully());
- }
+ verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
+ HybridTimestamp userWaitTs = tsCaptor.getValue();
+ assertThat(
+ userWaitTs.getPhysical() - startTs.getPhysical(),
+ greaterThanOrEqualTo(delayDuration.get() + clockService.maxClockSkewMillis())
+ );
}
// TODO: remove after IGNITE-20378 is implemented.
@Test
public void userFutureCompletesAfterClusterWideActivationWithAdditionalIdleSafeTimePeriodHappens() {
- long delayDuration = TimeUnit.DAYS.toMillis(365);
- long partitionIdleSafeTimePropagationPeriod = TimeUnit.DAYS.toDays(365);
+ delayDuration.set(TimeUnit.DAYS.toMillis(365));
+ partitionIdleSafeTimePropagationPeriod.set(TimeUnit.DAYS.toDays(365));
+
+ reset(clockWaiter);
HybridTimestamp startTs = clock.now();
- CatalogManagerImpl manager = new CatalogManagerImpl(
- updateLog,
- clockService,
- delayDuration,
- partitionIdleSafeTimePropagationPeriod
+ CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
+
+ assertFalse(createTableFuture.isDone());
+
+ ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class);
+
+ verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
+ HybridTimestamp userWaitTs = tsCaptor.getValue();
+ assertThat(
+ userWaitTs.getPhysical() - startTs.getPhysical(),
+ greaterThanOrEqualTo(
+ delayDuration.get() + clockService.maxClockSkewMillis()
+ + partitionIdleSafeTimePropagationPeriod.get() + clockService.maxClockSkewMillis()
+ )
);
-
- assertThat(manager.startAsync(), willCompleteSuccessfully());
-
- try {
- CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME));
-
- assertFalse(createTableFuture.isDone());
-
- ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class);
-
- verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
- HybridTimestamp userWaitTs = tsCaptor.getValue();
- assertThat(
- userWaitTs.getPhysical() - startTs.getPhysical(),
- greaterThanOrEqualTo(
- delayDuration + clockService.maxClockSkewMillis()
- + partitionIdleSafeTimePropagationPeriod + clockService.maxClockSkewMillis()
- )
- );
- } finally {
- assertThat(manager.stopAsync(), willCompleteSuccessfully());
- }
}
@Test
@@ -1801,12 +1787,12 @@
@Test
void testGetTableByIdAndCatalogVersion() {
- assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
+ int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME)));
CatalogTableDescriptor table = manager.table(TABLE_NAME, clock.nowLong());
- assertNull(manager.table(table.id(), 0));
- assertSame(table, manager.table(table.id(), 1));
+ assertNull(manager.table(table.id(), tableCreationVersion - 1));
+ assertSame(table, manager.table(table.id(), tableCreationVersion));
}
@Test
@@ -1894,33 +1880,49 @@
@Test
void testLatestCatalogVersion() {
- assertEquals(0, manager.latestCatalogVersion());
-
- assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
assertEquals(1, manager.latestCatalogVersion());
- assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully());
+ assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
assertEquals(2, manager.latestCatalogVersion());
+
+ assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully());
+ assertEquals(3, manager.latestCatalogVersion());
}
@Test
void testTables() {
+ int initialVersion = manager.latestCatalogVersion();
+
assertThat(manager.execute(simpleTable(TABLE_NAME + 0)), willCompleteSuccessfully());
assertThat(manager.execute(simpleTable(TABLE_NAME + 1)), willCompleteSuccessfully());
- assertThat(manager.tables(0), empty());
- assertThat(manager.tables(1), hasItems(table(1, TABLE_NAME + 0)));
- assertThat(manager.tables(2), hasItems(table(2, TABLE_NAME + 0), table(2, TABLE_NAME + 1)));
+ assertThat(manager.tables(initialVersion), empty());
+ assertThat(
+ manager.tables(initialVersion + 1),
+ hasItems(table(initialVersion + 1, TABLE_NAME + 0))
+ );
+ assertThat(
+ manager.tables(initialVersion + 2),
+ hasItems(table(initialVersion + 2, TABLE_NAME + 0), table(initialVersion + 2, TABLE_NAME + 1))
+ );
}
@Test
void testIndexes() {
+ int initialVersion = manager.latestCatalogVersion();
+
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully());
- assertThat(manager.indexes(0), empty());
- assertThat(manager.indexes(1), hasItems(index(1, pkIndexName(TABLE_NAME))));
- assertThat(manager.indexes(2), hasItems(index(2, pkIndexName(TABLE_NAME)), index(2, INDEX_NAME)));
+ assertThat(manager.indexes(initialVersion), empty());
+ assertThat(
+ manager.indexes(initialVersion + 1),
+ hasItems(index(initialVersion + 1, pkIndexName(TABLE_NAME)))
+ );
+ assertThat(
+ manager.indexes(initialVersion + 2),
+ hasItems(index(initialVersion + 2, pkIndexName(TABLE_NAME)), index(initialVersion + 2, INDEX_NAME))
+ );
}
@Test
@@ -1987,15 +1989,14 @@
assertEquals(expectedCreationToken, table.creationToken());
- CompletableFuture<?> future = manager.execute(
+ int tableCreationVersion = await(manager.execute(
AlterTableAlterColumnCommand.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
.columnName("val1")
.type(INT64)
.build()
- );
- assertThat(future, willCompleteSuccessfully());
+ ));
table = manager.table(TABLE_NAME, Long.MAX_VALUE);
@@ -2003,7 +2004,7 @@
assertEquals(expectedCreationToken, table.creationToken());
- table = manager.table(tableId(TABLE_NAME), 1);
+ table = manager.table(tableId(TABLE_NAME), tableCreationVersion);
assertEquals(expectedCreationToken, table.creationToken());
}
@@ -2732,19 +2733,19 @@
@Test
public void testEmptyCatalogCompaction() {
- assertEquals(0, manager.latestCatalogVersion());
+ assertEquals(1, manager.latestCatalogVersion());
long timestamp = clock.nowLong();
assertThat(manager.compactCatalog(timestamp), willBe(false));
assertEquals(0, manager.earliestCatalogVersion());
- assertEquals(0, manager.latestCatalogVersion());
+ assertEquals(1, manager.latestCatalogVersion());
- assertNotNull(manager.catalog(0));
+ assertNotNull(manager.catalog(1));
assertEquals(0, manager.activeCatalogVersion(0));
- assertEquals(0, manager.activeCatalogVersion(timestamp));
+ assertEquals(1, manager.activeCatalogVersion(timestamp));
}
private CompletableFuture<?> changeColumn(
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
index 3ef5abc..c40cf4d 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
@@ -109,7 +109,7 @@
@Test
void exceptionIsThrownIfIndexWithGivenIdNotFound() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = createCommand(1);
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
index 7cbac40..46208e1 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
@@ -84,6 +84,24 @@
}
static Catalog emptyCatalog() {
+ return new Catalog(
+ 0,
+ 0L,
+ 1,
+ List.of(),
+ List.of(new CatalogSchemaDescriptor(
+ 0,
+ SCHEMA_NAME,
+ new CatalogTableDescriptor[0],
+ new CatalogIndexDescriptor[0],
+ new CatalogSystemViewDescriptor[0],
+ INITIAL_CAUSALITY_TOKEN
+ )),
+ null
+ );
+ }
+
+ static Catalog catalogWithDefaultZone() {
return catalog(1, new CatalogTableDescriptor[0], new CatalogIndexDescriptor[0], new CatalogSystemViewDescriptor[0]);
}
@@ -176,7 +194,7 @@
}
static Catalog catalog(CatalogCommand... commandsToApply) {
- return applyCommandsToCatalog(emptyCatalog(), commandsToApply);
+ return applyCommandsToCatalog(catalogWithDefaultZone(), commandsToApply);
}
static Catalog catalog(
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommandValidationTest.java
index e908737..6ce129c 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommandValidationTest.java
@@ -115,7 +115,7 @@
void exceptionIsThrownIfSchemaNotExists() {
AlterTableAddColumnCommandBuilder builder = AlterTableAddColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).schemaName(SCHEMA_NAME + "_UNK").build();
@@ -130,7 +130,7 @@
void exceptionIsThrownIfTableNotExists() {
AlterTableAddColumnCommandBuilder builder = AlterTableAddColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).tableName("TEST").build();
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommandValidationTest.java
index f139f8e..3f07fd1 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommandValidationTest.java
@@ -95,7 +95,7 @@
void exceptionIsThrownIfSchemaNotExists() {
AlterTableAlterColumnCommandBuilder builder = AlterTableAlterColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.schemaName(SCHEMA_NAME + "_UNK")
@@ -115,7 +115,7 @@
void exceptionIsThrownIfTableWithGivenNameNotFound() {
AlterTableAlterColumnCommandBuilder builder = AlterTableAlterColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.schemaName(SCHEMA_NAME)
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommandValidationTest.java
index c41c901..d363cff 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommandValidationTest.java
@@ -120,7 +120,7 @@
void exceptionIsThrownIfSchemaNotExists() {
AlterTableDropColumnCommandBuilder builder = AlterTableDropColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).schemaName(SCHEMA_NAME + "_UNK").build();
@@ -135,7 +135,7 @@
void exceptionIsThrownIfTableNotExists() {
AlterTableDropColumnCommandBuilder builder = AlterTableDropColumnCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).tableName("TEST").build();
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommandValidationTest.java
index 7509229..1877a9a 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AlterZoneCommandValidationTest.java
@@ -55,7 +55,7 @@
CatalogCommand cmd = AlterZoneCommand.builder().zoneName("not_existing_zone").build();
assertThrows(
CatalogValidationException.class,
- () -> cmd.get(emptyCatalog()),
+ () -> cmd.get(catalogWithDefaultZone()),
"Distribution zone with name 'not_existing_zone' not found"
);
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index 9f0a0b6..a1e24c4 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -19,7 +19,7 @@
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createCatalogManagerWithTestUpdateLog;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.index;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.clusterWideEnsuredActivationTimestamp;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.collectIndexes;
@@ -72,10 +72,12 @@
private final HybridClock clock = new HybridClockImpl();
- private final CatalogManager catalogManager = createTestCatalogManager("test", clock);
+ private CatalogManager catalogManager;
@BeforeEach
void setUp() {
+ catalogManager = createCatalogManagerWithTestUpdateLog("test", clock);
+
assertThat(catalogManager.startAsync(), willCompleteSuccessfully());
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateAbstractIndexCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateAbstractIndexCommandValidationTest.java
index 17c5538..1e0b54f 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateAbstractIndexCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateAbstractIndexCommandValidationTest.java
@@ -128,7 +128,7 @@
@Test
void exceptionIsThrownIfSchemaNotExists() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = prefilledBuilder().schemaName(SCHEMA_NAME + "_UNK").build();
@@ -141,7 +141,7 @@
@Test
void exceptionIsThrownIfTableNotExists() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = prefilledBuilder().build();
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateTableCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateTableCommandValidationTest.java
index 0a852b4..9f48989 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateTableCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateTableCommandValidationTest.java
@@ -301,6 +301,21 @@
);
}
+ @Test
+ void exceptionIsThrownIfZoneNeitherSpecifiedExplicitlyNorDefaultWasSet() {
+ CreateTableCommandBuilder builder = CreateTableCommand.builder();
+
+ Catalog catalog = emptyCatalog();
+
+ CatalogCommand command = fillProperties(builder).zone(null).build();
+
+ assertThrowsWithCause(
+ () -> command.get(catalog),
+ CatalogValidationException.class,
+ "The zone is not specified. Please specify zone explicitly or set default one."
+ );
+ }
+
private static CreateTableCommandBuilder fillProperties(CreateTableCommandBuilder builder) {
return builder
.schemaName(SCHEMA_NAME)
@@ -320,7 +335,7 @@
void exceptionIsThrownIfSchemaNotExists() {
CreateTableCommandBuilder builder = CreateTableCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).schemaName(SCHEMA_NAME + "_UNK").build();
@@ -349,7 +364,7 @@
void exceptionIsThrownIfZoneNotExists() {
CreateTableCommandBuilder builder = CreateTableCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = fillProperties(builder).zone(ZONE_NAME + "_UNK").build();
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java
index d40ce6d..1d8cc1b 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java
@@ -67,7 +67,7 @@
@Test
void exceptionIsThrownIfSchemaNotExists() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = DropIndexCommand.builder()
.schemaName(SCHEMA_NAME + "_UNK")
@@ -83,7 +83,7 @@
@Test
void exceptionIsThrownIfIndexWithGivenNameNotFound() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = DropIndexCommand.builder()
.schemaName(SCHEMA_NAME)
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropTableCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropTableCommandValidationTest.java
index 8dc7dd4..9e0f2fd 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropTableCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropTableCommandValidationTest.java
@@ -66,7 +66,7 @@
void exceptionIsThrownIfSchemaNotExists() {
DropTableCommandBuilder builder = DropTableCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.schemaName(SCHEMA_NAME + "_UNK")
@@ -84,7 +84,7 @@
void exceptionIsThrownIfTableWithGivenNameNotFound() {
DropTableCommandBuilder builder = DropTableCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.schemaName(SCHEMA_NAME)
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropZoneCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropZoneCommandValidationTest.java
index 2a09397..aea4980 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropZoneCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropZoneCommandValidationTest.java
@@ -48,7 +48,7 @@
@Test
void rejectToDropDefaultZone() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand cmd = DropZoneCommand.builder()
.zoneName(catalog.defaultZone().name())
@@ -81,7 +81,7 @@
void exceptionIsThrownIfZoneWithGivenNameNotFound() {
DropZoneCommandBuilder builder = DropZoneCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.zoneName("some_zone")
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameTableCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameTableCommandValidationTest.java
index ee13d1f..9bcd0da 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameTableCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameTableCommandValidationTest.java
@@ -79,7 +79,7 @@
@Test
void exceptionIsThrownIfSchemaDoesNotExist() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = RenameTableCommand.builder()
.schemaName("TEST")
@@ -96,7 +96,7 @@
@Test
void exceptionIsThrownIfTableWithGivenNameNotFound() {
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = RenameTableCommand.builder()
.schemaName(SCHEMA_NAME)
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommandValidationTest.java
index fcb5620..cecbc5d 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RenameZoneCommandValidationTest.java
@@ -50,7 +50,7 @@
void exceptionIsThrownIfZoneWithGivenNameNotFound() {
RenameZoneCommandBuilder builder = RenameZoneCommand.builder();
- Catalog catalog = emptyCatalog();
+ Catalog catalog = catalogWithDefaultZone();
CatalogCommand command = builder
.zoneName("some_zone")
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
index 8c5690d..16291ec 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParamsBuilder;
import static org.apache.ignite.internal.catalog.commands.DefaultValue.constant;
@@ -84,10 +85,13 @@
protected CatalogManagerImpl manager;
protected AtomicLong delayDuration = new AtomicLong();
+ protected AtomicLong partitionIdleSafeTimePropagationPeriod = new AtomicLong();
+
@BeforeEach
void setUp() {
delayDuration.set(CatalogManagerImpl.DEFAULT_DELAY_DURATION);
+ partitionIdleSafeTimePropagationPeriod.set(CatalogManagerImpl.DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD);
metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(NODE_NAME));
@@ -100,12 +104,14 @@
updateLog,
clockService,
delayDuration::get,
- () -> CatalogManagerImpl.DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD
+ partitionIdleSafeTimePropagationPeriod::get
);
assertThat(startAsync(metastore, clockWaiter, manager), willCompleteSuccessfully());
assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully());
+
+ awaitDefaultZoneCreation(manager);
}
@AfterEach
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index fb68179..60525e9 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -20,10 +20,13 @@
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.Set;
@@ -486,4 +489,28 @@
return delegate().handle(update, metaStorageUpdateTimestamp, causalityToken);
}
}
+
+ /**
+ * Waits till default zone appears in latest version of catalog.
+ *
+ * @param manager Catalog manager to monitor.
+ */
+ public static void awaitDefaultZoneCreation(CatalogManager manager) {
+ try {
+ int[] versionHolder = new int[1];
+
+ assertTrue(waitForCondition(() -> {
+ int latestVersion = manager.latestCatalogVersion();
+
+ versionHolder[0] = latestVersion;
+
+ return manager.catalog(latestVersion).defaultZone() != null;
+ }, 5_000));
+
+ // additionally we have to wait till all listeners complete handling of event
+ await(manager.catalogReadyFuture(versionHolder[0]));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 7666bd3..d19a356 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -802,7 +802,7 @@
* @return A result of the stage.
*/
@SuppressWarnings("UnusedReturnValue")
- public static <T> @Nullable T await(CompletionStage<T> stage, long timeout, TimeUnit unit) {
+ public static <T> T await(CompletionStage<T> stage, long timeout, TimeUnit unit) {
try {
return stage.toCompletableFuture().get(timeout, unit);
} catch (Throwable e) {
@@ -821,9 +821,7 @@
sneakyThrow(original);
}
- assert false;
-
- return null;
+ throw new AssertionError("Should not get here");
}
/**
@@ -834,7 +832,7 @@
* @return A result of the stage.
*/
@SuppressWarnings("UnusedReturnValue")
- public static <T> @Nullable T await(CompletionStage<T> stage) {
+ public static <T> T await(CompletionStage<T> stage) {
return await(stage, TIMEOUT_SEC, TimeUnit.SECONDS);
}
diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle
index 42c2387..0c520a0 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -55,6 +55,7 @@
testImplementation project(':ignite-system-view-api')
testImplementation(testFixtures(project(':ignite-core')))
+ testImplementation(testFixtures(project(':ignite-catalog')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-metastorage')))
testImplementation(testFixtures(project(':ignite-cluster-management')))
@@ -92,6 +93,7 @@
integrationTestImplementation project(':ignite-security')
integrationTestImplementation project(':ignite-system-view-api')
integrationTestImplementation project(':ignite-failure-handler')
+ integrationTestImplementation testFixtures(project(':ignite-catalog'))
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-metastorage'))
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index e724e98..93b24bb 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -79,6 +79,7 @@
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -522,6 +523,8 @@
metastore = findComponent(node.startedComponents(), MetaStorageManager.class);
+ awaitDefaultZoneCreation(node);
+
startGlobalStateUpdateBlocking = true;
startScaleUpBlocking = true;
@@ -802,6 +805,8 @@
String zoneName;
if (useDefaultZone) {
+ awaitDefaultZoneCreation(node);
+
CatalogZoneDescriptor defaultZone = getDefaultZone(getCatalogManager(node), node.clock().nowLong());
zoneName = defaultZone.name();
@@ -927,4 +932,12 @@
return scaleDownLatch;
}
+
+ private static void awaitDefaultZoneCreation(PartialNode node) {
+ CatalogManager manager = findComponent(node.startedComponents(), CatalogManager.class);
+
+ assert manager != null;
+
+ CatalogTestUtils.awaitDefaultZoneCreation(manager);
+ }
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index 2c18436..f726056 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -39,6 +39,7 @@
import java.util.function.LongFunction;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
@@ -205,6 +206,8 @@
}
protected CatalogZoneDescriptor getDefaultZone() {
+ CatalogTestUtils.awaitDefaultZoneCreation(catalogManager);
+
return DistributionZonesTestUtil.getDefaultZone(catalogManager, clock.nowLong());
}
}
diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 415b583..120fa01 100644
--- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -577,7 +577,7 @@
Objects.requireNonNull(catalog);
- return catalog.defaultZone();
+ return Objects.requireNonNull(catalog.defaultZone());
}
/**
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskControllerTest.java
index df35427..b119dcd 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskControllerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createCatalogManagerWithTestUpdateLog;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
@@ -39,7 +40,6 @@
import java.util.List;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -67,12 +67,12 @@
public class ChangeIndexStatusTaskControllerTest extends BaseIgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
- private final CatalogManager catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock);
-
private final TestPlacementDriver placementDriver = new TestPlacementDriver();
private final ClusterService clusterService = createClusterService();
+ private CatalogManager catalogManager;
+
@Mock
private ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler;
@@ -80,6 +80,8 @@
@BeforeEach
void setUp() {
+ catalogManager = createCatalogManagerWithTestUpdateLog(NODE_NAME, clock);
+
assertThat(catalogManager.startAsync(), willCompleteSuccessfully());
createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskTest.java
index 1d7d4c6..2031b55 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskTest.java
@@ -21,6 +21,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
@@ -133,6 +134,8 @@
assertThat(startAsync(clockWaiter, catalogManager), willCompleteSuccessfully());
+ awaitDefaultZoneCreation(catalogManager);
+
createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
createIndex(catalogManager, TABLE_NAME, INDEX_NAME, COLUMN_NAME);
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index a577476..7684ca7 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -19,6 +19,7 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
@@ -101,6 +102,8 @@
willCompleteSuccessfully()
);
+ awaitDefaultZoneCreation(catalogManager);
+
Catalog catalog = catalogManager.catalog(catalogManager.activeCatalogVersion(clock.nowLong()));
assert catalog != null;
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 10edcf1..326dae8 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -19,7 +19,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createCatalogManagerWithTestUpdateLog;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
@@ -102,7 +102,7 @@
ClusterService clusterService = mock(ClusterService.class, invocation -> mock(TopologyService.class, invocation1 -> LOCAL_NODE));
- catalogManager = createTestCatalogManager(NODE_NAME, clock);
+ catalogManager = createCatalogManagerWithTestUpdateLog(NODE_NAME, clock);
assertThat(catalogManager.startAsync(), willCompleteSuccessfully());
indexBuildController = new IndexBuildController(
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 61a2c74..20e3438 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -20,6 +20,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
@@ -119,6 +120,8 @@
createAndStartComponents();
+ awaitDefaultZoneCreation(catalogManager);
+
createTable(TABLE_NAME);
}
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
index 006d502..5a9b615 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
@@ -155,9 +155,10 @@
CatalogManager catalogManager = CLUSTER.aliveNode().catalogManager();
int latestCatalogVersion = catalogManager.latestCatalogVersion();
Catalog catalog = Objects.requireNonNull(catalogManager.catalog(latestCatalogVersion));
+ CatalogZoneDescriptor defaultZone = catalog.defaultZone();
for (CatalogZoneDescriptor z : catalogManager.zones(latestCatalogVersion)) {
String zoneName = z.name();
- if (zoneName.equals(catalog.defaultZone().name())) {
+ if (defaultZone != null && zoneName.equals(defaultZone.name())) {
continue;
}
sql("DROP ZONE " + zoneName);
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 1262159..27869fc 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -31,6 +31,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.calcite.rel.type.RelDataType;
@@ -488,6 +489,6 @@
assert catalog != null;
- return catalog.defaultZone();
+ return Objects.requireNonNull(catalog.defaultZone());
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
index 976208a..0c267ac 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.table.distributed;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createCatalogManagerWithTestUpdateLog;
import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -88,7 +88,7 @@
) {
String nodeName = testNodeName(testInfo, 0);
- catalogManager = createTestCatalogManager(nodeName, clock);
+ catalogManager = createCatalogManagerWithTestUpdateLog(nodeName, clock);
lowWatermark = new TestLowWatermark();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index d75985b..ac04b09 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -267,6 +267,8 @@
assertThat(catalogMetastore.deployWatches(), willCompleteSuccessfully());
+ CatalogTestUtils.awaitDefaultZoneCreation(catalogManager);
+
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
TopologyService topologyService = mock(TopologyService.class);
@@ -689,7 +691,7 @@
when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(trueCompletedFuture());
when(msm.get(any())).thenReturn(nullCompletedFuture());
- when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));
when(msm.prefixLocally(any(), anyLong())).thenReturn(CursorUtils.emptyCursor());
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
index 959c20f..414dc14 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
@@ -62,7 +62,7 @@
public class TableUtilsTest extends IgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
- private final CatalogManager catalogManager = CatalogTestUtils.createTestCatalogManager("test-node", clock);
+ private final CatalogManager catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test-node", clock);
@BeforeEach
void setUp() {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
index ab0e4f6..4af5198 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
@@ -85,7 +85,7 @@
@BeforeEach
void setUp() {
- catalogManager = CatalogTestUtils.createTestCatalogManager("test", clock);
+ catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test", clock);
indexChooser = new FullStateTransferIndexChooser(catalogManager, lowWatermark);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
index e78e587..60cc7b5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
-import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createCatalogManagerWithTestUpdateLog;
import static org.apache.ignite.internal.table.TableTestUtils.INDEX_NAME;
import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
import static org.apache.ignite.internal.table.TableTestUtils.createSimpleHashIndex;
@@ -90,7 +90,7 @@
void testCollectNextRowIdToBuildIndexes() throws Exception {
HybridClock clock = new HybridClockImpl();
- CatalogManager catalogManager = createTestCatalogManager("test", clock);
+ CatalogManager catalogManager = createCatalogManagerWithTestUpdateLog("test", clock);
try {
assertThat(catalogManager.startAsync(), willCompleteSuccessfully());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/ReplicatorUtilsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/ReplicatorUtilsTest.java
index 70bce78..bf1bc68 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/ReplicatorUtilsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/ReplicatorUtilsTest.java
@@ -136,7 +136,7 @@
}
private void withCatalogManager(Consumer<CatalogManager> consumer) throws Exception {
- CatalogManager catalogManager = CatalogTestUtils.createTestCatalogManager("test-node", clock);
+ CatalogManager catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test-node", clock);
assertThat(catalogManager.startAsync(), willCompleteSuccessfully());