/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
import org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
import org.apache.ignite.internal.sql.engine.message.MessageListener;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Test class to verify {@link ExecutionServiceImpl}.
*/
public class ExecutionServiceImplTest {
private static final IgniteLogger LOG = Loggers.forClass(ExecutionServiceImpl.class);
/** Timeout in ms for async operations. */
private static final long TIMEOUT_IN_MS = 2_000;
private final List<String> nodeIds = List.of("node_1", "node_2", "node_3");
private final Map<String, List<Object[]>> dataPerNode = Map.of(
nodeIds.get(0), List.of(new Object[]{0, 0}, new Object[]{3, 3}, new Object[]{6, 6}),
nodeIds.get(1), List.of(new Object[]{1, 1}, new Object[]{4, 4}, new Object[]{7, 7}),
nodeIds.get(2), List.of(new Object[]{2, 2}, new Object[]{5, 5}, new Object[]{8, 8})
);
private final TestTable table = createTable("TEST_TBL", 1_000_000, IgniteDistributions.random(),
"ID", Integer.class, "VAL", Integer.class);
private final IgniteSchema schema = new IgniteSchema("PUBLIC", Map.of(table.name(), table));
private TestCluster testCluster;
private List<ExecutionServiceImpl<?>> executionServices;
private PrepareService prepareService;
@BeforeEach
public void init() {
testCluster = new TestCluster();
executionServices = nodeIds.stream().map(this::create).collect(Collectors.toList());
prepareService = new PrepareServiceImpl("test", 0, null);
prepareService.start();
}
@AfterEach
public void tearDown() throws Exception {
prepareService.stop();
}
/**
* The very simple case where a cursor is closed in the middle of a normal execution.
*/
@Test
public void testCloseByCursor() throws Exception {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 4, TIMEOUT_IN_MS));
CompletionStage<?> batchFut = cursor.requestNextAsync(1);
await(cursor.closeAsync());
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
await(batchFut.exceptionally(ex -> {
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(ExecutionCancelledException.class, ex.getCause());
return null;
}));
assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
}
/**
* The very simple case where a query is cancelled in the middle of a normal execution.
*/
@Test
public void testCancelOnInitiator() throws InterruptedException {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 4, TIMEOUT_IN_MS));
CompletionStage<?> batchFut = cursor.requestNextAsync(1);
await(executionServices.get(0).cancel(ctx.queryId()));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
await(batchFut.exceptionally(ex -> {
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(ExecutionCancelledException.class, ex.getCause());
return null;
}));
assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
}
/**
* Query initialization fails on one of the remote nodes. Verifies that the rest of the query is closed properly.
*/
@Test
public void testInitializationFailedOnRemoteNode() throws InterruptedException {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var expectedEx = new RuntimeException("Test error");
testCluster.node(nodeIds.get(2)).interceptor((nodeId, msg, original) -> {
if (msg instanceof QueryStartRequest) {
try {
testCluster.node(nodeIds.get(2)).messageService().send(nodeId, new SqlQueryMessagesFactory().queryStartResponse()
.queryId(((QueryStartRequest) msg).queryId())
.fragmentId(((QueryStartRequest) msg).fragmentId())
.error(expectedEx)
.build()
);
} catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException(e);
}
} else {
original.onMessage(nodeId, msg);
}
});
var cursor = execService.executePlan(plan, ctx);
CompletionStage<?> batchFut = cursor.requestNextAsync(1);
assertTrue(waitForCondition(() -> batchFut.toCompletableFuture().isDone(), TIMEOUT_IN_MS));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
await(batchFut.exceptionally(ex -> {
assertInstanceOf(CompletionException.class, ex);
assertEquals(expectedEx, ex.getCause());
return null;
}));
assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
}
/**
* The very simple case where a query is cancelled in the middle of a normal execution on a non-initiator node.
*/
@Test
public void testCancelOnRemote() throws InterruptedException {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 4, TIMEOUT_IN_MS));
var batchFut = cursor.requestNextAsync(1);
await(executionServices.get(1).cancel(ctx.queryId()));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
await(batchFut.exceptionally(ex -> {
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(RemoteException.class, ex.getCause());
assertInstanceOf(ExecutionCancelledException.class, ex.getCause().getCause());
return null;
}));
assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
}
/**
* Read all data from the cursor. The requested amount is less than the size of the result set.
*/
@Test
public void testCursorIsClosedAfterAllDataRead() throws InterruptedException {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
var cursor = execService.executePlan(plan, ctx);
BatchedResult<?> res = await(cursor.requestNextAsync(8));
assertNotNull(res);
assertTrue(res.hasMore());
assertEquals(8, res.items().size());
res = await(cursor.requestNextAsync(1));
assertNotNull(res);
assertFalse(res.hasMore());
assertEquals(1, res.items().size());
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
}
/**
* Read all data from the cursor. The requested amount is exactly the size of the result set.
*/
@Test
public void testCursorIsClosedAfterAllDataRead2() throws InterruptedException {
var execService = executionServices.get(0);
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
var cursor = execService.executePlan(plan, ctx);
BatchedResult<?> res = await(cursor.requestNextAsync(9));
assertNotNull(res);
assertFalse(res.hasMore());
assertEquals(9, res.items().size());
assertTrue(waitForCondition(
() -> executionServices.stream().map(es -> es.localFragments(ctx.queryId()).size())
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
}
/** Creates an execution service instance for the node with the given id. */
public ExecutionServiceImpl<Object[]> create(String nodeId) {
if (!nodeIds.contains(nodeId)) {
throw new IllegalArgumentException(format("Node id should be one of {}, but was '{}'", nodeIds, nodeId));
}
var taskExecutor = new QueryTaskExecutorImpl(nodeId);
var node = testCluster.addNode(nodeId, taskExecutor);
node.dataset(dataPerNode.get(nodeId));
var messageService = node.messageService();
var mailboxRegistry = new MailboxRegistryImpl();
var exchangeService = new ExchangeServiceImpl(
nodeId,
taskExecutor,
mailboxRegistry,
messageService
);
var schemaManagerMock = mock(SqlSchemaManager.class);
when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
var executionService = new ExecutionServiceImpl<>(
nodeId,
messageService,
(single, filter) -> single ? List.of(nodeIds.get(ThreadLocalRandom.current().nextInt(nodeIds.size()))) : nodeIds,
schemaManagerMock,
mock(DdlCommandHandler.class),
taskExecutor,
ArrayRowHandler.INSTANCE,
exchangeService,
ctx -> node.implementor(ctx, mailboxRegistry, exchangeService)
);
taskExecutor.start();
exchangeService.start();
executionService.start();
return executionService;
}
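/** Creates a query context with a new {@link QueryCancel} and the test schema set as the default schema. */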
private BaseQueryContext createContext() {
return BaseQueryContext.builder()
.cancel(new QueryCancel())
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(wrap(schema))
.build()
)
.logger(LOG)
.build();
}
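/** Registers the given schema in a fresh root schema and returns the resulting {@link SchemaPlus} sub-schema. */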
private SchemaPlus wrap(IgniteSchema schema) {
var schemaPlus = Frameworks.createRootSchema(false);
schemaPlus.add(schema.getName(), schema);
return schemaPlus.getSubSchema(schema.getName());
}
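/** Parses the given query string and synchronously prepares a plan for it. */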
private QueryPlan prepare(String query, BaseQueryContext ctx) {
SqlNodeList nodes = Commons.parse(query, FRAMEWORK_CONFIG.getParserConfig());
assertThat(nodes, hasSize(1));
return prepareService.prepareAsync(nodes.get(0), ctx).join();
}
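/**
* A single-process cluster emulation: messages are delivered to the target {@link TestNode} directly, without real networking.
*/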
static class TestCluster {
private final Map<String, TestNode> nodes = new ConcurrentHashMap<>();
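/** Adds a node with the given id to the cluster, or returns the node already registered under that id. */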
public TestNode addNode(String nodeId, QueryTaskExecutor taskExecutor) {
return nodes.computeIfAbsent(nodeId, key -> new TestNode(nodeId, taskExecutor));
}
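/** Returns the node with the given id. */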
public TestNode node(String nodeId) {
return nodes.get(nodeId);
}
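/**
* A single node of the test cluster. Holds the node's dataset, registered message listeners, an optional message interceptor,
* and the queue of scan requests postponed by {@link #pauseScan()}.
*/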
class TestNode {
private final Map<Short, MessageListener> msgListeners = new ConcurrentHashMap<>();
private final Queue<RunnableX> pending = new LinkedBlockingQueue<>();
private volatile boolean dead = false;
private volatile List<Object[]> dataset = List.of();
private volatile MessageInterceptor interceptor = null;
private final QueryTaskExecutor taskExecutor;
private final String nodeId;
private boolean scanPaused = false;
public TestNode(String nodeId, QueryTaskExecutor taskExecutor) {
this.nodeId = nodeId;
this.taskExecutor = taskExecutor;
}
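/** Marks this node as dead or alive. Dead nodes are reported as not alive by the cluster's message service. */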
public void dead(boolean dead) {
this.dead = dead;
}
public boolean dead() {
return dead;
}
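/** Sets the rows returned by this node's table scans. */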
public void dataset(List<Object[]> dataset) {
this.dataset = dataset;
}
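/** Sets an interceptor for messages received by this node, or {@code null} to remove it. */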
public void interceptor(@Nullable MessageInterceptor interceptor) {
this.interceptor = interceptor;
}
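/** Pauses table scans on this node: subsequent scan requests are queued until {@link #resumeScan()} is called. */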
public void pauseScan() {
synchronized (pending) {
scanPaused = true;
}
}
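/** Resumes table scans on this node and executes all scan requests queued while the scans were paused. */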
public void resumeScan() {
synchronized (pending) {
scanPaused = false;
Throwable t = null;
for (RunnableX runnableX : pending) {
try {
runnableX.run();
} catch (Throwable t0) {
if (t == null) {
t = t0;
} else {
t.addSuppressed(t0);
}
}
}
if (t != null) {
throw new IgniteInternalException(t);
}
}
}
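/** Returns a message service that delivers messages directly to the target {@link TestNode} of this cluster. */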
public MessageService messageService() {
return new MessageService() {
/** {@inheritDoc} */
@Override
public void send(String targetNodeId, NetworkMessage msg) {
TestNode node = nodes.get(targetNodeId);
node.onReceive(nodeId, msg);
}
/** {@inheritDoc} */
@Override
public boolean alive(String nodeId) {
return !nodes.get(nodeId).dead();
}
/** {@inheritDoc} */
@Override
public void register(MessageListener lsnr, short msgId) {
var old = msgListeners.put(msgId, lsnr);
if (old != null) {
throw new RuntimeException(format("Listener was replaced [nodeId={}, msgId={}]", nodeId, msgId));
}
}
/** {@inheritDoc} */
@Override
public void start() {
// NO-OP
}
/** {@inheritDoc} */
@Override
public void stop() {
// NO-OP
}
};
}
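/** Returns a {@link LogicalRelImplementor} whose table scan nodes read this node's dataset and respect {@link #pauseScan()}. */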
public LogicalRelImplementor<Object[]> implementor(
ExecutionContext<Object[]> ctx,
MailboxRegistry mailboxRegistry,
ExchangeService exchangeService
) {
return new LogicalRelImplementor<>(ctx, cacheId -> Objects::hashCode, mailboxRegistry, exchangeService) {
@Override
public Node<Object[]> visit(IgniteTableScan rel) {
RelDataType rowType = rel.getRowType();
return new ScanNode<>(ctx, rowType, dataset) {
@Override
public void request(int rowsCnt) {
RunnableX task = () -> super.request(rowsCnt);
synchronized (pending) {
if (scanPaused) {
pending.add(task);
} else {
try {
task.run();
} catch (Throwable ex) {
throw new IgniteInternalException(ex);
}
}
}
}
};
}
};
}
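/** Dispatches an incoming message to the registered listener on the task executor, routing it through the interceptor if one is set. */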
private void onReceive(String senderNodeId, NetworkMessage message) {
MessageListener original = (nodeId, msg) -> {
MessageListener listener = msgListeners.get(msg.messageType());
if (listener == null) {
throw new IllegalStateException(
format("Listener not found [senderNodeId={}, msgId={}]", nodeId, msg.messageType()));
}
if (msg instanceof ExecutionContextAwareMessage) {
ExecutionContextAwareMessage msg0 = (ExecutionContextAwareMessage) msg;
taskExecutor.execute(msg0.queryId(), msg0.fragmentId(), () -> listener.onMessage(nodeId, msg));
} else {
taskExecutor.execute(() -> listener.onMessage(nodeId, msg));
}
};
MessageInterceptor interceptor = this.interceptor;
if (interceptor != null) {
interceptor.intercept(senderNodeId, message, original);
} else {
original.onMessage(senderNodeId, message);
}
}
}
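/** Interceptor of incoming messages: receives the original listener and decides how the message is handled. */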
interface MessageInterceptor {
void intercept(String senderNodeId, NetworkMessage msg, MessageListener original);
}
}
/**
* Creates a test table with the given parameters.
*
* @param name Name of the table.
* @param size Required size of the table.
* @param distr Distribution of the table.
* @param fields List of the required fields. Every odd item should be a string representing a column name, every even item should be a
* class representing the column's type. E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL",
* String.class)}.
* @return Instance of the {@link TestTable}.
*/
// TODO: copy-pasted from AbstractPlannerTest. Should be extracted into an independent class.
protected TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
if (ArrayUtils.nullOrEmpty(fields) || fields.length % 2 != 0) {
throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
}
RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(CLUSTER.getTypeFactory());
for (int i = 0; i < fields.length; i += 2) {
b.add((String) fields[i], CLUSTER.getTypeFactory().createJavaType((Class<?>) fields[i + 1]));
}
return new TestTable(name, b.build(), size) {
@Override
public IgniteDistribution distribution() {
return distr;
}
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(nodeIds);
}
};
}
}