blob: fb68179d08b17eaf1a6c903f1b15662d62925d9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.catalog;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.DropTableCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.UpdateLogEvent;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
/** Utilities for working with the catalog in tests. */
public class CatalogTestUtils {
/**
* Creates a test implementation of {@link CatalogManager}.
*
* <p>NOTE: Uses {@link CatalogManagerImpl} under the hood and creates the internals it needs, may change in the future.
*
* @param nodeName Node name.
* @param clock Hybrid clock.
*/
public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock) {
StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName), clock);
var clockWaiter = new ClockWaiter(nodeName, clock);
ClockService clockService = new TestClockService(clock, clockWaiter);
return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockService) {
@Override
public CompletableFuture<Void> startAsync() {
return allOf(metastore.startAsync(), clockWaiter.startAsync(), super.startAsync())
.thenCompose(unused -> metastore.deployWatches());
}
@Override
public void beforeNodeStop() {
super.beforeNodeStop();
clockWaiter.beforeNodeStop();
metastore.beforeNodeStop();
}
@Override
public CompletableFuture<Void> stopAsync() {
return IgniteUtils.stopAsync(super::stopAsync, clockWaiter::stopAsync, metastore::stopAsync);
}
};
}
/**
* Creates a test implementation of {@link CatalogManager}.
*
* <p>NOTE: Uses {@link CatalogManagerImpl} under the hood and creates the internals it needs, may change in the future.
*
* @param nodeName Node name.
* @param clockWaiter Clock waiter.
* @param clock Hybrid clock.
*/
public static CatalogManager createTestCatalogManager(String nodeName, ClockWaiter clockWaiter, HybridClock clock) {
StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName));
return new CatalogManagerImpl(new UpdateLogImpl(metastore), new TestClockService(clock, clockWaiter)) {
@Override
public CompletableFuture<Void> startAsync() {
return allOf(metastore.startAsync(), super.startAsync()).thenCompose(unused -> metastore.deployWatches());
}
@Override
public void beforeNodeStop() {
super.beforeNodeStop();
metastore.beforeNodeStop();
}
@Override
public CompletableFuture<Void> stopAsync() {
return IgniteUtils.stopAsync(super::stopAsync, metastore::stopAsync);
}
};
}
/**
* Creates a test implementation of {@link CatalogManager}.
*
* <p>NOTE: Uses {@link CatalogManagerImpl} under the hood and creates the internals it needs, may change in the future.
*
* @param nodeName Node name.
* @param clock Hybrid clock.
* @param metastore Meta storage manager.
*/
public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock, MetaStorageManager metastore) {
var clockWaiter = new ClockWaiter(nodeName, clock);
return new CatalogManagerImpl(new UpdateLogImpl(metastore), new TestClockService(clock, clockWaiter)) {
@Override
public CompletableFuture<Void> startAsync() {
return allOf(clockWaiter.startAsync(), super.startAsync());
}
@Override
public void beforeNodeStop() {
super.beforeNodeStop();
clockWaiter.beforeNodeStop();
}
@Override
public CompletableFuture<Void> stopAsync() {
return IgniteUtils.stopAsync(super::stopAsync, clockWaiter::stopAsync);
}
};
}
/**
* Creates a test implementation of {@link CatalogManager}.
*
* <p>NOTE: Uses {@link CatalogManagerImpl} under the hood and creates the internals it needs, may change in the future.
*
* @param nodeName Node name.
* @param clock Hybrid clock.
* @param metastore Meta storage manager.
*/
public static CatalogManager createTestCatalogManagerWithInterceptor(
String nodeName,
HybridClock clock,
MetaStorageManager metastore,
UpdateHandlerInterceptor interceptor
) {
var clockWaiter = new ClockWaiter(nodeName, clock);
UpdateLogImpl updateLog = new UpdateLogImpl(metastore) {
@Override
public void registerUpdateHandler(OnUpdateHandler handler) {
interceptor.registerUpdateHandler(handler);
super.registerUpdateHandler(interceptor);
}
};
return new CatalogManagerImpl(updateLog, new TestClockService(clock, clockWaiter)) {
@Override
public CompletableFuture<Void> startAsync() {
return allOf(clockWaiter.startAsync(), super.startAsync());
}
@Override
public void beforeNodeStop() {
super.beforeNodeStop();
clockWaiter.beforeNodeStop();
}
@Override
public CompletableFuture<Void> stopAsync() {
return IgniteUtils.stopAsync(super::stopAsync, clockWaiter::stopAsync);
}
};
}
/**
* Create the same {@link CatalogManager} as for normal operations, but with {@link UpdateLog} that
* simply notifies the manager without storing any updates in metastore.
*
* <p>Particular configuration of manager pretty fast (in terms of awaiting of certain safe time) and lightweight.
* It doesn't contain any mocks from {@link org.mockito.Mockito}.
*
* @param nodeName Name of the node that is meant to own this manager. Any thread spawned by returned instance
* will have it as thread's name prefix.
* @param clock This clock is used to assign activation timestamp for incoming updates, thus make it possible
* to acquired schema that was valid at give time.
* @return An instance of {@link CatalogManager catalog manager}.
*/
public static CatalogManager createCatalogManagerWithTestUpdateLog(String nodeName, HybridClock clock) {
var clockWaiter = new ClockWaiter(nodeName, clock);
return new CatalogManagerImpl(new TestUpdateLog(clock), new TestClockService(clock, clockWaiter)) {
@Override
public CompletableFuture<Void> startAsync() {
return allOf(clockWaiter.startAsync(), super.startAsync());
}
@Override
public void beforeNodeStop() {
super.beforeNodeStop();
clockWaiter.beforeNodeStop();
}
@Override
public CompletableFuture<Void> stopAsync() {
return IgniteUtils.stopAsync(super::stopAsync, clockWaiter::stopAsync);
}
};
}
/** Default nullable behavior. */
public static final boolean DEFAULT_NULLABLE = false;
/** Append precision\scale according to type requirement. */
public static Builder initializeColumnWithDefaults(ColumnType type, Builder colBuilder) {
if (type.precisionAllowed()) {
colBuilder.precision(11);
}
if (type.scaleAllowed()) {
colBuilder.scale(0);
}
if (type.lengthAllowed()) {
colBuilder.length(1 << 5);
}
return colBuilder;
}
/** Append precision according to type requirement. */
public static void applyNecessaryPrecision(ColumnType type, Builder colBuilder) {
if (type.precisionAllowed()) {
colBuilder.precision(11);
}
}
/** Append length according to type requirement. */
public static void applyNecessaryLength(ColumnType type, Builder colBuilder) {
if (type.lengthAllowed()) {
colBuilder.length(1 << 5);
}
}
static ColumnParams columnParams(String name, ColumnType type) {
return columnParams(name, type, DEFAULT_NULLABLE);
}
static ColumnParams columnParams(String name, ColumnType type, boolean nullable) {
return columnParamsBuilder(name, type, nullable).build();
}
static ColumnParams columnParams(String name, ColumnType type, boolean nullable, int precision) {
return columnParamsBuilder(name, type, nullable, precision).build();
}
static ColumnParams columnParams(String name, ColumnType type, boolean nullable, int precision, int scale) {
return columnParamsBuilder(name, type, nullable, precision, scale).build();
}
static ColumnParams columnParams(String name, ColumnType type, int length, boolean nullable) {
return columnParamsBuilder(name, type, length, nullable).build();
}
static ColumnParams.Builder columnParamsBuilder(String name, ColumnType type) {
return columnParamsBuilder(name, type, DEFAULT_NULLABLE);
}
static ColumnParams.Builder columnParamsBuilder(String name, ColumnType type, boolean nullable) {
return ColumnParams.builder().name(name).nullable(nullable).type(type);
}
static ColumnParams.Builder columnParamsBuilder(String name, ColumnType type, boolean nullable, int precision) {
return ColumnParams.builder().name(name).nullable(nullable).type(type).precision(precision);
}
static ColumnParams.Builder columnParamsBuilder(String name, ColumnType type, boolean nullable, int precision, int scale) {
return ColumnParams.builder().name(name).nullable(nullable).type(type).precision(precision).scale(scale);
}
static ColumnParams.Builder columnParamsBuilder(String name, ColumnType type, int length, boolean nullable) {
return ColumnParams.builder().name(name).nullable(nullable).type(type).length(length);
}
static CatalogCommand dropTableCommand(String tableName) {
return DropTableCommand.builder().schemaName(DEFAULT_SCHEMA_NAME).tableName(tableName).build();
}
static CatalogCommand dropColumnParams(String tableName, String... columns) {
return AlterTableDropColumnCommand.builder().schemaName(DEFAULT_SCHEMA_NAME).tableName(tableName).columns(Set.of(columns)).build();
}
static CatalogCommand addColumnParams(String tableName, ColumnParams... columns) {
return AlterTableAddColumnCommand.builder().schemaName(DEFAULT_SCHEMA_NAME).tableName(tableName).columns(List.of(columns)).build();
}
/**
* Builder for {@link CreateZoneCommand}.
*
* @param zoneName Zone name.
* @return Builder for {@link CreateZoneCommand}.
*/
public static CreateZoneCommandBuilder createZoneBuilder(String zoneName) {
return CreateZoneCommand.builder()
.zoneName(zoneName)
.storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()));
}
public static AlterZoneCommandBuilder alterZoneBuilder(String zoneName) {
return AlterZoneCommand.builder().zoneName(zoneName);
}
private static class TestUpdateLog implements UpdateLog {
private final HybridClock clock;
private long lastSeenVersion = 0;
private long snapshotVersion = 0;
private volatile OnUpdateHandler onUpdateHandler;
private TestUpdateLog(HybridClock clock) {
this.clock = clock;
}
@Override
public synchronized CompletableFuture<Boolean> append(VersionedUpdate update) {
if (update.version() - 1 != lastSeenVersion) {
return falseCompletedFuture();
}
lastSeenVersion = update.version();
return onUpdateHandler.handle(update, clock.now(), update.version()).thenApply(ignored -> true);
}
@Override
public synchronized CompletableFuture<Boolean> saveSnapshot(SnapshotEntry snapshotEntry) {
snapshotVersion = snapshotEntry.version();
return trueCompletedFuture();
}
@Override
public void registerUpdateHandler(OnUpdateHandler handler) {
this.onUpdateHandler = handler;
}
@Override
public CompletableFuture<Void> startAsync() throws IgniteInternalException {
if (onUpdateHandler == null) {
throw new IgniteInternalException(
Common.INTERNAL_ERR,
"Handler must be registered prior to component start"
);
}
lastSeenVersion = snapshotVersion;
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> stopAsync() {
return nullCompletedFuture();
}
}
/**
* Searches for a table by name in the requested version of the catalog.
*
* @param catalogService Catalog service.
* @param catalogVersion Catalog version in which to find the table.
* @param tableName Table name.
*/
public static CatalogTableDescriptor table(CatalogService catalogService, int catalogVersion, String tableName) {
CatalogTableDescriptor tableDescriptor = catalogService.tables(catalogVersion).stream()
.filter(table -> tableName.equals(table.name()))
.findFirst()
.orElse(null);
assertNotNull(tableDescriptor, "catalogVersion=" + catalogVersion + ", tableName=" + tableName);
return tableDescriptor;
}
/**
* Searches for an index by name in the requested version of the catalog. Throws if the index is not found.
*
* @param catalogService Catalog service.
* @param catalogVersion Catalog version in which to find the index.
* @param indexName Index name.
* @return Index (cannot be null).
*/
public static CatalogIndexDescriptor index(CatalogService catalogService, int catalogVersion, String indexName) {
CatalogIndexDescriptor indexDescriptor = indexOrNull(catalogService,
catalogVersion, indexName);
assertNotNull(indexDescriptor, "catalogVersion=" + catalogVersion + ", indexName=" + indexName);
return indexDescriptor;
}
/**
* Searches for an index by name in the requested version of the catalog.
*
* @param catalogService Catalog service.
* @param catalogVersion Catalog version in which to find the index.
* @param indexName Index name.
* @return Index or {@code null} if not found.
*/
@Nullable
public static CatalogIndexDescriptor indexOrNull(CatalogService catalogService, int catalogVersion, String indexName) {
return catalogService.indexes(catalogVersion).stream()
.filter(index -> indexName.equals(index.name()))
.findFirst()
.orElse(null);
}
/**
* Update handler interceptor for test purposes.
*/
public abstract static class UpdateHandlerInterceptor implements OnUpdateHandler {
private OnUpdateHandler delegate;
void registerUpdateHandler(OnUpdateHandler handler) {
this.delegate = handler;
}
protected OnUpdateHandler delegate() {
return delegate;
}
}
/**
* An interceptor, which allow dropping events.
*/
public static class TestUpdateHandlerInterceptor extends UpdateHandlerInterceptor {
private volatile boolean dropEvents;
public void dropSnapshotEvents() {
this.dropEvents = true;
}
@Override
public CompletableFuture<Void> handle(UpdateLogEvent update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
if (dropEvents && update instanceof SnapshotEntry) {
return nullCompletedFuture();
}
return delegate().handle(update, metaStorageUpdateTimestamp, causalityToken);
}
}
}