blob: b49f4f3a471def129b8ee2dd1ba9e6d39fd7b7fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SourceHandleTest {
private static final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
private static long maxBytesPerFI;
@BeforeClass
public static void beforeClass() {
maxBytesPerFI = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(5 * MOCK_TSBLOCK_SIZE);
}
@AfterClass
public static void afterClass() {
IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(maxBytesPerFI);
}
@Test
public void testNonBlockedOneTimeReceive() {
final String queryId = "q0";
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that do not block any reservation.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
SyncDataNodeMPPDataExchangeServiceClient mockClient =
Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
try {
Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
List<ByteBuffer> byteBuffers =
new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
byteBuffers.add(ByteBuffer.allocate(0));
}
return new TGetDataBlockResponse(byteBuffers);
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
.getDataBlock(
Mockito.argThat(
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& 0 == req.getStartSequenceId()
&& numOfMockTsBlock == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& 0 == e.getStartSequenceId()
&& numOfMockTsBlock == e.getEndSequenceId()));
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * MOCK_TSBLOCK_SIZE, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
sourceHandle.receive();
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onFinished(sourceHandle);
}
@Test
public void testBlockedOneTimeReceive() {
final String queryId = "q0";
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager with capacity 5 * MOCK_TSBLOCK_SIZE per query.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool spyMemoryPool =
Mockito.spy(new MemoryPool("test", 10 * MOCK_TSBLOCK_SIZE, 5 * MOCK_TSBLOCK_SIZE));
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
SyncDataNodeMPPDataExchangeServiceClient mockClient =
Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
try {
Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
List<ByteBuffer> byteBuffers =
new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
byteBuffers.add(ByteBuffer.allocate(0));
}
return new TGetDataBlockResponse(byteBuffers);
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
long maxBytesCanReserve =
Math.min(
5 * MOCK_TSBLOCK_SIZE,
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
sourceHandle.setMaxBytesCanReserve(maxBytesCanReserve);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
.reserve(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
localFragmentInstanceId),
localPlanNodeId,
MOCK_TSBLOCK_SIZE,
maxBytesCanReserve);
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.getDataBlock(
Mockito.argThat(
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& 0 == req.getStartSequenceId()
&& 5 == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& 0 == e.getStartSequenceId()
&& 5 == e.getEndSequenceId()));
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(6 * MOCK_TSBLOCK_SIZE, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
.free(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
localFragmentInstanceId),
localPlanNodeId,
MOCK_TSBLOCK_SIZE);
sourceHandle.receive();
try {
if (i < 5) {
Assert.assertEquals(
i == 4 ? 5 * MOCK_TSBLOCK_SIZE : 6 * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
final int startSequenceId = 5 + i;
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
.getDataBlock(
Mockito.argThat(
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& startSequenceId == req.getStartSequenceId()
&& startSequenceId + 1 == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& startSequenceId == e.getStartSequenceId()
&& startSequenceId + 1 == e.getEndSequenceId()));
} else {
Assert.assertEquals(
(numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
}
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onFinished(sourceHandle);
}
@Test
public void testMultiTimesReceive() {
final String queryId = "q0";
final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that returns unblocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
SyncDataNodeMPPDataExchangeServiceClient mockClient =
Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
try {
Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
List<ByteBuffer> byteBuffers =
new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
byteBuffers.add(ByteBuffer.allocate(0));
}
return new TGetDataBlockResponse(byteBuffers);
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
SourceHandle sourceHandle =
new SourceHandle(
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
// New data blocks event arrived in unordered manner.
sourceHandle.updatePendingDataBlockInfo(
numOfMockTsBlock,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(mockClient, Mockito.timeout(10_000).times(0))
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(0))
.onAcknowledgeDataBlockEvent(Mockito.any(TAcknowledgeDataBlockEvent.class));
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
sourceHandle.updatePendingDataBlockInfo(
0,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
.getDataBlock(
Mockito.argThat(
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& 0 == req.getStartSequenceId()
&& numOfMockTsBlock * 2 == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& 0 == e.getStartSequenceId()
&& numOfMockTsBlock * 2 == e.getEndSequenceId()));
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * 2 * MOCK_TSBLOCK_SIZE, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < 2 * numOfMockTsBlock; i++) {
sourceHandle.receive();
if (i < 2 * numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(2 * numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
numOfMockTsBlock * 2,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
.getDataBlock(
Mockito.argThat(
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& numOfMockTsBlock * 2 == req.getStartSequenceId()
&& numOfMockTsBlock * 3 == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& numOfMockTsBlock * 2 == e.getStartSequenceId()
&& numOfMockTsBlock * 3 == e.getEndSequenceId()));
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * MOCK_TSBLOCK_SIZE, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
sourceHandle.receive();
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(3 * numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onFinished(sourceHandle);
}
@Test
public void testFailedReceive() {
final String queryId = "q0";
final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that returns unblocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
SyncDataNodeMPPDataExchangeServiceClient mockClient =
Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
TException mockException = new TException("Mock exception");
try {
Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doThrow(mockException)
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
SourceHandle sourceHandle =
new SourceHandle(
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
sourceHandle.setRetryIntervalInMs(0L);
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
Mockito.verify(mockClient, Mockito.timeout(10_000).times(SourceHandle.MAX_ATTEMPT_TIMES))
.getDataBlock(Mockito.any());
} catch (TException e) {
e.printStackTrace();
Assert.fail();
}
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onFailure(sourceHandle, mockException);
Assert.assertFalse(blocked.isDone());
sourceHandle.abort();
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertTrue(sourceHandle.isAborted());
Assert.assertTrue(blocked.isDone());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onAborted(sourceHandle);
}
@Test
public void testForceClose() {
final String queryId = "q0";
final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that do not block any reservation.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
SyncDataNodeMPPDataExchangeServiceClient mockClient =
Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
try {
Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
List<ByteBuffer> byteBuffers =
new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
byteBuffers.add(ByteBuffer.allocate(0));
}
return new TGetDataBlockResponse(byteBuffers);
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
} catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
sourceHandle.abort();
Assert.assertTrue(blocked.isDone());
Assert.assertTrue(blocked.isCancelled());
Assert.assertTrue(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockSourceHandleListener, Mockito.timeout(10_0000).times(1))
.onAborted(sourceHandle);
}
}