blob: 0ff87d49ac5c1bb3f2056ba7cc17b3cada2ee737 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.agent;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
public class TransportMediator {
private static final Logger logger = LoggerFactory.getLogger(TransportMediator.class);
/*
Number of maximum transfers handled at atime
*/
private final ExecutorService monitorPool;
private String tempDataDir = "/tmp";
private final int chunkedSize;
private final boolean doChunkStreaming;
private final ExecutorService chunkedExecutorService;
public TransportMediator(String tempDataDir,
int concurrentTransfers,
int concurrentChunkedThreads,
int chunkedSize,
boolean doChunkStreaming) {
this.tempDataDir = tempDataDir;
monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
this.chunkedSize = chunkedSize;
chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
this.doChunkStreaming = doChunkStreaming;
}
public void transferSingleThread(String transferId,
TransferApiRequest request,
ConnectorConfig srcCC,
ConnectorConfig dstCC,
BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) {
final AtomicBoolean transferInProgress = new AtomicBoolean(true);
try {
logger.info("Stating transfer {}", transferId);
Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
.resolveIncomingStreamingConnector(request.getSourceType());
Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
.resolveOutgoingStreamingConnector(request.getDestinationType());
Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
.resolveIncomingChunkedConnector(request.getSourceType());
Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
.resolveOutgoingChunkedConnector(request.getDestinationType());
onStatusCallback.accept(transferId, new TransferState()
.setPercentage(0)
.setState("RUNNING")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer is ongoing"));
long start = System.currentTimeMillis();
// Give priority for chunked transfers.
// TODO: Provide a preference at the API level
if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
logger.info("Starting the chunked transfer for transfer {}", transferId);
long chunkSize = chunkedSize * 1024 * 1024L;
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(chunkedExecutorService);
long fileLength = srcCC.getMetadata().getResourceSize();
long uploadLength = 0L;
int chunkIdx = 0;
IncomingChunkedConnector inConnector = inChunkedConnectorOp
.orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
OutgoingChunkedConnector outConnector = outChunkedConnectorOp
.orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
inConnector.init(srcCC);
outConnector.init(dstCC);
while(uploadLength < fileLength) {
long endPos = uploadLength + chunkSize;
if (endPos > fileLength) {
endPos = fileLength;
}
completionService.submit(new ChunkMover(inConnector,
outConnector, uploadLength, endPos, chunkIdx,
transferId, doChunkStreaming));
uploadLength = endPos;
chunkIdx++;
}
for (int i = 0; i < chunkIdx; i++) {
Future<Integer> future = completionService.take();
}
inConnector.complete();
outConnector.complete();
logger.info("Completed chunked transfer for transfer {}", transferId);
} else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
logger.info("Starting streaming transfer for transfer {}", transferId);
IncomingStreamingConnector inConnector = inStreamingConnectorOp
.orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
OutgoingStreamingConnector outConnector = outStreamingConnectorOp
.orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
inConnector.init(srcCC);
outConnector.init(dstCC);
String srcChild = request.getSourceChildResourcePath();
String dstChild = request.getDestinationChildResourcePath();
InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
long count = 0;
final AtomicLong countAtomic = new AtomicLong();
countAtomic.set(count);
monitorPool.submit(() -> {
while (true) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// Ignore
}
if (!transferInProgress.get()) {
logger.info("Status monitor is exiting for transfer {}", transferId);
break;
}
double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
onStatusCallback.accept(transferId, new TransferState()
.setPercentage(transferPercentage)
.setState("RUNNING")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer Progress Updated"));
}
});
int n;
byte[] buffer = new byte[128 * 1024];
for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
outputStream.write(buffer, 0, n);
countAtomic.set(count);
}
inConnector.complete();
outConnector.complete();
logger.info("Completed streaming transfer for transfer {}", transferId);
} else {
throw new Exception("No matching connector found to perform the transfer");
}
long endTime = System.currentTimeMillis();
double time = (endTime - start) / 1000.0;
logger.info("Transfer {} completed. Time {} S. Speed {} MB/s", transferId, time,
(srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
onStatusCallback.accept(transferId, new TransferState()
.setPercentage(100)
.setState("COMPLETED")
.setUpdateTimeMils(endTime)
.setDescription("Transfer successfully completed"));
exitingCallback.accept(transferId, true);
} catch (Exception e) {
logger.error("Transfer {} failed with error", transferId, e);
onStatusCallback.accept(transferId, new TransferState()
.setPercentage(0)
.setState("FAILED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
} finally {
transferInProgress.set(false);
}
}
public void destroy() {
monitorPool.shutdown();
}
private class ChunkMover implements Callable<Integer> {
IncomingChunkedConnector downloader;
OutgoingChunkedConnector uploader;
long startPos;
long endPos;
int chunkIdx;
String transferId;
boolean useStreaming;
public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
long endPos, int chunkIdx, String transferId, boolean useStreaming) {
this.downloader = downloader;
this.uploader = uploader;
this.startPos = startPos;
this.endPos = endPos;
this.chunkIdx = chunkIdx;
this.transferId = transferId;
this.useStreaming = useStreaming;
}
@Override
public Integer call() throws Exception {
try {
if (useStreaming) {
InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
uploader.uploadChunk(chunkIdx, startPos, endPos, inputStream);
} else {
String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
new File(tempFile).delete();
}
return chunkIdx;
} catch (Exception e) {
logger.error("Failed to transfer ", e);
throw e;
}
}
}
}