// blob: 57049fc974882d7d88a4962c1421b2ade50db998 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.api;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.util.ListToInternalSqlRowAdapter;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
/**
 * Tests to verify {@link IgniteSqlImpl}.
 *
 * <p>The {@link QueryProcessor} and all cursors are Mockito mocks; the tests check how {@link IgniteSqlImpl}
 * tracks opened cursors, closes them, and reacts to being stopped while operations are in flight.
 */
@SuppressWarnings("ThrowableNotThrown")
@ExtendWith(MockitoExtension.class)
class IgniteSqlImplTest extends BaseIgniteAbstractTest {
    /** Counter reset to {@code 1} before every test; no test in this class reads it. */
    private final AtomicLong clock = new AtomicLong();

    @Mock
    private QueryProcessor queryProcessor;

    private IgniteSqlImpl igniteSql;

    @BeforeEach
    void setUp() {
        clock.set(1L);

        igniteSql = new IgniteSqlImpl(queryProcessor, mock(IgniteTransactions.class));
    }

    /** Completing the cursor's {@code onClose()} future must remove the result set from the opened cursors. */
    @Test
    @SuppressWarnings("unchecked")
    void resultSetIsUnregisteredWhenUnderlyingCursorIsClosed() {
        AsyncSqlCursor<InternalSqlRow> result = mock(AsyncSqlCursor.class);

        when(result.requestNextAsync(anyInt()))
                .thenReturn(completedFuture(singleRowBatch(true)));

        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        when(result.onClose())
                .thenReturn(closeFuture);

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(result));

        AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));

        assertThat(rs, notNullValue());
        assertThat(igniteSql.openedCursors(), hasSize(1));

        closeFuture.complete(null);

        assertThat(igniteSql.openedCursors(), empty());
    }

    /** Stopping the component must call {@code closeAsync()} on a cursor that is still open. */
    @Test
    @SuppressWarnings("unchecked")
    void internalCursorIsClosedWhenNodeIsStopped() throws Exception {
        AsyncSqlCursor<InternalSqlRow> result = mock(AsyncSqlCursor.class);

        when(result.requestNextAsync(anyInt()))
                .thenReturn(completedFuture(singleRowBatch(true)));
        // The close notification never fires on its own, so the cursor stays registered until stop.
        when(result.onClose()).thenReturn(new CompletableFuture<>());
        when(result.closeAsync()).thenReturn(nullCompletedFuture());

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(result));

        AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));

        assertThat(rs, notNullValue());
        assertThat(igniteSql.openedCursors(), hasSize(1));

        assertThat(igniteSql.stopAsync(), willCompleteSuccessfully());

        verify(result).closeAsync();
    }

    /**
     * If the component is stopped after the query was submitted but before its cursor is delivered, the operation
     * must fail with {@code NODE_STOPPING_ERR} and the late cursor must be closed rather than registered.
     */
    @Test
    @SuppressWarnings("unchecked")
    void resultSetIsNotCreatedIfComponentIsStoppedInMiddleOfOperation() throws Exception {
        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new CompletableFuture<>();
        CountDownLatch executeQueryLatch = new CountDownLatch(1);

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenAnswer(ignored -> {
                    executeQueryLatch.countDown();

                    return cursorFuture;
                });

        CompletableFuture<?> result = igniteSql.executeAsync(null, "SELECT 1");

        // Make sure the query reached the processor before the component is stopped.
        assertThat(executeQueryLatch.await(5, TimeUnit.SECONDS), is(true));
        assertThat(igniteSql.stopAsync(), willCompleteSuccessfully());

        AsyncSqlCursor<InternalSqlRow> cursor = mock(AsyncSqlCursor.class);
        cursorFuture.complete(cursor);

        assertThrowsSqlException(
                NODE_STOPPING_ERR,
                "Node is stopping",
                () -> await(result)
        );

        assertThat(igniteSql.openedCursors(), empty());
        verify(cursor).closeAsync();
    }

    /**
     * A batch of ten argument rows must stop after the third query when the component is stopped while that third
     * query is still pending.
     */
    @Test
    @SuppressWarnings("unchecked")
    void batchProcessingIsInterruptedIfComponentIsStoppedInMiddle() throws Exception {
        AsyncSqlCursor<InternalSqlRow> dummyResult = mock(AsyncSqlCursor.class);

        when(dummyResult.requestNextAsync(anyInt()))
                .thenReturn(completedFuture(singleRowBatch(false)));
        when(dummyResult.closeAsync())
                .thenReturn(nullCompletedFuture());

        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new CompletableFuture<>();
        CountDownLatch executeQueryLatch = new CountDownLatch(3);

        // The first two invocations complete immediately; the third one returns the pending future.
        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenAnswer(ignored -> {
                    executeQueryLatch.countDown();

                    if (executeQueryLatch.getCount() > 0) {
                        return completedFuture(dummyResult);
                    }

                    return cursorFuture;
                });

        BatchedArguments args = BatchedArguments.create();

        for (int i = 0; i < 10; i++) {
            args.add(i);
        }

        CompletableFuture<?> result = igniteSql.executeBatchAsync(null, "SELECT 1", args);

        assertThat(executeQueryLatch.await(5, TimeUnit.SECONDS), is(true));
        assertThat(igniteSql.stopAsync(), willCompleteSuccessfully());

        AsyncSqlCursor<InternalSqlRow> cursor = mock(AsyncSqlCursor.class);
        cursorFuture.complete(cursor);

        assertThrowsSqlException(
                NODE_STOPPING_ERR,
                "Node is stopping",
                () -> await(result)
        );

        assertThat(igniteSql.openedCursors(), empty());
        // No further rows of the batch were submitted after the stop.
        verify(queryProcessor, times(3)).queryAsync(any(), any(), any(), any(), any(Object[].class));
        verify(cursor).closeAsync();
    }

    /** Fetching the page that reports no more data must leave no opened cursors behind. */
    @Test
    @SuppressWarnings("unchecked")
    void resultSetAreClosedWhenReadTillEnd() {
        AsyncSqlCursor<InternalSqlRow> result = mock(AsyncSqlCursor.class);

        AtomicBoolean hasMore = new AtomicBoolean(true);

        // Evaluated per call so that the test can flip "hasMore" between pages.
        when(result.requestNextAsync(anyInt()))
                .thenAnswer(ignored -> completedFuture(singleRowBatch(hasMore.get())));
        when(result.queryType())
                .thenReturn(SqlQueryType.QUERY);

        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        // Closing the cursor also fires the future returned by onClose().
        when(result.closeAsync())
                .thenAnswer(ignored -> {
                    closeFuture.complete(null);

                    return closeFuture;
                });
        when(result.onClose())
                .thenReturn(closeFuture);

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(result));

        AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));

        assertThat(rs, notNullValue());
        assertThat(igniteSql.openedCursors(), hasSize(1));

        await(rs.fetchNextPage());
        await(rs.fetchNextPage());

        // still opened
        assertThat(igniteSql.openedCursors(), hasSize(1));

        hasMore.set(false);
        await(rs.fetchNextPage());

        assertThat(igniteSql.openedCursors(), empty());
    }

    /** A script whose first cursor chains to a second one completes with {@code null} and leaves no opened cursors. */
    @Test
    @SuppressWarnings("unchecked")
    void scriptIteratesOverCursors() {
        AsyncSqlCursor<InternalSqlRow> cursor1 = mock(AsyncSqlCursor.class, "cursor1");
        AsyncSqlCursor<InternalSqlRow> cursor2 = mock(AsyncSqlCursor.class, "cursor2");

        when(cursor1.hasNextResult()).thenReturn(true);
        when(cursor1.nextResult()).thenReturn(completedFuture(cursor2));
        when(cursor1.closeAsync()).thenReturn(nullCompletedFuture());

        when(cursor2.hasNextResult()).thenReturn(false);
        when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(cursor1));

        Void rs = await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"));

        assertNull(rs);
        assertThat(igniteSql.openedCursors(), empty());
    }

    /** A failure of {@code nextResult()} must surface as an SQL exception with {@code INTERNAL_ERR}. */
    @Test
    @SuppressWarnings("unchecked")
    void scriptRethrowsExceptionFromCursor() {
        AsyncSqlCursor<InternalSqlRow> cursor1 = mock(AsyncSqlCursor.class);

        when(cursor1.hasNextResult()).thenReturn(true);
        when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Broken")));
        when(cursor1.closeAsync()).thenReturn(nullCompletedFuture());

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(cursor1));

        assertThrowsSqlException(
                INTERNAL_ERR,
                "Broken",
                () -> await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"))
        );

        assertThat(igniteSql.openedCursors(), empty());
    }

    /**
     * When the script fails and closing both cursors fails as well, the script failure must be found in the cause
     * chain of the thrown {@link SqlException}, with both close failures attached to it as suppressed exceptions
     * in cursor order.
     */
    @Test
    @SuppressWarnings("unchecked")
    void scriptRethrowsCloseCursorException() {
        AsyncSqlCursor<InternalSqlRow> cursor1 = mock(AsyncSqlCursor.class, "cursor1");
        AsyncSqlCursor<InternalSqlRow> cursor2 = mock(AsyncSqlCursor.class, "cursor2");

        Exception cursorCloseException1 = new IllegalStateException("cursor1");
        Exception cursorCloseException2 = new IllegalStateException("cursor2");
        Exception lastCursorScriptException = new IllegalStateException("Script exception");

        when(cursor1.hasNextResult()).thenReturn(true);
        when(cursor1.nextResult()).thenReturn(completedFuture(cursor2));
        when(cursor1.closeAsync()).thenReturn(CompletableFuture.failedFuture(cursorCloseException1));

        when(cursor2.hasNextResult()).thenReturn(true);
        when(cursor2.nextResult()).thenReturn(CompletableFuture.failedFuture(lastCursorScriptException));
        when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(cursorCloseException2));

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(cursor1));

        SqlException sqlEx = assertThrowsExactly(
                SqlException.class,
                () -> await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"))
        );

        Exception cause = cause(sqlEx, lastCursorScriptException.getClass(), lastCursorScriptException.getMessage());

        assertNotNull(cause);
        assertEquals(2, cause.getSuppressed().length);
        assertSame(cursorCloseException1, cause.getSuppressed()[0]);
        assertSame(cursorCloseException2, cause.getSuppressed()[1]);
    }

    /**
     * A script waiting for its second cursor must fail with {@code NODE_STOPPING_ERR} when the component is stopped
     * from another thread, and must leave no opened cursors behind.
     */
    @Test
    @SuppressWarnings("unchecked")
    void scriptTerminatesWhenComponentIsStopped() {
        AsyncSqlCursor<InternalSqlRow> cursor1 = mock(AsyncSqlCursor.class, "cursor1");
        AsyncSqlCursor<InternalSqlRow> cursor2 = mock(AsyncSqlCursor.class, "cursor2");

        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursor2Fut = new CompletableFuture<>();

        when(cursor1.hasNextResult()).thenReturn(true);
        when(cursor1.nextResult()).thenAnswer(ignored -> cursor2Fut);
        when(cursor1.closeAsync()).thenReturn(nullCompletedFuture());
        when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());

        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class)))
                .thenReturn(completedFuture(cursor1));

        // Carries the outcome of the stop performed on the helper thread back to the test thread:
        // an assertion failure thrown on another thread would otherwise never reach JUnit.
        CompletableFuture<Void> stopOutcome = new CompletableFuture<>();

        Thread thread = new Thread(() -> {
            try {
                assertThat(igniteSql.stopAsync(), willCompleteSuccessfully());

                stopOutcome.complete(null);
            } catch (Throwable t) {
                stopOutcome.completeExceptionally(t);
            } finally {
                // Deliver the second cursor only after the stop attempt, whatever its outcome.
                cursor2Fut.complete(cursor2);
            }
        });

        assertThrowsSqlException(
                NODE_STOPPING_ERR,
                "Node is stopping",
                () -> {
                    CompletableFuture<Void> f = igniteSql.executeScriptAsync("SELECT 1; SELECT 2");

                    // Stop the component only once the script has been submitted.
                    thread.start();

                    await(f);
                }
        );

        assertThat(stopOutcome, willCompleteSuccessfully());
        assertThat(igniteSql.openedCursors(), empty());
    }

    /**
     * Builds a batch holding a single one-column row with the value {@code 0L}.
     *
     * @param hasMore Value of the "has more" flag passed to the batch.
     * @return Batch to be returned from a stubbed {@code requestNextAsync} call.
     */
    private static BatchedResult<InternalSqlRow> singleRowBatch(boolean hasMore) {
        return new BatchedResult<>(List.of(new ListToInternalSqlRowAdapter(List.of(0L))), hasMore);
    }
}