blob: a9735ee2bed1a55472142eec2aab3772214a517d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.file;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
import static org.apache.ignite.internal.network.file.messages.FileTransferError.fromThrowable;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
import org.apache.ignite.internal.network.file.messages.FileHeader;
import org.apache.ignite.internal.network.file.messages.FileTransferErrorMessageImpl;
import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
import org.apache.ignite.internal.network.file.messages.FileTransferInitMessage;
import org.apache.ignite.internal.network.file.messages.FileTransferInitResponse;
import org.apache.ignite.internal.network.file.messages.FileTransferMessageType;
import org.apache.ignite.internal.network.file.messages.Identifier;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@ExtendWith(WorkDirectoryExtension.class)
class FileTransferServiceImplTest extends BaseIgniteAbstractTest {
private static final String SOURCE_CONSISTENT_ID = "source";
private static final String TARGET_CONSISTENT_ID = "target";
private static final ClusterNode TARGET_NODE = new ClusterNodeImpl(
UUID.randomUUID().toString(),
TARGET_CONSISTENT_ID,
new NetworkAddress("target", 1234)
);
@WorkDirectory
private Path workDir;
private Path transferDir;
@Mock
private FileSender fileSender;
@Mock
private FileReceiver fileReceiver;
@Spy
private TestMessagingService messagingService = new TestMessagingService();
@Spy
private TestTopologyService topologyService = new TestTopologyService();
private FileTransferServiceImpl fileTransferService;
private final FileTransferFactory messageFactory = new FileTransferFactory();
@BeforeEach
void setUp() {
transferDir = workDir.resolve("transfer");
fileTransferService = new FileTransferServiceImpl(
10_000,
topologyService,
messagingService,
transferDir,
fileSender,
fileReceiver,
Executors.newSingleThreadExecutor()
);
assertThat(fileTransferService.startAsync(), willCompleteSuccessfully());
}
@AfterEach
void tearDown() {
assertThat(fileTransferService.stopAsync(), willCompleteSuccessfully());
}
@Test
void handlersAreAddedOnStart() {
verify(messagingService).addMessageHandler(eq(FileTransferMessageType.class), any(NetworkMessageHandler.class));
verify(topologyService).addEventHandler(any());
}
@Test
void fileTransfersCanceledWhenSenderLeft() {
topologyService.fireDisappearedEvent(new ClusterNodeImpl("node1", "sender", new NetworkAddress("localhost", 1234)));
verify(fileReceiver).cancelTransfersFromSender("sender");
}
@Test
void uploadFailsWhenInvokeReturnsResponseWithErrorOnFileTransferInitMessage() {
// Set messaging service to fail to invoke.
FileTransferInitResponse response = messageFactory.fileTransferInitResponse()
.error(fromThrowable(messageFactory, new RuntimeException("Test exception")))
.build();
doReturn(completedFuture(response))
.when(messagingService)
.invoke(eq(TARGET_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileTransferInitMessage.class), anyLong());
// Set file provider to provide a file.
Path path1 = FileGenerator.randomFile(workDir, 100);
fileTransferService.addFileProvider(Identifier.class, id -> completedFuture(List.of(path1)));
// Upload file to target.
CompletableFuture<Void> uploaded = fileTransferService.upload(TARGET_CONSISTENT_ID, messageFactory.identifier().build());
// Check that upload failed.
assertThat(uploaded, willThrow(RuntimeException.class, "Test exception"));
}
@Test
void uploadFailsWhenSenderReturnsException() {
// Set messaging service to fail to invoke.
FileTransferInitResponse fileTransferInitResponse = messageFactory.fileTransferInitResponse().build();
doReturn(completedFuture(fileTransferInitResponse))
.when(messagingService)
.invoke(eq(TARGET_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileTransferInitMessage.class), anyLong());
doReturn(failedFuture(new RuntimeException("Test exception")))
.when(fileSender)
.send(eq(TARGET_CONSISTENT_ID), any(UUID.class), anyList());
// Set file provider to provide a file.
Path path1 = FileGenerator.randomFile(workDir, 100);
fileTransferService.addFileProvider(Identifier.class, id -> completedFuture(List.of(path1)));
// Upload file to target.
CompletableFuture<Void> uploaded = fileTransferService.upload(TARGET_CONSISTENT_ID, messageFactory.identifier().build());
// Check that upload failed.
assertThat(uploaded, willThrow(RuntimeException.class, "Test exception"));
// Check that error message was sent.
await().untilAsserted(() -> {
verify(messagingService, atLeastOnce())
.send(eq(TARGET_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileTransferErrorMessageImpl.class));
});
}
@Test
void uploadFailsWhenInvokeReturnsExceptionOnFileTransferInitResponse() {
long correlationId = 1L;
// Set messaging service to fail to send upload response.
RuntimeException testException = new RuntimeException("Test exception");
doReturn(failedFuture(testException))
.when(messagingService)
.respond(eq(TARGET_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileTransferInitResponse.class),
eq(correlationId));
// Set file receiver to complete transfer registration.
CompletableFuture<UUID> registeredTransferIdFuture = new CompletableFuture<>();
CompletableFuture<List<Path>> transferredFilesFuture = new CompletableFuture<>();
doAnswer(invocation -> {
registeredTransferIdFuture.complete(invocation.getArgument(1, UUID.class));
return transferredFilesFuture;
}).when(fileReceiver).registerTransfer(eq(TARGET_CONSISTENT_ID), any(UUID.class), anyList(), any(Path.class));
// Set file receiver to complete transfer cancellation.
CompletableFuture<UUID> candeledTransferIdFuture = new CompletableFuture<>();
doAnswer(invocation -> {
candeledTransferIdFuture.complete(invocation.getArgument(0, UUID.class));
transferredFilesFuture.completeExceptionally(invocation.getArgument(1, RuntimeException.class));
return null;
}).when(fileReceiver).cancelTransfer(any(UUID.class), any(RuntimeException.class));
Path path1 = FileGenerator.randomFile(workDir, 0);
List<Path> paths = List.of(path1);
FileTransferInitMessage uploadRequest = messageFactory.fileTransferInitMessage()
.transferId(UUID.randomUUID())
.identifier(messageFactory.identifier().build())
.headers(FileHeader.fromPaths(messageFactory, paths))
.build();
messagingService.fireMessage(uploadRequest, TARGET_NODE, correlationId);
// Check that transfer was registered and canceled.
assertThat(registeredTransferIdFuture, willCompleteSuccessfully());
assertThat(candeledTransferIdFuture, willBe(registeredTransferIdFuture.join()));
// Check that transfer directory is empty.
await().untilAsserted(() -> assertThat(transferDir.toFile().listFiles(), emptyArray()));
}
@Test
void downloadFailsWhenInvokeReturnsExceptionOnDownloadRequest() {
// Set messaging service to fail to send download request.
RuntimeException testException = new RuntimeException("Test exception");
doReturn(failedFuture(testException))
.when(messagingService)
.invoke(eq(SOURCE_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileDownloadRequest.class), any(Long.class));
CompletableFuture<List<Path>> downloaded = fileTransferService.download(SOURCE_CONSISTENT_ID, messageFactory.identifier().build(),
workDir.resolve("download"));
// Check that download future is completed exceptionally.
assertThat(downloaded, willThrow(RuntimeException.class, "Test exception"));
// Check that transfer directory is empty.
await().untilAsserted(() -> assertThat(transferDir.toFile().listFiles(), nullValue()));
}
@Test
void downloadFailsWhenInvokeReturnsResponseWithErrorOnDownloadRequest() {
// Set messaging service to fail to send download request.
FileDownloadResponse response = messageFactory.fileDownloadResponse()
.error(fromThrowable(messageFactory, new RuntimeException("Test exception")))
.build();
doReturn(completedFuture(response))
.when(messagingService)
.invoke(eq(SOURCE_CONSISTENT_ID), eq(FILE_TRANSFER_CHANNEL), any(FileDownloadRequest.class), any(Long.class));
CompletableFuture<List<Path>> downloaded = fileTransferService.download(SOURCE_CONSISTENT_ID, messageFactory.identifier().build(),
workDir.resolve("download"));
// Check that download future is completed exceptionally.
assertThat(downloaded, willThrow(RuntimeException.class, "Test exception"));
// Check that transfer directory is empty.
await().untilAsserted(() -> assertThat(transferDir.toFile().listFiles(), nullValue()));
}
@Test
void transferIsRegisteredBeforeResponseIsSent() {
// Set file receiver to complete transfer registration.
AtomicInteger transferLifecycleState = new AtomicInteger(0);
doAnswer(invocation -> {
// Update transfer lifecycle state - transfer is registered.
transferLifecycleState.compareAndSet(0, 1);
return (TransferredFilesCollector) CompletableFutures::emptyListCompletedFuture;
}).when(fileReceiver).registerTransfer(anyString(), any(UUID.class), anyList(), any(Path.class));
// Set messaging service to fail to send upload response if transfer is not registered.
doAnswer(invocation -> {
// Check lifecycle state - transfer is registered.
transferLifecycleState.compareAndSet(1, 2);
return nullCompletedFuture();
}).when(messagingService).respond(anyString(), eq(FILE_TRANSFER_CHANNEL), any(FileTransferInitResponse.class), anyLong());
// Create file transfer request.
Path path1 = FileGenerator.randomFile(workDir, 0);
List<Path> paths = List.of(path1);
FileTransferInitMessage fileTransferInitMessage = messageFactory.fileTransferInitMessage()
.transferId(UUID.randomUUID())
.identifier(messageFactory.identifier().build())
.headers(FileHeader.fromPaths(messageFactory, paths))
.build();
// Send file transfer request.
messagingService.fireMessage(fileTransferInitMessage, TARGET_NODE, 1L);
// Check that transfer was registered before response was sent.
await().untilAtomic(transferLifecycleState, equalTo(2));
}
}