blob: de06faf63e0f0bd32a101bd821eb92fa94b16538 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/** Log proto utilities. */
public final class LogProtoUtils {
private LogProtoUtils() {}
public static String toLogEntryString(LogEntryProto entry, Function<StateMachineLogEntryProto, String> function) {
if (entry == null) {
return null;
}
final String s;
if (entry.hasStateMachineLogEntry()) {
s = ", " + Optional.ofNullable(function)
.orElseGet(() -> proto -> "" + ClientInvocationId.valueOf(proto))
.apply(entry.getStateMachineLogEntry());
} else if (entry.hasMetadataEntry()) {
final MetadataProto metadata = entry.getMetadataEntry();
s = "(c:" + metadata.getCommitIndex() + ")";
} else if (entry.hasConfigurationEntry()) {
final RaftConfigurationProto config = entry.getConfigurationEntry();
s = "(current:" + peersToString(config.getPeersList())
+ ", old:" + peersToString(config.getOldPeersList()) + ")";
} else {
s = "";
}
return TermIndex.valueOf(entry) + ", " + entry.getLogEntryBodyCase() + s;
}
static String peersToString(List<RaftPeerProto> peers) {
return peers.stream().map(AbstractMessage::toString)
.map(s -> s.replace("\n", ""))
.map(s -> s.replace(" ", ""))
.collect(Collectors.joining(", "));
}
static String stateMachineLogEntryProtoToString(StateMachineLogEntryProto p) {
return "logData:" + p.getLogData() + ", stateMachineEntry:" + p.getType() + ":" + p.getStateMachineEntry();
}
public static String toLogEntryString(LogEntryProto entry) {
return toLogEntryString(entry, LogProtoUtils::stateMachineLogEntryProtoToString);
}
public static String toLogEntriesString(List<LogEntryProto> entries) {
return entries == null ? null
: entries.stream().map(LogProtoUtils::toLogEntryString).collect(Collectors.toList()).toString();
}
public static String toLogEntriesShortString(List<LogEntryProto> entries) {
return entries == null ? null
: entries.isEmpty()? "<empty>"
: "size=" + entries.size() + ", first=" + LogProtoUtils.toLogEntryString(entries.get(0));
}
public static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long index) {
final LogEntryProto.Builder b = LogEntryProto.newBuilder();
Optional.ofNullable(term).ifPresent(b::setTerm);
return b.setIndex(index)
.setConfigurationEntry(toRaftConfigurationProtoBuilder(conf))
.build();
}
public static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) {
return RaftConfigurationProto.newBuilder()
.addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers()))
.addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER)))
.addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()))
.addAllOldListeners(
ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers(RaftPeerRole.LISTENER)));
}
public static LogEntryProto toLogEntryProto(StateMachineLogEntryProto proto, long term, long index) {
return LogEntryProto.newBuilder()
.setTerm(term)
.setIndex(index)
.setStateMachineLogEntry(proto)
.build();
}
public static LogEntryProto toLogEntryProto(long commitIndex, long term, long index) {
return LogEntryProto.newBuilder()
.setTerm(term)
.setIndex(index)
.setMetadataEntry(MetadataProto.newBuilder().setCommitIndex(commitIndex))
.build();
}
/**
* If the given entry has state machine log entry and it has state machine data,
* build a new entry without the state machine data.
*
* @return a new entry without the state machine data if the given has state machine data;
* otherwise, return the given entry.
*/
public static LogEntryProto removeStateMachineData(LogEntryProto entry) {
return getStateMachineEntry(entry)
.map(StateMachineEntryProto::getStateMachineData)
.filter(stateMachineData -> !stateMachineData.isEmpty())
.map(_dummy -> replaceStateMachineDataWithSerializedSize(entry))
.orElse(entry);
}
private static LogEntryProto replaceStateMachineDataWithSerializedSize(LogEntryProto entry) {
return replaceStateMachineEntry(entry,
StateMachineEntryProto.newBuilder().setLogEntryProtoSerializedSize(entry.getSerializedSize()));
}
private static LogEntryProto replaceStateMachineEntry(LogEntryProto proto, StateMachineEntryProto.Builder newEntry) {
Preconditions.assertTrue(proto.hasStateMachineLogEntry(), () -> "Unexpected proto " + proto);
return LogEntryProto.newBuilder(proto).setStateMachineLogEntry(
StateMachineLogEntryProto.newBuilder(proto.getStateMachineLogEntry()).setStateMachineEntry(newEntry)
).build();
}
/**
* Return a new log entry based on the input log entry with stateMachineData added.
* @param stateMachineData - state machine data to be added
* @param entry - log entry to which stateMachineData needs to be added
* @return LogEntryProto with stateMachineData added
*/
static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) {
Preconditions.assertTrue(isStateMachineDataEmpty(entry),
() -> "Failed to addStateMachineData to " + entry + " since shouldReadStateMachineData is false.");
return replaceStateMachineEntry(entry, StateMachineEntryProto.newBuilder().setStateMachineData(stateMachineData));
}
public static boolean isStateMachineDataEmpty(LogEntryProto entry) {
return getStateMachineEntry(entry)
.map(StateMachineEntryProto::getStateMachineData)
.map(ByteString::isEmpty)
.orElse(false);
}
private static Optional<StateMachineEntryProto> getStateMachineEntry(LogEntryProto entry) {
return Optional.of(entry)
.filter(LogEntryProto::hasStateMachineLogEntry)
.map(LogEntryProto::getStateMachineLogEntry)
.filter(StateMachineLogEntryProto::hasStateMachineEntry)
.map(StateMachineLogEntryProto::getStateMachineEntry);
}
public static int getSerializedSize(LogEntryProto entry) {
return getStateMachineEntry(entry)
.filter(stateMachineEntry -> stateMachineEntry.getStateMachineData().isEmpty())
.map(StateMachineEntryProto::getLogEntryProtoSerializedSize)
.orElseGet(entry::getSerializedSize);
}
private static StateMachineLogEntryProto.Type toStateMachineLogEntryProtoType(RaftClientRequestProto.TypeCase type) {
switch (type) {
case WRITE: return StateMachineLogEntryProto.Type.WRITE;
case DATASTREAM: return StateMachineLogEntryProto.Type.DATASTREAM;
default:
throw new IllegalStateException("Unexpected request type " + type);
}
}
public static StateMachineLogEntryProto toStateMachineLogEntryProto(
RaftClientRequest request, ByteString logData, ByteString stateMachineData) {
if (logData == null) {
logData = request.getMessage().getContent();
}
final StateMachineLogEntryProto.Type type = toStateMachineLogEntryProtoType(request.getType().getTypeCase());
return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(), type, logData, stateMachineData);
}
public static StateMachineLogEntryProto toStateMachineLogEntryProto(ClientId clientId, long callId,
StateMachineLogEntryProto.Type type, ByteString logData, ByteString stateMachineData) {
final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder()
.setClientId(clientId.toByteString())
.setCallId(callId)
.setType(type)
.setLogData(logData);
Optional.ofNullable(stateMachineData)
.map(StateMachineEntryProto.newBuilder()::setStateMachineData)
.ifPresent(b::setStateMachineEntry);
return b.build();
}
public static RaftConfiguration toRaftConfiguration(LogEntryProto entry) {
Preconditions.assertTrue(entry.hasConfigurationEntry());
final RaftConfigurationProto proto = entry.getConfigurationEntry();
final List<RaftPeer> conf = ProtoUtils.toRaftPeers(proto.getPeersList());
final List<RaftPeer> listener = ProtoUtils.toRaftPeers(proto.getListenersList());
final List<RaftPeer> oldConf = ProtoUtils.toRaftPeers(proto.getOldPeersList());
final List<RaftPeer> oldListener = ProtoUtils.toRaftPeers(proto.getOldListenersList());
return ServerImplUtils.newRaftConfiguration(conf, listener, entry.getIndex(), oldConf, oldListener);
}
}