blob: 46429dee84e5d688466e60b288d4b9d066903caa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
/** Transactions enlist count test. */
@ExtendWith(QueryCheckerExtension.class)
public class TransactionEnlistTest extends BaseIgniteAbstractTest {
private static final String NODE_NAME1 = "N1";
@InjectQueryCheckerFactory
private static QueryCheckerFactory queryCheckerFactory;
private static final TestCluster CLUSTER = TestBuilders.cluster()
.nodes(NODE_NAME1, true)
.addTable()
.name("T1")
.addKeyColumn("ID", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.INT32)
.end()
.dataProvider(NODE_NAME1, "T1", TestBuilders.tableScan(DataProvider.fromCollection(List.of())))
.build(); // add method use table partitions
@BeforeAll
static void startCluster() {
CLUSTER.start();
}
@AfterAll
static void stopCluster() throws Exception {
CLUSTER.stop();
}
/**
* Check that tx enlist is called only for exact partitions.
*/
@Test
void testEnlistCall() {
NoOpTransaction tx = NoOpTransaction.readWrite("t1");
NoOpTransaction spiedTx = Mockito.spy(tx);
try {
assertQuery("INSERT INTO t1 VALUES(1, 2), (2, 3)", spiedTx).check();
} catch (Exception ex) {
// No op.
}
Mockito.verify(spiedTx, times(2)).enlist(any(), any());
}
private static QueryChecker assertQuery(String qry, InternalTransaction tx) {
TestNode testNode = CLUSTER.node(NODE_NAME1);
return queryCheckerFactory.create(
testNode.name(),
new TestQueryProcessor(testNode, false),
null,
tx,
qry
);
}
private static class TestQueryProcessor implements QueryProcessor {
private final TestNode node;
private final boolean prepareOnly;
TestQueryProcessor(TestNode node, boolean prepareOnly) {
this.node = node;
this.prepareOnly = prepareOnly;
}
@Override
public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties properties,
@Nullable InternalTransaction transaction, String qry, Object... params) {
assert params == null || params.length == 0 : "params are not supported";
assert prepareOnly : "Expected that the query will be executed";
QueryPlan plan = node.prepare(qry);
return CompletableFuture.completedFuture(new QueryMetadata(plan.metadata(), plan.parameterMetadata()));
}
@Override
public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> queryAsync(
SqlProperties properties,
IgniteTransactions transactions,
@Nullable InternalTransaction transaction,
String qry,
Object... params
) {
assert params == null || params.length == 0 : "params are not supported";
assert !prepareOnly : "Expected that the query will only be prepared, but not executed";
QueryPlan plan = node.prepare(qry);
AsyncCursor<InternalSqlRow> dataCursor = node.executePlan(plan, transaction);
SqlQueryType type = plan.type();
assert type != null;
AsyncSqlCursor<InternalSqlRow> sqlCursor = new AsyncSqlCursorImpl<>(
type,
plan.metadata(),
new QueryTransactionWrapperImpl(transaction != null ? transaction : new NoOpTransaction("test"), false),
dataCursor,
nullCompletedFuture(),
null
);
return CompletableFuture.completedFuture(sqlCursor);
}
@Override
public CompletableFuture<Void> startAsync() {
// NO-OP
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> stopAsync() {
// NO-OP
return nullCompletedFuture();
}
}
}