blob: b15b860570a39e4e7b10762b3e92a26a52e5c48c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.examples.filestore.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
/**
* Subcommand to generate load in filestore data stream state machine.
*/
@Parameters(commandDescription = "Load Generator for FileStore DataStream")
public class DataStream extends Client {
/**
 * The supported ways of transferring file data,
 * each paired with the factory that creates the corresponding {@link TransferType}.
 */
enum Type {
  DirectByteBuffer(DirectByteBufferType::new),
  MappedByteBuffer(MappedByteBufferType::new),
  NettyFileRegion(NettyFileRegionType::new);

  /** Factory taking (file path, command object) and returning the transfer implementation. */
  private final BiFunction<String, DataStream, TransferType> constructor;

  Type(BiFunction<String, DataStream, TransferType> constructor) {
    this.constructor = constructor;
  }

  /** @return the factory associated with this type. */
  BiFunction<String, DataStream, TransferType> getConstructor() {
    return constructor;
  }

  /**
   * Find a constant by its name, ignoring case.
   *
   * @param s the name to look up.
   * @return the first constant whose name matches, or null when there is no match.
   */
  static Type valueOfIgnoreCase(String s) {
    return Arrays.stream(values())
        .filter(candidate -> candidate.name().equalsIgnoreCase(s))
        .findFirst()
        .orElse(null);
  }
}
// A compile-time constant because it is used as a Java annotation attribute value.
private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";

{
  // Instance initializer: verify that DESCRIPTION lists exactly the constants declared in Type.
  final String expected = Arrays.toString(Type.values());
  Preconditions.assertTrue(DESCRIPTION.equals(expected),
      () -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected);
}
/** Name of the transfer type given by --type; matched against {@link Type} ignoring case. */
@Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
private String dataStreamType = Type.NettyFileRegion.name();

/**
 * Number of bytes between two sync writes.
 * A non-positive value (the default is -1) means that no write is marked as sync.
 */
@Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero, "
    + "-1 means no sync", required = true)
private int syncSize = -1;

/** @return the configured sync size in bytes, or -1 when syncing is disabled. */
int getSyncSize() {
  return syncSize;
}
/**
 * Validate the command line parameters of this subcommand.
 * The first problem found is reported to standard error.
 *
 * @return true if all parameters are valid; otherwise, false.
 */
private boolean checkParam() {
  final String error;
  if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
    // A sync size, when given, must be a multiple of the buffer size.
    error = "Error: syncSize % bufferSize should be zero";
  } else if (Type.valueOfIgnoreCase(dataStreamType) == null) {
    error = "Error: dataStreamType should be one of " + DESCRIPTION;
  } else {
    return true;
  }
  System.err.println(error);
  return false;
}
/**
 * Generate the local files, stream them to the file store using the given clients
 * and print a summary of the amount of data written and the time taken.
 * The clients are closed and the worker thread pool is shut down before returning,
 * no matter whether the run succeeded or failed.
 *
 * @param clients the clients used (in round-robin order) for writing the files.
 */
@Override
protected void operation(List<FileStoreClient> clients) throws IOException, ExecutionException, InterruptedException {
  if (!checkParam()) {
    close(clients);
    return;
  }

  final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
  try {
    List<String> paths = generateFiles(executor);
    dropCache();
    System.out.println("Starting DataStream write now ");

    RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()), getPrimary());
    long startTime = System.currentTimeMillis();

    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, clients, routingTable, executor));

    long endTime = System.currentTimeMillis();

    System.out.println("Total files written: " + getNumFiles());
    System.out.println("Each files size: " + getFileSizeInBytes());
    System.out.println("Total data written: " + totalWrittenBytes + " bytes");
    System.out.println("Total time taken: " + (endTime - startTime) + " millis");
  } finally {
    try {
      // The pool threads are non-daemon; without a shutdown they outlive this method.
      executor.shutdown();
    } finally {
      close(clients);
    }
  }
}
/**
 * Submit one asynchronous stream-write task per file to the given executor.
 * The clients are assigned to the files in round-robin order.
 *
 * @param paths the paths of the local files to be written.
 * @param clients the clients used for writing.
 * @param routingTable the routing table passed to each stream.
 * @param executor the executor running the write tasks.
 * @return a map from each path to a future of the list of reply futures of that file.
 *         The future completes exceptionally if the task fails for any reason.
 */
private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
    List<String> paths, List<FileStoreClient> clients, RoutingTable routingTable,
    ExecutorService executor) {
  Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap = new HashMap<>();

  int clientIndex = 0;
  for (String path : paths) {
    final CompletableFuture<List<CompletableFuture<DataStreamReply>>> future = new CompletableFuture<>();
    final FileStoreClient client = clients.get(clientIndex % clients.size());
    clientIndex++;
    CompletableFuture.runAsync(() -> {
      try {
        final long fileLength = new File(path).length();
        Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
            + getFileSizeInBytes() + " but actual size is " + fileLength);

        final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
            .orElseThrow(IllegalStateException::new);
        final TransferType writer = type.getConstructor().apply(path, this);
        future.complete(writer.transfer(client, routingTable));
      } catch (Throwable t) {
        // Forward every failure, not only IOException;
        // otherwise the future would never complete and the waiting caller would block forever.
        future.completeExceptionally(t);
      }
    }, executor);
    fileMap.put(path, future);
  }

  return fileMap;
}
/**
 * Wait for all the stream writes to finish and add up the bytes reported by the replies.
 * A message is printed for each file whose written length differs from the expected file size.
 *
 * @param fileMap a map from each path to the future of its reply futures.
 * @return the total number of bytes written over all the files.
 */
private long waitStreamFinish(Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap)
    throws ExecutionException, InterruptedException {
  long grandTotal = 0;
  for (CompletableFuture<List<CompletableFuture<DataStreamReply>>> perFile : fileMap.values()) {
    // Join the replies in list order and sum the bytes written for this file.
    final long written = perFile.get().stream()
        .map(CompletableFuture::join)
        .mapToLong(DataStreamReply::getBytesWritten)
        .sum();

    if (written != getFileSizeInBytes()) {
      System.out.println("File written:" + written + " does not match expected:" + getFileSizeInBytes());
    }

    grandTotal += written;
  }
  return grandTotal;
}
/**
 * Base class for transferring a single local file to a {@link DataStreamOutput}.
 * Subclasses implement {@link #write} in order to send one packet at a time.
 */
abstract static class TransferType {
  private final String path;
  private final File file;
  /** The expected file size in bytes, taken from the command object. */
  private final long fileSize;
  /** The maximum packet size in bytes. */
  private final int bufferSize;
  /** The number of bytes between two sync writes; a non-positive value disables syncing. */
  private final long syncSize;
  /** The position at which the last sync was requested. */
  private long syncPosition = 0;

  /**
   * @param path the path of the local file to be transferred.
   * @param cli the command object providing the file size, the buffer size and the sync size.
   */
  TransferType(String path, DataStream cli) {
    this.path = path;
    this.file = new File(path);
    this.fileSize = cli.getFileSizeInBytes();
    this.bufferSize = cli.getBufferSizeInBytes();
    this.syncSize = cli.getSyncSize();

    // The file on disk must have exactly the expected size.
    final long actualSize = file.length();
    Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
        + fileSize + " but actual size is " + actualSize + ", path=" + path);
  }

  File getFile() {
    return file;
  }

  int getBufferSize() {
    return bufferSize;
  }

  /** @return the size of the packet starting at the given offset: the buffer size or the remaining bytes. */
  long getPacketSize(long offset) {
    return Math.min(bufferSize, fileSize - offset);
  }

  /**
   * Decide whether the write ending at the given position should be a sync write.
   * When it returns true, the given position is recorded as the last sync position.
   *
   * @param position the file position right after the write.
   * @return true if syncing is enabled and either the end of the file is reached
   *         or at least syncSize bytes have been written since the last sync; otherwise, false.
   */
  boolean isSync(long position) {
    if (syncSize > 0) {
      if (position >= fileSize || position - syncPosition >= syncSize) {
        syncPosition = position;
        return true;
      }
    }
    return false;
  }

  /**
   * Stream the whole file using the given client.
   *
   * @param client the client used for creating the stream.
   * @param routingTable the routing table of the stream.
   * @return the futures of all the writes followed by the future of the close;
   *         an empty list if the file size is not positive.
   * @throws IOException if the transfer fails; the original failure is attached as the cause.
   */
  List<CompletableFuture<DataStreamReply>> transfer(
      FileStoreClient client, RoutingTable routingTable) throws IOException {
    if (fileSize <= 0) {
      return Collections.emptyList();
    }

    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
    final DataStreamOutput out = client.getStreamOutput(file.getName(), fileSize, routingTable);
    try (FileChannel in = FileUtils.newFileChannel(file, StandardOpenOption.READ)) {
      for (long offset = 0L; offset < fileSize; ) {
        offset += write(in, out, offset, futures);
      }
    } catch (Throwable e) {
      // Keep the cause so that the real failure and its stack trace are not lost.
      throw new IOException("Failed to transfer " + path, e);
    } finally {
      // The stream is closed no matter whether the writes were submitted successfully.
      futures.add(out.closeAsync());
    }
    return futures;
  }

  /**
   * Write one packet starting at the given offset.
   *
   * @param in the channel of the local file.
   * @param out the stream to write to.
   * @param offset the file offset of the packet.
   * @param futures the list to which the reply future of the write is added.
   * @return the number of bytes submitted by this call.
   */
  abstract long write(FileChannel in, DataStreamOutput out, long offset,
      List<CompletableFuture<DataStreamReply>> futures) throws IOException;

  @Override
  public String toString() {
    return JavaUtils.getClassSimpleName(getClass()) + "{" + path + ", size=" + fileSize + "}";
  }
}
/**
 * Transfer a file by reading each packet into a pooled direct buffer and writing that buffer to the stream.
 */
static class DirectByteBufferType extends TransferType {
  DirectByteBufferType(String path, DataStream cli) {
    super(path, cli);
  }

  /**
   * Read up to one buffer of data from the channel and submit it to the stream.
   * The pooled buffer is released once the write completes (normally or exceptionally),
   * or immediately when no write was submitted.
   *
   * @return the number of bytes read from the channel and submitted.
   * @throws IllegalStateException if the channel has already reached end-of-stream.
   */
  @Override
  long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
      throws IOException {
    final int bufferSize = getBufferSize();
    final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
    // Becomes true once the asynchronous write owns the responsibility of releasing the buffer.
    boolean handedOff = false;
    try {
      final int bytesRead = buf.writeBytes(in, bufferSize);

      if (bytesRead < 0) {
        throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
            + ". The channel has reached end-of-stream at " + offset);
      } else if (bytesRead > 0) {
        final CompletableFuture<DataStreamReply> f = isSync(offset + bytesRead) ?
            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) : out.writeAsync(buf.nioBuffer());
        handedOff = true;
        // Release also when the write fails; thenRun would skip the release in that case.
        f.whenComplete((reply, failure) -> buf.release());
        futures.add(f);
      }
      return bytesRead;
    } finally {
      if (!handedOff) {
        // Nothing was submitted (read failure, end-of-stream, empty read or writeAsync threw).
        buf.release();
      }
    }
  }
}
/**
 * Transfer a file by memory-mapping each packet read-only and writing the mapped buffer to the stream.
 */
static class MappedByteBufferType extends TransferType {
  MappedByteBufferType(String path, DataStream cli) {
    super(path, cli);
  }

  /**
   * Map the packet starting at the given offset and submit it to the stream.
   *
   * @return the number of bytes remaining in the mapped buffer, i.e. the bytes submitted.
   */
  @Override
  long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
      throws IOException {
    final MappedByteBuffer mapped = in.map(FileChannel.MapMode.READ_ONLY, offset, getPacketSize(offset));
    final int length = mapped.remaining();

    final CompletableFuture<DataStreamReply> reply;
    if (isSync(offset + length)) {
      reply = out.writeAsync(mapped, StandardWriteOption.SYNC);
    } else {
      reply = out.writeAsync(mapped);
    }
    futures.add(reply);
    return length;
  }
}
/**
 * Transfer a file by passing the file itself, together with an offset and a length, to the stream.
 */
static class NettyFileRegionType extends TransferType {
  NettyFileRegionType(String path, DataStream cli) {
    super(path, cli);
  }

  /**
   * Submit the file region starting at the given offset to the stream; the channel is not read here.
   *
   * @return the size of the submitted packet.
   */
  @Override
  long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
    final long length = getPacketSize(offset);
    final File source = getFile();

    final CompletableFuture<DataStreamReply> reply;
    if (isSync(offset + length)) {
      reply = out.writeAsync(source, offset, length, StandardWriteOption.SYNC);
    } else {
      reply = out.writeAsync(source, offset, length);
    }
    futures.add(reply);
    return length;
  }
}
}