/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_THREAD_COUNT;
import static org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_TIMEOUT;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
import org.apache.ignite.internal.catalog.commands.CreateSortedIndexCommand;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import org.apache.ignite.internal.catalog.commands.TablePrimaryKey;
import org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPrunerImpl;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.IgniteTableImpl;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.type.BitmaskNativeType;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NumberNativeType;
import org.apache.ignite.internal.type.TemporalNativeType;
import org.apache.ignite.internal.type.VarlenNativeType;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
/**
* A collection of builders to create test objects.
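*
* <p>A minimal usage sketch (node, table, and column names are illustrative):
* <pre>{@code
* TestCluster cluster = TestBuilders.cluster()
*         .nodes("N1")
*         .addTable()
*                 .name("T1")
*                 .addKeyColumn("ID", NativeTypes.INT32)
*                 .addColumn("VAL", NativeTypes.STRING)
*                 .end()
*         .build();
* }</pre>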
*/
public class TestBuilders {
private static final AtomicInteger TABLE_ID_GEN = new AtomicInteger();
private static final IgniteLogger LOG = Loggers.forClass(TestBuilders.class);
/** Returns a builder of the test cluster object. */
public static ClusterBuilder cluster() {
return new ClusterBuilderImpl();
}
/** Returns a builder of the test table object. */
public static TableBuilder table() {
return new TableBuilderImpl();
}
/** Returns a builder of the execution context. */
public static ExecutionContextBuilder executionContext() {
return new ExecutionContextBuilderImpl();
}
/** Factory method to create a cluster service factory for a cluster consisting of the provided nodes. */
public static ClusterServiceFactory clusterServiceFactory(List<String> nodes) {
return new ClusterServiceFactory(nodes);
}
/**
* Factory method to create a {@link ScannableTable table} instance from the given data provider,
* with only the {@link ScannableTable#scan table scan} operation implemented.
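*
* <p>A minimal usage sketch, assuming {@code dataProvider} supplies rows as {@code Object[]}:
* <pre>{@code
* ScannableTable table = TestBuilders.tableScan(dataProvider);
* }</pre>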
*/
public static ScannableTable tableScan(DataProvider<Object[]> dataProvider) {
return new ScannableTable() {
@Override
public <RowT> Publisher<RowT> scan(
ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
@Nullable BitSet requiredColumns
) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
() -> new TransformingIterator<>(
dataProvider.iterator(),
row -> project(row, requiredColumns)
)
),
rowFactory::create
);
}
@Override
public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> CompletableFuture<@Nullable RowT> primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction tx,
RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
};
}
/**
* Factory method to create a {@link ScannableTable table} instance from the given data provider,
* with only the {@link ScannableTable#indexRangeScan index range scan} operation implemented.
*/
public static ScannableTable indexRangeScan(DataProvider<Object[]> dataProvider) {
return new ScannableTable() {
@Override
public <RowT> Publisher<RowT> scan(
ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
@Nullable BitSet requiredColumns
) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
() -> new TransformingIterator<>(
dataProvider.iterator(),
row -> project(row, requiredColumns)
)
),
rowFactory::create
);
}
@Override
public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> CompletableFuture<@Nullable RowT> primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction tx,
RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
};
}
/**
* Factory method to create a {@link ScannableTable table} instance from the given data provider,
* with only the {@link ScannableTable#indexLookup index lookup} operation implemented.
*/
public static ScannableTable indexLookup(DataProvider<Object[]> dataProvider) {
return new ScannableTable() {
@Override
public <RowT> Publisher<RowT> scan(
ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
@Nullable BitSet requiredColumns
) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns, RowT key, @Nullable BitSet requiredColumns) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
() -> new TransformingIterator<>(
dataProvider.iterator(),
row -> project(row, requiredColumns)
)
),
rowFactory::create
);
}
@Override
public <RowT> CompletableFuture<@Nullable RowT> primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction tx,
RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
};
}
/**
* A builder to create a test cluster object.
*
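* <p>A sketch of wiring a data provider to a node ({@code dataProvider} is assumed to be
* created elsewhere, e.g. via {@link DataProvider}):
* <pre>{@code
* TestCluster cluster = TestBuilders.cluster()
*         .nodes("N1", "N2")
*         .addTable()
*                 .name("T1")
*                 .addKeyColumn("ID", NativeTypes.INT32)
*                 .end()
*         .dataProvider("N1", "T1", TestBuilders.tableScan(dataProvider))
*         .build();
* }</pre>
*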
* @see TestCluster
*/
public interface ClusterBuilder {
/**
* Sets desired names for the cluster nodes.
*
* @param firstNodeName A name of the first node. It makes no difference which node comes first; this parameter
*         exists only to force the caller to provide at least one node name.
* @param otherNodeNames Names of the remaining nodes to create the cluster from.
* @return {@code this} for chaining.
*/
ClusterBuilder nodes(String firstNodeName, String... otherNodeNames);
/**
* Sets desired names for the cluster nodes.
*
* @param firstNodeName A name of the first node. It makes no difference which node comes first; this parameter
*         exists only to force the caller to provide at least one node name.
* @param useTablePartitions If {@code true}, map every table partition to one of the defined nodes in a
*         round-robin fashion; otherwise map each table to its owning nodes as a whole.
* @param otherNodeNames Names of the remaining nodes to create the cluster from.
* @return {@code this} for chaining.
*/
ClusterBuilder nodes(String firstNodeName, boolean useTablePartitions, String... otherNodeNames);
/**
* Creates a table builder to add to the cluster.
*
* @return An instance of table builder.
*/
ClusterTableBuilder addTable();
/**
* Adds the given system view to the cluster.
*
* @param systemView System view.
* @return {@code this} for chaining.
* @param <T> System view data type.
*/
<T> ClusterBuilder addSystemView(SystemView<T> systemView);
/**
* Builds the cluster object.
*
* @return Created cluster object.
*/
TestCluster build();
/**
* Provides an implementation of the table with the given name, local to the given node.
*
* @param nodeName Name of the node the given table instance will be assigned to.
* @param tableName Name of the table the given instance represents.
* @param table Actual table that will be used for read operations during execution.
* @return {@code this} for chaining.
*/
ClusterBuilder dataProvider(String nodeName, String tableName, ScannableTable table);
/**
* Registers a previously added system view (see {@link #addSystemView(SystemView)}) on the specified node.
*
* @param nodeName Name of the node the view is going to be registered at.
* @param systemViewName Name of a previously added system view.
* @return {@code this} for chaining.
*/
ClusterBuilder registerSystemView(String nodeName, String systemViewName);
}
/**
* A builder to create a test table object.
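*
* <p>A minimal usage sketch (names are illustrative):
* <pre>{@code
* IgniteTable table = TestBuilders.table()
*         .name("T1")
*         .distribution(IgniteDistributions.single())
*         .addKeyColumn("ID", NativeTypes.INT32)
*         .addColumn("VAL", NativeTypes.STRING)
*         .build();
* }</pre>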
*/
public interface TableBuilder extends TableBuilderBase<TableBuilder> {
/** Returns a builder of the test sorted-index object. */
SortedIndexBuilder sortedIndex();
/** Returns a builder of the test hash-index object. */
HashIndexBuilder hashIndex();
/** Sets the distribution of the table. */
TableBuilder distribution(IgniteDistribution distribution);
/** Sets the size of the table. */
TableBuilder size(int size);
/** Sets the id for the table. The caller must guarantee that the provided id is unique. */
TableBuilder tableId(int id);
/** Sets the number of partitions for this table. Default value is equal to {@link CatalogUtils#DEFAULT_PARTITION_COUNT}. */
TableBuilder partitions(int num);
/**
* Builds a table.
*
* @return Created table object.
*/
IgniteTable build();
}
/**
* A builder to create a test object representing a sorted index.
*
* @see TestIndex
*/
public interface SortedIndexBuilder extends SortedIndexBuilderBase<SortedIndexBuilder>, NestedBuilder<TableBuilder> {
}
/**
* A builder to create a test object representing a hash index.
*
* @see TestIndex
*/
public interface HashIndexBuilder extends HashIndexBuilderBase<HashIndexBuilder>, NestedBuilder<TableBuilder> {
}
/**
* A builder to create a test table as a nested object of the cluster.
*
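* <p>A minimal usage sketch (table, column, and index names are illustrative):
* <pre>{@code
* clusterBuilder.addTable()
*         .name("T1")
*         .addKeyColumn("ID", NativeTypes.INT32)
*         .addColumn("VAL", NativeTypes.STRING)
*         .addSortedIndex()
*                 .name("T1_IDX")
*                 .addColumn("VAL", Collation.ASC_NULLS_LAST)
*                 .end()
*         .end();
* }</pre>
*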
* @see TestCluster
*/
public interface ClusterTableBuilder extends TableBuilderBase<ClusterTableBuilder>,
NestedBuilder<ClusterBuilder> {
/**
* Creates a sorted-index builder to add to the cluster.
*
* @return An instance of sorted-index builder.
*/
ClusterSortedIndexBuilder addSortedIndex();
/**
* Creates a hash-index builder to add to the cluster.
*
* @return An instance of hash-index builder.
*/
ClusterHashIndexBuilder addHashIndex();
}
/**
* A builder to create a test object, which represents a sorted index, as a nested object of the cluster.
*
* @see TestIndex
* @see TestCluster
*/
public interface ClusterSortedIndexBuilder extends SortedIndexBuilderBase<ClusterSortedIndexBuilder>,
NestedBuilder<ClusterTableBuilder> {
}
/**
* A builder to create a test object, which represents a hash index, as a nested object of the cluster.
*
* @see TestIndex
* @see TestCluster
*/
public interface ClusterHashIndexBuilder extends HashIndexBuilderBase<ClusterHashIndexBuilder>,
NestedBuilder<ClusterTableBuilder> {
}
/**
* A builder to create an execution context.
*
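* <p>A minimal usage sketch ({@code taskExecutor} and {@code node} are assumed to be created elsewhere):
* <pre>{@code
* ExecutionContext<Object[]> ctx = TestBuilders.executionContext()
*         .queryId(UUID.randomUUID())
*         .executor(taskExecutor)
*         .localNode(node)
*         .build();
* }</pre>
*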
* @see ExecutionContext
*/
public interface ExecutionContextBuilder {
/** Sets the identifier of the query. */
ExecutionContextBuilder queryId(UUID queryId);
/** Sets the description of the fragment this context will be created for. */
ExecutionContextBuilder fragment(FragmentDescription fragmentDescription);
/** Sets the query task executor. */
ExecutionContextBuilder executor(QueryTaskExecutor executor);
/** Sets the node this fragment will be executed on. */
ExecutionContextBuilder localNode(ClusterNode node);
/** Sets the dynamic parameters this fragment will be executed with. */
ExecutionContextBuilder dynamicParameters(Object... params);
/**
* Builds the context object.
*
* @return Created context object.
*/
ExecutionContext<Object[]> build();
}
private static class ExecutionContextBuilderImpl implements ExecutionContextBuilder {
private FragmentDescription description = new FragmentDescription(0, true, null, null, null, null);
private UUID queryId = null;
private QueryTaskExecutor executor = null;
private ClusterNode node = null;
private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY;
/** {@inheritDoc} */
@Override
public ExecutionContextBuilder queryId(UUID queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
return this;
}
/** {@inheritDoc} */
@Override
public ExecutionContextBuilder fragment(FragmentDescription fragmentDescription) {
this.description = Objects.requireNonNull(fragmentDescription, "fragmentDescription");
return this;
}
/** {@inheritDoc} */
@Override
public ExecutionContextBuilder executor(QueryTaskExecutor executor) {
this.executor = Objects.requireNonNull(executor, "executor");
return this;
}
/** {@inheritDoc} */
@Override
public ExecutionContextBuilder localNode(ClusterNode node) {
this.node = Objects.requireNonNull(node, "node");
return this;
}
@Override
public ExecutionContextBuilder dynamicParameters(Object... params) {
this.dynamicParams = params;
return this;
}
/** {@inheritDoc} */
@Override
public ExecutionContext<Object[]> build() {
return new ExecutionContext<>(
Objects.requireNonNull(executor, "executor"),
queryId,
Objects.requireNonNull(node, "node"),
node.name(),
description,
ArrayRowHandler.INSTANCE,
Commons.parametersMap(dynamicParams),
TxAttributes.fromTx(new NoOpTransaction(node.name())),
SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
);
}
}
private static class ClusterBuilderImpl implements ClusterBuilder {
private final List<ClusterTableBuilderImpl> tableBuilders = new ArrayList<>();
private List<String> nodeNames;
private boolean useTablePartitions;
private final Map<String, Map<String, ScannableTable>> nodeName2tableName2table = new HashMap<>();
private final List<SystemView<?>> systemViews = new ArrayList<>();
private final Map<String, Set<String>> nodeName2SystemView = new HashMap<>();
/** {@inheritDoc} */
@Override
public ClusterBuilder nodes(String firstNodeName, String... otherNodeNames) {
this.nodeNames = new ArrayList<>();
nodeNames.add(firstNodeName);
nodeNames.addAll(Arrays.asList(otherNodeNames));
return this;
}
/** {@inheritDoc} */
@Override
public ClusterBuilder nodes(String firstNodeName, boolean useTablePartitions, String... otherNodeNames) {
this.nodeNames = new ArrayList<>();
this.useTablePartitions = useTablePartitions;
nodeNames.add(firstNodeName);
nodeNames.addAll(Arrays.asList(otherNodeNames));
return this;
}
/** {@inheritDoc} */
@Override
public ClusterTableBuilder addTable() {
return new ClusterTableBuilderImpl(this);
}
@Override
public <T> ClusterBuilder addSystemView(SystemView<T> systemView) {
systemViews.add(systemView);
return this;
}
@Override
public ClusterBuilder dataProvider(String nodeName, String tableName, ScannableTable table) {
nodeName2tableName2table.computeIfAbsent(nodeName, key -> new HashMap<>()).put(tableName, table);
return this;
}
@Override
public ClusterBuilder registerSystemView(String nodeName, String systemViewName) {
nodeName2SystemView.computeIfAbsent(nodeName, key -> new HashSet<>()).add(systemViewName);
return this;
}
/** {@inheritDoc} */
@Override
public TestCluster build() {
validateConfiguredDataProviders();
var clusterService = new ClusterServiceFactory(nodeNames);
var clusterName = "test_cluster";
HybridClock clock = new HybridClockImpl();
CatalogManager catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog(clusterName, clock);
var parserService = new ParserServiceImpl();
var prepareService = new PrepareServiceImpl(clusterName, 0, CaffeineCacheFactory.INSTANCE,
new DdlSqlToCommandConverter(), PLANNING_TIMEOUT, PLANNING_THREAD_COUNT,
mock(MetricManagerImpl.class));
Map<String, List<String>> owningNodesByTableName = new HashMap<>();
for (Entry<String, Map<String, ScannableTable>> entry : nodeName2tableName2table.entrySet()) {
for (String tableName : entry.getValue().keySet()) {
owningNodesByTableName.computeIfAbsent(tableName, key -> new ArrayList<>()).add(entry.getKey());
}
}
Map<String, List<String>> systemViewsByNode = new HashMap<>();
for (Entry<String, Set<String>> entry : nodeName2SystemView.entrySet()) {
String nodeName = entry.getKey();
for (String systemViewName : entry.getValue()) {
systemViewsByNode.computeIfAbsent(nodeName, (k) -> new ArrayList<>()).add(systemViewName);
}
}
ClockWaiter clockWaiter = new ClockWaiter("test", clock);
var ddlHandler = new DdlCommandHandler(catalogManager, new TestClockService(clock, clockWaiter), () -> 100);
var schemaManager = new SqlSchemaManagerImpl(catalogManager, CaffeineCacheFactory.INSTANCE, 0);
Runnable initClosure = () -> {
assertThat(clockWaiter.startAsync(), willCompleteSuccessfully());
initAction(catalogManager);
};
List<LogicalNode> logicalNodes = nodeNames.stream()
.map(name -> {
List<String> systemViewForNode = systemViewsByNode.getOrDefault(name, List.of());
NetworkAddress addr = NetworkAddress.from("127.0.0.1:10000");
LogicalNode logicalNode = new LogicalNode(name, name, addr);
if (systemViewForNode.isEmpty()) {
return logicalNode;
} else {
String attrName = SystemViewManagerImpl.NODE_ATTRIBUTES_KEY;
String nodeNameSep = SystemViewManagerImpl.NODE_ATTRIBUTES_LIST_SEPARATOR;
String nodeNamesString = String.join(nodeNameSep, systemViewForNode);
return new LogicalNode(logicalNode, Map.of(), Map.of(attrName, nodeNamesString), List.of());
}
})
.collect(Collectors.toList());
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> {
var systemViewManager = new SystemViewManagerImpl(name, catalogManager);
var targetProvider = new TestNodeExecutionTargetProvider(
systemViewManager::owningNodes,
owningNodesByTableName,
useTablePartitions
);
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(
name,
targetProvider,
EmptyCacheFactory.INSTANCE,
0,
partitionPruner,
Runnable::run
);
systemViewManager.register(() -> systemViews);
LogicalTopologySnapshot newTopology = new LogicalTopologySnapshot(1L, logicalNodes);
mappingService.onTopologyLeap(newTopology);
systemViewManager.onTopologyLeap(newTopology);
return new TestNode(
name,
clusterService.forNode(name),
parserService,
prepareService,
schemaManager,
mappingService,
new TestExecutableTableRegistry(nodeName2tableName2table.get(name), schemaManager),
ddlHandler,
systemViewManager
);
})
.collect(Collectors.toMap(TestNode::name, Function.identity()));
return new TestCluster(
nodes,
catalogManager,
prepareService,
clockWaiter,
initClosure
);
}
private void validateConfiguredDataProviders() {
Set<String> dataProvidersOwners = new HashSet<>(nodeName2tableName2table.keySet());
dataProvidersOwners.removeAll(Set.copyOf(nodeNames));
if (!dataProvidersOwners.isEmpty()) {
Map<String, List<String>> problematicTables = new HashMap<>();
for (String outsiderNode : dataProvidersOwners) {
for (String problematicTable : nodeName2tableName2table.get(outsiderNode).keySet()) {
problematicTables.computeIfAbsent(problematicTable, k -> new ArrayList<>()).add(outsiderNode);
}
}
String problematicTablesString = problematicTables.entrySet().stream()
.map(e -> e.getKey() + ": " + e.getValue())
.collect(Collectors.joining(", "));
throw new AssertionError(format("The table has a dataProvider that is outside the cluster "
+ "[{}]", problematicTablesString));
}
}
private void initAction(CatalogManager catalogManager) {
List<CatalogCommand> initialSchema = tableBuilders.stream()
.flatMap(builder -> builder.build().stream())
.collect(Collectors.toList());
CompletableFuture<Boolean> indicesReadyFut = new CompletableFuture<>();
CopyOnWriteArraySet<Integer> initialIndices = new CopyOnWriteArraySet<>();
// Make indices registered via builder API available on startup.
if (!tableBuilders.isEmpty()) {
Consumer<MakeIndexAvailableEventParameters> indexAvailableHandler = params -> {
initialIndices.remove(params.indexId());
if (initialIndices.isEmpty()) {
indicesReadyFut.complete(true);
}
};
EventListener<MakeIndexAvailableEventParameters> listener = EventListener.fromConsumer(indexAvailableHandler);
catalogManager.listen(CatalogEvent.INDEX_AVAILABLE, listener);
// Remove the listener when all indices become available.
indicesReadyFut.whenComplete((r, t) -> {
catalogManager.removeListener(CatalogEvent.INDEX_AVAILABLE, listener);
});
} else {
indicesReadyFut.complete(true);
}
// Every time an index is created, add `start building` and `make available` commands
// to make that index accessible to the SQL engine.
Consumer<CreateIndexEventParameters> createIndexHandler = (params) -> {
CatalogIndexDescriptor index = params.indexDescriptor();
int indexId = index.id();
CatalogCommand startBuildIndexCommand = StartBuildingIndexCommand.builder().indexId(indexId).build();
CatalogCommand makeIndexAvailableCommand = MakeIndexAvailableCommand.builder().indexId(indexId).build();
// Collect initial indexes only if catalog init future has not completed.
if (!indicesReadyFut.isDone()) {
initialIndices.add(indexId);
}
LOG.info("Index has been created. Sending commands to make index available. id: {}, name: {}, status: {}",
indexId, index.name(), index.status());
catalogManager.execute(List.of(startBuildIndexCommand, makeIndexAvailableCommand));
};
catalogManager.listen(CatalogEvent.INDEX_CREATE, EventListener.fromConsumer(createIndexHandler));
// Init schema.
await(catalogManager.execute(initialSchema));
// Wait until all indices become available.
await(indicesReadyFut);
}
}
private static class TableBuilderImpl implements TableBuilder {
private final List<AbstractTableIndexBuilderImpl<?>> indexBuilders = new ArrayList<>();
private final List<ColumnDescriptor> columns = new ArrayList<>();
private String name;
private IgniteDistribution distribution;
private int size = 100_000;
private Integer tableId;
private int partitions = CatalogUtils.DEFAULT_PARTITION_COUNT;
/** {@inheritDoc} */
@Override
public TableBuilder name(String name) {
this.name = name;
return this;
}
/** {@inheritDoc} */
@Override
public SortedIndexBuilder sortedIndex() {
return new SortedIndexBuilderImpl(this);
}
/** {@inheritDoc} */
@Override
public HashIndexBuilder hashIndex() {
return new HashIndexBuilderImpl(this);
}
/** {@inheritDoc} */
@Override
public TableBuilder distribution(IgniteDistribution distribution) {
this.distribution = distribution;
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder addColumn(String name, NativeType type, boolean nullable) {
columns.add(new ColumnDescriptorImpl(
name, false, false, nullable, columns.size(), type, DefaultValueStrategy.DEFAULT_NULL, null
));
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder addColumn(String name, NativeType type) {
return addColumn(name, type, true);
}
/** {@inheritDoc} */
@Override
public TableBuilder addColumn(String name, NativeType type, @Nullable Object defaultValue) {
if (defaultValue == null) {
return addColumn(name, type);
} else {
ColumnDescriptorImpl desc = new ColumnDescriptorImpl(
name, false, false, true, columns.size(), type, DefaultValueStrategy.DEFAULT_CONSTANT, () -> defaultValue
);
columns.add(desc);
}
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder addKeyColumn(String name, NativeType type) {
columns.add(new ColumnDescriptorImpl(
name, true, false, false, columns.size(), type, DefaultValueStrategy.DEFAULT_NULL, null
));
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder size(int size) {
this.size = size;
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder tableId(int id) {
this.tableId = id;
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder partitions(int num) {
this.partitions = num;
return this;
}
/** {@inheritDoc} */
@Override
public IgniteTable build() {
if (distribution == null) {
throw new IllegalArgumentException("Distribution is not specified");
}
if (name == null) {
throw new IllegalArgumentException("Name is not specified");
}
if (columns.isEmpty()) {
throw new IllegalArgumentException("Table must contain at least one column");
}
TableDescriptorImpl tableDescriptor = new TableDescriptorImpl(columns, distribution);
Map<String, IgniteIndex> indexes = indexBuilders.stream()
.map(idx -> idx.build(tableDescriptor))
.collect(Collectors.toUnmodifiableMap(IgniteIndex::name, Function.identity()));
return new IgniteTableImpl(
Objects.requireNonNull(name),
tableId != null ? tableId : TABLE_ID_GEN.incrementAndGet(),
1,
tableDescriptor,
findPrimaryKey(tableDescriptor, indexes.values()),
new TestStatistic(size),
indexes,
partitions
);
}
}
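/**
* Derives the primary key columns of the given table: uses the collation of an explicitly defined
* primary key index, if any, otherwise collects the logical indexes of all key columns.
*/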
private static ImmutableIntList findPrimaryKey(TableDescriptor descriptor, Collection<IgniteIndex> indexList) {
IgniteIndex primaryKey = indexList.stream()
.filter(IgniteIndex::primaryKey)
.findFirst()
.orElse(null);
if (primaryKey != null) {
return primaryKey.collation().getKeys();
}
List<Integer> list = new ArrayList<>();
for (ColumnDescriptor column : descriptor) {
if (column.key()) {
list.add(column.logicalIndex());
}
}
return ImmutableIntList.copyOf(list);
}
private static class ClusterTableBuilderImpl implements ClusterTableBuilder {
private final List<AbstractClusterTableIndexBuilderImpl<?>> indexBuilders = new ArrayList<>();
private final List<ColumnParams> columns = new ArrayList<>();
private final List<String> keyColumns = new ArrayList<>();
private final ClusterBuilderImpl parent;
private final String schemaName = CatalogManager.DEFAULT_SCHEMA_NAME;
private String name;
private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
this.parent = parent;
}
/** {@inheritDoc} */
@Override
public ClusterTableBuilder name(String name) {
this.name = name;
return this;
}
/** {@inheritDoc} */
@Override
public ClusterSortedIndexBuilder addSortedIndex() {
return new ClusterSortedIndexBuilderImpl(this);
}
/** {@inheritDoc} */
@Override
public ClusterHashIndexBuilder addHashIndex() {
return new ClusterHashIndexBuilderImpl(this);
}
@Override
public ClusterTableBuilder addColumn(String name, NativeType type, boolean nullable) {
columns.add(columnParams(name, type, nullable, null));
return this;
}
/** {@inheritDoc} */
@Override
public ClusterTableBuilder addColumn(String name, NativeType type) {
return addColumn(name, type, true);
}
@Override
public ClusterTableBuilder addColumn(String name, NativeType type, @Nullable Object defaultValue) {
columns.add(columnParams(name, type, true, defaultValue));
return this;
}
@Override
public ClusterTableBuilder addKeyColumn(String name, NativeType type) {
keyColumns.add(name);
return addColumn(name, type, false);
}
/** {@inheritDoc} */
@Override
public ClusterBuilder end() {
parent.tableBuilders.add(this);
return parent;
}
private List<CatalogCommand> build() {
List<CatalogCommand> commands = new ArrayList<>(1 + indexBuilders.size());
// TODO https://issues.apache.org/jira/browse/IGNITE-21715 Update after TestFramework provides API
// to specify type of a primary key index.
// For now, a hash primary key is used by default.
TablePrimaryKey primaryKey = TableHashPrimaryKey.builder()
.columns(keyColumns)
.build();
commands.add(
CreateTableCommand.builder()
.schemaName(schemaName)
.tableName(name)
.columns(columns)
.primaryKey(primaryKey)
.build()
);
for (AbstractClusterTableIndexBuilderImpl<?> builder : indexBuilders) {
commands.add(builder.build(schemaName, name));
}
return commands;
}
}
private static class SortedIndexBuilderImpl extends AbstractTableIndexBuilderImpl<SortedIndexBuilder>
implements SortedIndexBuilder {
private final TableBuilderImpl parent;
private SortedIndexBuilderImpl(TableBuilderImpl parent) {
this.parent = parent;
}
/** {@inheritDoc} */
@Override
SortedIndexBuilder self() {
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder end() {
parent.indexBuilders.add(this);
return parent;
}
/** {@inheritDoc} */
@Override
public TestIndex build(TableDescriptor desc) {
if (name == null) {
throw new IllegalArgumentException("Name is not specified");
}
if (columns.isEmpty()) {
throw new IllegalArgumentException("Index must contain at least one column");
}
if (collations.size() != columns.size()) {
throw new IllegalArgumentException("Collation must be specified for each of columns.");
}
return TestIndex.createSorted(name, columns, collations, desc);
}
}
private static class HashIndexBuilderImpl extends AbstractTableIndexBuilderImpl<HashIndexBuilder> implements HashIndexBuilder {
private final TableBuilderImpl parent;
private HashIndexBuilderImpl(TableBuilderImpl parent) {
this.parent = parent;
}
/** {@inheritDoc} */
@Override
HashIndexBuilder self() {
return this;
}
/** {@inheritDoc} */
@Override
public TableBuilder end() {
parent.indexBuilders.add(this);
return parent;
}
/** {@inheritDoc} */
@Override
public TestIndex build(TableDescriptor desc) {
if (name == null) {
throw new IllegalArgumentException("Name is not specified");
}
if (columns.isEmpty()) {
throw new IllegalArgumentException("Index must contain at least one column");
}
assert collations == null : "Collation is not supported.";
return TestIndex.createHash(name, columns, desc);
}
}
private static class ClusterSortedIndexBuilderImpl extends AbstractClusterTableIndexBuilderImpl<ClusterSortedIndexBuilder>
implements ClusterSortedIndexBuilder {
private final ClusterTableBuilderImpl parent;
ClusterSortedIndexBuilderImpl(ClusterTableBuilderImpl parent) {
this.parent = parent;
}
/** {@inheritDoc} */
@Override
ClusterSortedIndexBuilder self() {
return this;
}
/** {@inheritDoc} */
@Override
public ClusterTableBuilder end() {
parent.indexBuilders.add(this);
return parent;
}
@Override
CatalogCommand build(String schemaName, String tableName) {
assert collations.size() == columns.size();
List<CatalogColumnCollation> catalogCollations = collations.stream()
.map(c -> CatalogColumnCollation.get(c.asc, c.nullsFirst))
.collect(Collectors.toList());
return CreateSortedIndexCommand.builder()
.schemaName(schemaName)
.tableName(tableName)
.indexName(name)
.columns(columns)
.collations(catalogCollations)
.build();
}
}
private static class ClusterHashIndexBuilderImpl extends AbstractClusterTableIndexBuilderImpl<ClusterHashIndexBuilder>
implements ClusterHashIndexBuilder {
private final ClusterTableBuilderImpl parent;
ClusterHashIndexBuilderImpl(ClusterTableBuilderImpl parent) {
this.parent = parent;
}
/** {@inheritDoc} */
@Override
ClusterHashIndexBuilder self() {
return this;
}
/** {@inheritDoc} */
@Override
public ClusterTableBuilder end() {
parent.indexBuilders.add(this);
return parent;
}
@Override
CatalogCommand build(String schemaName, String tableName) {
return CreateHashIndexCommand.builder()
.schemaName(schemaName)
.tableName(tableName)
.indexName(name)
.columns(columns)
.build();
}
}
private abstract static class AbstractIndexBuilderImpl<ChildT> implements SortedIndexBuilderBase<ChildT>, HashIndexBuilderBase<ChildT> {
String name;
final List<String> columns = new ArrayList<>();
List<Collation> collations;
/** {@inheritDoc} */
@Override
public ChildT name(String name) {
this.name = name;
return self();
}
/** {@inheritDoc} */
@Override
public ChildT addColumn(String columnName) {
columns.add(columnName);
return self();
}
/** {@inheritDoc} */
@Override
public ChildT addColumn(String columnName, Collation collation) {
if (collations == null) {
collations = new ArrayList<>();
}
columns.add(columnName);
collations.add(collation);
return self();
}
abstract ChildT self();
}
private abstract static class AbstractTableIndexBuilderImpl<ChildT> extends AbstractIndexBuilderImpl<ChildT> {
abstract TestIndex build(TableDescriptor desc);
}
private abstract static class AbstractClusterTableIndexBuilderImpl<ChildT> extends AbstractIndexBuilderImpl<ChildT> {
abstract CatalogCommand build(String schemaName, String tableName);
}
/**
* Base interface describing the complete set of table-related fields.
*
* <p>The sole purpose of this interface is to keep both variants of the table builders in sync.
*
* @param <ChildT> The actual type of the builder that should be exposed to the user.
* @see ClusterTableBuilder
* @see TableBuilder
*/
private interface TableBuilderBase<ChildT> {
/** Sets the name of the table. */
ChildT name(String name);
/** Adds a key column to the table. */
ChildT addKeyColumn(String name, NativeType type);
/** Adds a column to the table. */
ChildT addColumn(String name, NativeType type);
/** Adds a column with the given nullability to the table. */
ChildT addColumn(String name, NativeType type, boolean nullable);
/** Adds a column with the given default value to the table. */
ChildT addColumn(String name, NativeType type, @Nullable Object defaultValue);
}
/**
* Base interface describing the common set of index-related fields.
*
* <p>The sole purpose of this interface is to keep both variants of the index builders in sync.
*
* @param <ChildT> The actual type of the builder that should be exposed to the user.
* @see ClusterHashIndexBuilder
* @see ClusterSortedIndexBuilder
* @see HashIndexBuilder
* @see SortedIndexBuilder
*/
private interface IndexBuilderBase<ChildT> {
/** Sets the name of the index. */
ChildT name(String name);
}
/**
* Base interface describing the set of sorted-index related fields.
*
* @param <ChildT> The actual type of the builder that should be exposed to the user.
*/
private interface SortedIndexBuilderBase<ChildT> extends IndexBuilderBase<ChildT> {
/** Adds a column with the specified collation to the index. */
ChildT addColumn(String columnName, Collation collation);
}
/**
* Base interface describing the set of hash-index related fields.
*
* @param <ChildT> The actual type of the builder that should be exposed to the user.
*/
private interface HashIndexBuilderBase<ChildT> extends IndexBuilderBase<ChildT> {
/** Adds a column to the index. */
ChildT addColumn(String columnName);
}
/**
* This interface provides a nested builder with the ability to return to the previous layer.
*
* <p>For example:</p>
* <pre>
* interface ChildBuilder extends NestedBuilder&lt;ParentBuilder&gt; {
* ChildBuilder nestedFoo();
* }
*
* interface ParentBuilder {
* ParentBuilder foo();
* ParentBuilder bar();
* ChildBuilder child();
* }
*
* Builders.parent()
* .foo()
* .child() // now we are dealing with the ChildBuilder
* .nestedFoo()
* .end() // and here we return to the ParentBuilder
* .bar()
* .build()
* </pre>
*/
@FunctionalInterface
private interface NestedBuilder<ParentT> {
/**
* Notifies the nested builder's chain that we need to return to the previous layer.
*
* @return An instance of the parent builder.
*/
ParentT end();
}
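/** Executable table registry backed by a fixed map of {@link ScannableTable}s keyed by table name; updates are not supported. */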
private static class TestExecutableTableRegistry implements ExecutableTableRegistry {
private final Map<String, ScannableTable> tablesByName;
private final SqlSchemaManager schemaManager;
TestExecutableTableRegistry(Map<String, ScannableTable> tablesByName, SqlSchemaManager schemaManager) {
this.tablesByName = tablesByName;
this.schemaManager = schemaManager;
}
@Override
public CompletableFuture<ExecutableTable> getTable(int schemaVersion, int tableId) {
IgniteTable table = schemaManager.table(schemaVersion, tableId);
assert table != null;
return CompletableFuture.completedFuture(new ExecutableTable() {
@Override
public ScannableTable scannableTable() {
ScannableTable scannableTable = tablesByName.get(table.name());
assert scannableTable != null;
return scannableTable;
}
@Override
public UpdatableTable updatableTable() {
throw new UnsupportedOperationException();
}
@Override
public TableDescriptor tableDescriptor() {
return table.descriptor();
}
@Override
public Supplier<PartitionCalculator> partitionCalculator() {
return table.partitionCalculator();
}
});
}
}
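/** Converts the given column definition into catalog {@link ColumnParams}, deriving precision, scale, or length from the native type where applicable. */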
private static ColumnParams columnParams(String name, NativeType type, boolean nullable, @Nullable Object defaultValue) {
NativeTypeSpec typeSpec = type.spec();
Builder builder = ColumnParams.builder()
.name(name)
.type(typeSpec.asColumnType())
.nullable(nullable)
.defaultValue(DefaultValue.constant(defaultValue));
switch (typeSpec) {
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case DATE:
case UUID:
case BOOLEAN:
break;
case NUMBER:
assert type instanceof NumberNativeType : type.getClass().getCanonicalName();
builder.precision(((NumberNativeType) type).precision());
break;
case DECIMAL:
assert type instanceof DecimalNativeType : type.getClass().getCanonicalName();
builder.precision(((DecimalNativeType) type).precision());
builder.scale(((DecimalNativeType) type).scale());
break;
case STRING:
case BYTES:
assert type instanceof VarlenNativeType : type.getClass().getCanonicalName();
builder.length(((VarlenNativeType) type).length());
break;
case BITMASK:
assert type instanceof BitmaskNativeType : type.getClass().getCanonicalName();
builder.length(((BitmaskNativeType) type).bits());
break;
case TIME:
case DATETIME:
case TIMESTAMP:
assert type instanceof TemporalNativeType : type.getClass().getCanonicalName();
builder.precision(((TemporalNativeType) type).precision());
break;
default:
throw new IllegalArgumentException("Unsupported native type: " + typeSpec);
}
return builder.build();
}
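/** Projects the given row onto the elements whose positions are set in {@code requiredElements}; a {@code null} bit set returns the row as is. */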
private static Object[] project(Object[] row, @Nullable BitSet requiredElements) {
if (requiredElements == null) {
return row;
}
Object[] newRow = new Object[requiredElements.cardinality()];
int idx = 0;
for (int i = requiredElements.nextSetBit(0); i != -1; i = requiredElements.nextSetBit(i + 1)) {
newRow[idx++] = row[i];
}
return newRow;
}
/** Returns a builder for {@link ExecutionTargetProvider}. */
public static ExecutionTargetProviderBuilder executionTargetProviderBuilder() {
return new ExecutionTargetProviderBuilder();
}
/**
* A builder to create instances of {@link ExecutionTargetProvider}.
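*
* <p>A minimal usage sketch (table and node names are illustrative):
* <pre>{@code
* ExecutionTargetProvider provider = TestBuilders.executionTargetProviderBuilder()
*         .addTables(Map.of("T1", List.of("N1", "N2")))
*         .useTablePartitions(true)
*         .build();
* }</pre>
*/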
public static final class ExecutionTargetProviderBuilder {
private final Map<String, List<String>> owningNodesByTableName = new HashMap<>();
private Function<String, List<String>> owningNodesBySystemViewName = (n) -> null;
private boolean useTablePartitions;
private ExecutionTargetProviderBuilder() {
}
/** Adds mappings from table names to the lists of nodes that own them. */
public ExecutionTargetProviderBuilder addTables(Map<String, List<String>> tables) {
this.owningNodesByTableName.putAll(tables);
return this;
}
/**
* Sets a function that resolves system view placement. The function accepts a view name and returns
* the list of nodes the system view is available at.
*/
public ExecutionTargetProviderBuilder setSystemViews(Function<String, List<String>> systemViews) {
this.owningNodesBySystemViewName = systemViews;
return this;
}
/** Whether to use table partitions when building mapping targets. Default is {@code false}. */
public ExecutionTargetProviderBuilder useTablePartitions(boolean value) {
useTablePartitions = value;
return this;
}
/** Creates an instance of {@link ExecutionTargetProvider}. */
public ExecutionTargetProvider build() {
return new TestNodeExecutionTargetProvider(
owningNodesBySystemViewName,
Map.copyOf(owningNodesByTableName),
useTablePartitions
);
}
}
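/**
* Execution target provider that resolves tables and system views against preconfigured owning nodes.
* When {@code useTablePartitions} is set, every partition is mapped to one of the owning nodes in a
* round-robin fashion.
*/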
private static class TestNodeExecutionTargetProvider implements ExecutionTargetProvider {
final Function<String, List<String>> owningNodesBySystemViewName;
final Map<String, List<String>> owningNodesByTableName;
final boolean useTablePartitions;
private TestNodeExecutionTargetProvider(
Function<String, List<String>> owningNodesBySystemViewName,
Map<String, List<String>> owningNodesByTableName,
boolean useTablePartitions
) {
this.owningNodesBySystemViewName = owningNodesBySystemViewName;
this.owningNodesByTableName = Map.copyOf(owningNodesByTableName);
this.useTablePartitions = useTablePartitions;
}
@Override
public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) {
List<String> owningNodes = owningNodesByTableName.get(table.name());
if (nullOrEmpty(owningNodes)) {
throw new AssertionError("DataProvider is not configured for table " + table.name());
}
List<NodeWithConsistencyToken> nodes;
if (useTablePartitions) {
int p = table.partitions();
nodes = IntStream.range(0, p).mapToObj(n -> {
String nodeName = owningNodes.get(n % owningNodes.size());
return new NodeWithConsistencyToken(nodeName, p);
}).collect(Collectors.toList());
} else {
nodes = owningNodes.stream()
.map(name -> new NodeWithConsistencyToken(name, 1))
.collect(Collectors.toList());
}
ExecutionTarget target = factory.partitioned(nodes);
return CompletableFuture.completedFuture(target);
}
@Override
public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
List<String> nodes = owningNodesBySystemViewName.apply(view.name());
if (nullOrEmpty(nodes)) {
return CompletableFuture.failedFuture(
new SqlException(Sql.MAPPING_ERR, format("The view with name '{}' could not be found on"
+ " any active nodes in the cluster", view.name()))
);
}
return CompletableFuture.completedFuture(
view.distribution() == IgniteDistributions.single()
? factory.oneOf(nodes)
: factory.allOf(nodes)
);
}
}
}