blob: 130f5c7e9284f4dec37e5f6d68f8c90309ada4b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StringUtils;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
/**
 * An object representing a node in a test cluster.
 *
 * <p>Provides convenient access to the methods for optimization and execution of queries.
 *
 * <p>The node owns a set of {@link LifecycleAware} services that are created in the constructor,
 * started by {@link #start()} in registration order and stopped by {@link #stop()} in reverse order.
 */
public class TestNode implements LifecycleAware {
    private final String nodeName;

    private final SqlSchemaManager schemaManager;

    private final PrepareService prepareService;

    private final ExecutionService executionService;

    private final ParserService parserService;

    private final MessageService messageService;

    /** Services managed by this node, kept in registration order. */
    private final List<LifecycleAware> services = new ArrayList<>();

    /** Set to {@code true} by the handler given to this node's {@link FailureProcessor} once it is invoked. */
    volatile boolean exceptionRaised;

    /** Lock shared with the message service; blocked when the node is stopped. */
    private final IgniteSpinBusyLock holdLock = new IgniteSpinBusyLock();

    private final HybridClock clock = new HybridClockImpl();

    private final ClockService clockService = new TestClockService(clock);

    /**
     * Constructs the object.
     *
     * @param nodeName A name of the node to create.
     * @param clusterService A cluster service; provides the topology and messaging services used by this node.
     * @param parserService A service used to parse query strings.
     * @param prepareService A service used to prepare (validate and optimize) parsed queries.
     * @param schemaManager A schema manager to use for query planning and execution.
     * @param mappingService A mapping service handed over to the execution service.
     * @param tableRegistry A registry of executable tables handed over to the execution service
     *      and to the execution dependency resolver.
     * @param ddlCommandHandler A handler of DDL commands handed over to the execution service.
     * @param systemViewManager A system view manager; used to scan system views and managed
     *      (started and stopped) together with the other services of this node.
     */
    TestNode(
            String nodeName,
            ClusterService clusterService,
            ParserService parserService,
            PrepareService prepareService,
            SqlSchemaManager schemaManager,
            MappingService mappingService,
            ExecutableTableRegistry tableRegistry,
            DdlCommandHandler ddlCommandHandler,
            SystemViewManager systemViewManager
    ) {
        this.nodeName = nodeName;
        this.parserService = parserService;
        this.prepareService = prepareService;
        this.schemaManager = schemaManager;

        TopologyService topologyService = clusterService.topologyService();
        MessagingService messagingService = clusterService.messagingService();
        RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;

        // NOTE: the order of registerService(...) calls below defines the start order
        // (and, reversed, the stop order) of the services. Do not reorder.
        MailboxRegistry mailboxRegistry = registerService(new MailboxRegistryImpl());

        // Any failure reported to the processor is only recorded in the flag.
        FailureProcessor failureProcessor = new FailureProcessor(nodeName, (a, b) -> exceptionRaised = true);

        // NOTE(review): "4" is presumably the size of the executor's thread pool -- confirm.
        QueryTaskExecutor taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName, 4, failureProcessor));

        messageService = registerService(new MessageServiceImpl(
                nodeName, messagingService, taskExecutor, holdLock, clockService
        ));

        ExchangeService exchangeService = registerService(new ExchangeServiceImpl(
                mailboxRegistry, messageService, clockService
        ));

        ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(
                tableRegistry, view -> () -> systemViewManager.scanView(view.name())
        );

        // NOTE(review): "5_000" is presumably a timeout in milliseconds -- confirm.
        executionService = registerService(ExecutionServiceImpl.create(
                topologyService,
                messageService,
                schemaManager,
                ddlCommandHandler,
                taskExecutor,
                rowHandler,
                mailboxRegistry,
                exchangeService,
                mappingService,
                tableRegistry,
                dependencyResolver,
                clockService,
                5_000
        ));

        registerService(new IgniteComponentLifecycleAwareAdapter(systemViewManager));
    }

    /** {@inheritDoc} */
    @Override
    public void start() {
        services.forEach(LifecycleAware::start);
    }

    /**
     * Blocks the hold lock and then stops all registered services in the order
     * opposite to the one they were registered (and started) in.
     *
     * @throws Exception If stopping of the services fails.
     */
    @Override
    public void stop() throws Exception {
        holdLock.block();

        List<AutoCloseable> closeables = services.stream()
                .map(service -> ((AutoCloseable) service::stop))
                .collect(Collectors.toList());

        // Services are stopped in reverse registration order.
        Collections.reverse(closeables);

        IgniteUtils.closeAll(closeables);
    }

    /** Returns the name of the current node. */
    public String name() {
        return nodeName;
    }

    /** Returns the message service of this node. */
    MessageService messageService() {
        return messageService;
    }

    /** Returns the lock that is blocked when this node is stopped. */
    IgniteSpinBusyLock holdLock() {
        return holdLock;
    }

    /** Returns the hybrid clock of this node. */
    HybridClock clock() {
        return clock;
    }

    /** Returns the clock service of this node. */
    ClockService clockService() {
        return clockService;
    }

    /**
     * Executes given plan on a cluster this node belongs to
     * and returns an async cursor representing the result.
     *
     * @param plan A plan to execute.
     * @param transaction External transaction; if {@code null}, a new {@link NoOpTransaction} is used instead.
     * @return A cursor representing the result.
     */
    public AsyncCursor<InternalSqlRow> executePlan(QueryPlan plan, @Nullable InternalTransaction transaction) {
        InternalTransaction tx = transaction == null ? new NoOpTransaction(nodeName) : transaction;

        return executionService.executePlan(tx, plan, createContext());
    }

    /**
     * Executes given plan on a cluster this node belongs to
     * and returns an async cursor representing the result.
     *
     * @param plan A plan to execute.
     * @return A cursor representing the result.
     */
    public AsyncCursor<InternalSqlRow> executePlan(QueryPlan plan) {
        return executePlan(plan, null);
    }

    /**
     * Prepares (aka parses, validates, and optimizes) the given query string
     * and returns the plan to execute.
     *
     * @param query A query string to prepare.
     * @return A plan to execute.
     */
    public QueryPlan prepare(String query) {
        return prepare(parserService.parse(query));
    }

    /**
     * Prepares (validates, and optimizes) the given query AST
     * and returns the plan to execute.
     *
     * @param parsedResult Parsed AST of a query to prepare.
     * @return A plan to execute.
     */
    public QueryPlan prepare(ParsedResult parsedResult) {
        return await(prepareService.prepareAsync(parsedResult, createContext()));
    }

    /**
     * Executes the given script.
     *
     * <p>This method splits given string by semicolon and executes every statement
     * one by one. Only DDL and DML statements are executed; statements of any other
     * type are prepared and then skipped.
     *
     * <p>Limitations: the split is purely textual, and a chunk that is blank or starts
     * with {@code --} (after trimming) is skipped as a whole, including any SQL text that
     * follows the comment within the same chunk.
     *
     * @param script Script to execute.
     */
    public void initSchema(String script) {
        for (String statement : script.split(";")) {
            if (StringUtils.nullOrBlank(statement) || statement.trim().startsWith("--")) {
                continue;
            }

            ParsedResult parsedResult = parserService.parse(statement);
            BaseQueryContext ctx = createContext();
            QueryPlan plan = await(prepareService.prepareAsync(parsedResult, ctx));

            if (plan.type() != SqlQueryType.DDL && plan.type() != SqlQueryType.DML) {
                continue;
            }

            // Same transaction stub as the one executePlan(...) falls back to.
            AsyncCursor<?> cursor = executionService.executePlan(new NoOpTransaction(nodeName), plan, ctx);

            try {
                // Requesting the first row is what this method waits on before moving to the next statement.
                await(cursor.requestNextAsync(1));
            } finally {
                // Do not leave the cursor open once the statement has been processed.
                await(cursor.closeAsync());
            }
        }
    }

    /**
     * Creates a fresh query context with a random query id, a new cancellation handle,
     * the default time zone, and the default schema resolved from the schema manager.
     */
    private BaseQueryContext createContext() {
        return BaseQueryContext.builder()
                .queryId(UUID.randomUUID())
                .cancel(new QueryCancel())
                .frameworkConfig(
                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                                // NOTE(review): Long.MAX_VALUE presumably requests the latest schema -- confirm.
                                .defaultSchema(schemaManager.schema(Long.MAX_VALUE).getSubSchema(DEFAULT_SCHEMA_NAME))
                                .build()
                )
                .timeZoneId(SqlQueryProcessor.DEFAULT_TIME_ZONE_ID)
                .build();
    }

    /** Adds the given service to the list of services managed by this node and returns it back. */
    private <T extends LifecycleAware> T registerService(T service) {
        services.add(service);

        return service;
    }

    /** Adapts an {@link IgniteComponent} to {@link LifecycleAware} by asserting that its async start and stop succeed. */
    private static class IgniteComponentLifecycleAwareAdapter implements LifecycleAware {
        private final IgniteComponent component;

        private IgniteComponentLifecycleAwareAdapter(IgniteComponent component) {
            this.component = component;
        }

        /** {@inheritDoc} */
        @Override
        public void start() {
            assertThat(component.startAsync(), willCompleteSuccessfully());
        }

        /** {@inheritDoc} */
        @Override
        public void stop() {
            assertThat(component.stopAsync(), willCompleteSuccessfully());
        }
    }
}