blob: ba91866d71ab1903d96d1a2d6ac725e82b189ea7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client.impl;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
 * Streaming client implementation of {@link DataStreamClient}.
 * It creates {@link DataStreamOutputImpl} streams addressed to a single data stream server
 * and submits their packets asynchronously through an {@link OrderedStreamAsync}.
 */
public class DataStreamClientImpl implements DataStreamClient {
  public static final Logger LOG = LoggerFactory.getLogger(DataStreamClientImpl.class);

  /** The backing {@link RaftClient}; null when this object is constructed without one. */
  private final RaftClient client;
  private final ClientId clientId;
  private final RaftGroupId groupId;

  /** The peer that every stream created by this client is addressed to. */
  private final RaftPeer dataStreamServer;
  private final DataStreamClientRpc dataStreamClientRpc;
  private final OrderedStreamAsync orderedStreamAsync;
  /** When true, closing a stream does not trigger {@link DataStreamOutputImpl#sendForward}. */
  private final boolean skipSendForward;

  /**
   * Constructs a client that is not backed by a {@link RaftClient};
   * the {@link #client} field is set to null.
   */
  DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
    this.client = null;
    this.clientId = clientId;
    this.groupId = groupId;
    this.dataStreamServer = dataStreamServer;
    this.dataStreamClientRpc = dataStreamClientRpc;
    this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
  }

  /**
   * Constructs a client backed by the given {@link RaftClient};
   * the client id and the group id are taken from it.
   */
  DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
    this.client = client;
    this.dataStreamServer = dataStreamServer;
    this.dataStreamClientRpc = dataStreamClientRpc;
    this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
    this.clientId = client.getId();
    this.groupId = client.getGroupId();
    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
  }

  /**
   * A single output stream: a {@link Type#STREAM_HEADER} packet sent at construction,
   * followed by {@link Type#STREAM_DATA} packets for each write.
   */
  public final class DataStreamOutputImpl implements DataStreamOutputRpc {
    /** The request sent as the stream header. */
    private final RaftClientRequest header;
    /** The reply future of the header packet. */
    private final CompletableFuture<DataStreamReply> headerFuture;
    private final SlidingWindow.Client<OrderedStreamAsync.DataStreamWindowRequest, DataStreamReply> slidingWindow;
    /** Completed from {@link #closeFuture} once a write carrying the CLOSE option has been issued. */
    private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
    /** Null until a write carrying the CLOSE option is issued; non-null means this stream is closed. */
    private CompletableFuture<DataStreamReply> closeFuture;
    /** Lazily created blocking channel view of this stream. */
    private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier = JavaUtils.memoize(
        () -> new WritableByteChannel() {
          @Override
          public int write(ByteBuffer src) throws IOException {
            final int byteCount = src.remaining();
            // Every call is flushed; without the flush the reply future would not complete.
            final CompletableFuture<DataStreamReply> pending = writeAsync(src, StandardWriteOption.FLUSH);
            final DataStreamReply reply = IOUtils.getFromFuture(pending,
                () -> "write(" + byteCount + " bytes for " + ClientInvocationId.valueOf(header) + ")");
            return Math.toIntExact(reply.getBytesWritten());
          }

          @Override
          public boolean isOpen() {
            return !isClosed();
          }

          @Override
          public void close() throws IOException {
            if (!isClosed()) {
              // Closing is an empty write carrying the CLOSE option; wait for its reply.
              IOUtils.getFromFuture(
                  writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE),
                  () -> "close(" + ClientInvocationId.valueOf(header) + ")");
            }
          }
        });
    /** The sum of the lengths passed to {@link #writeAsyncImpl} so far; used as the next packet offset. */
    private long streamOffset = 0;

    /** Creates the stream and immediately sends the given request as the header packet. */
    private DataStreamOutputImpl(RaftClientRequest request) {
      this.header = request;
      final ClientInvocationId invocationId = ClientInvocationId.valueOf(clientId, header.getCallId());
      this.slidingWindow = new SlidingWindow.Client<>(invocationId);

      final ByteBuffer headerBytes = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
      final long headerLength = headerBytes.remaining();
      // TODO: RATIS-1938: In order not to auto-flush the header, remove the FLUSH below.
      final Iterable<WriteOption> headerOptions = Collections.singleton(StandardWriteOption.FLUSH);
      this.headerFuture = send(Type.STREAM_HEADER, headerBytes, headerLength, headerOptions);
    }

    /** Builds a packet header at the current {@link #streamOffset} and submits it with the data. */
    private CompletableFuture<DataStreamReply> send(Type type, Object data, long length,
        Iterable<WriteOption> options) {
      return orderedStreamAsync.sendRequest(
          new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options),
          data, slidingWindow);
    }

    /**
     * Combines the given future with {@link #headerFuture}.
     *
     * @return a future yielding the given future's reply when the header reply is successful;
     *         otherwise, yielding the header reply.
     */
    private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
      return future.thenCombine(headerFuture, (reply, headerReply) -> {
        if (headerReply.isSuccess()) {
          return reply;
        }
        return headerReply;
      });
    }

    /**
     * Sends a data packet and advances {@link #streamOffset} by the given length.
     * When the options contain CLOSE, {@link #closeFuture} is also set
     * and wired to complete {@link #raftClientReplyFuture}.
     *
     * @return the reply future of the data packet combined with the header reply;
     *         or a future failed with {@link AlreadyClosedException} if this stream is already closed.
     */
    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length,
        Iterable<WriteOption> options) {
      if (isClosed()) {
        final AlreadyClosedException closed = new AlreadyClosedException(
            clientId + ": stream already closed, request=" + header);
        return JavaUtils.completeExceptionally(closed);
      }

      final CompletableFuture<DataStreamReply> dataFuture
          = combineHeader(send(Type.STREAM_DATA, data, length, options));
      if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
        // Forward only when it is not disabled by configuration and a RaftClient is available.
        final boolean forward = !skipSendForward && client != null;
        closeFuture = forward ? dataFuture.thenCompose(this::sendForward) : dataFuture;
        closeFuture.thenApply(ClientProtoUtils::getRaftClientReply)
            .whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
      }
      streamOffset += length;
      return dataFuture;
    }

    /** Writes the readable bytes of the given {@link ByteBuf} asynchronously. */
    public CompletableFuture<DataStreamReply> writeAsync(ByteBuf src, Iterable<WriteOption> options) {
      final long readable = src.readableBytes();
      return writeAsyncImpl(src, readable, options);
    }

    /** Writes the remaining bytes of the given {@link ByteBuffer} asynchronously. */
    @Override
    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) {
      final long remaining = src.remaining();
      return writeAsyncImpl(src, remaining, options);
    }

    /** Writes the file region described by the given {@link FilePositionCount} asynchronously. */
    @Override
    public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options) {
      final long count = src.getCount();
      return writeAsyncImpl(src, count, Arrays.asList(options));
    }

    /** @return true if a write carrying the CLOSE option has already been issued. */
    boolean isClosed() {
      return Objects.nonNull(closeFuture);
    }

    /**
     * Closes this stream by writing an empty buffer with the CLOSE option, unless already closed.
     *
     * @return the close future.
     */
    @Override
    public CompletableFuture<DataStreamReply> closeAsync() {
      final boolean alreadyClosed = isClosed();
      if (!alreadyClosed) {
        writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE);
      }
      return Objects.requireNonNull(closeFuture, "closeFuture == null");
    }

    /** @return the request used as the header of this stream. */
    public RaftClientRequest getHeader() {
      return header;
    }

    @Override
    public CompletableFuture<DataStreamReply> getHeaderFuture() {
      return headerFuture;
    }

    @Override
    public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
      return raftClientReplyFuture;
    }

    /** @return the memoized blocking channel of this stream. */
    @Override
    public WritableByteChannel getWritableByteChannel() {
      return writableByteChannelSupplier.get();
    }

    /**
     * Sends the header request via {@link AsyncRpcApi#sendForward} after a successful write reply.
     * An unsuccessful write reply is returned unchanged and nothing is forwarded.
     *
     * @return a future of a reply that keeps the type, stream id, stream offset and bytes written
     *         of the write reply, and takes the success flag, commit infos and buffer
     *         (the serialized client reply) from the forward reply.
     */
    private CompletableFuture<DataStreamReply> sendForward(DataStreamReply writeReply) {
      LOG.debug("sendForward {}", writeReply);
      if (writeReply.isSuccess()) {
        final AsyncRpcApi asyncRpc = (AsyncRpcApi) client.async();
        return asyncRpc.sendForward(header).thenApply(clientReply -> DataStreamReplyByteBuffer.newBuilder()
            .setClientId(clientId)
            .setType(writeReply.getType())
            .setStreamId(writeReply.getStreamId())
            .setStreamOffset(writeReply.getStreamOffset())
            .setBuffer(ClientProtoUtils.toRaftClientReplyProto(clientReply).toByteString().asReadOnlyByteBuffer())
            .setSuccess(clientReply.isSuccess())
            .setBytesWritten(writeReply.getBytesWritten())
            .setCommitInfos(clientReply.getCommitInfos())
            .build());
      }
      return CompletableFuture.completedFuture(writeReply);
    }
  }

  @Override
  public DataStreamClientRpc getClientRpc() {
    return dataStreamClientRpc;
  }

  /** Creates a stream using the given request as its header. */
  @Override
  public DataStreamOutputImpl stream(RaftClientRequest request) {
    return new DataStreamOutputImpl(request);
  }

  /** Creates a stream with the given header message and no routing table. */
  @Override
  public DataStreamOutputRpc stream(ByteBuffer headerMessage) {
    return stream(headerMessage, null);
  }

  /**
   * Creates a stream whose header request carries the given message and routing table.
   * A non-null routing table must have the data stream server of this client as its primary peer.
   *
   * @param headerMessage the header message; may be null, in which case the request message is null.
   * @param routingTable the routing table; may be null.
   */
  @Override
  public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routingTable) {
    if (routingTable != null) {
      // The primary peer of the routing table must be the server this client is connected to.
      Preconditions.assertTrue(dataStreamServer.getId().equals(routingTable.getPrimary()),
          () -> "Primary peer mismatched: the routing table has " + routingTable.getPrimary()
              + " but the client has " + dataStreamServer.getId());
    }

    final Message message = Optional.ofNullable(headerMessage)
        .map(ByteString::copyFrom)
        .map(Message::valueOf)
        .orElse(null);
    final RaftClientRequest request = RaftClientRequest.newBuilder()
        .setClientId(clientId)
        .setServerId(dataStreamServer.getId())
        .setGroupId(groupId)
        .setCallId(CallId.getAndIncrement())
        .setMessage(message)
        .setType(RaftClientRequest.dataStreamRequestType())
        .setRoutingTable(routingTable)
        .build();
    return new DataStreamOutputImpl(request);
  }

  /** Closes the underlying {@link DataStreamClientRpc}. */
  @Override
  public void close() throws IOException {
    dataStreamClientRpc.close();
  }
}