blob: 0812c29fd3343ccacb76405fe91c1432abfe2082 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
class PendingRequests {
public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);
private static final int ONE_MB = SizeInBytes.ONE_MB.getSizeInt();
/**
* Round up to the nearest MB.
*/
static int roundUpMb(long bytes) {
return Math.toIntExact((bytes - 1) / ONE_MB + 1);
}
static class Permit {}
static class RequestLimits extends ResourceSemaphore.Group {
RequestLimits(int elementLimit, int megabyteLimit) {
super(elementLimit, megabyteLimit);
}
int getElementCount() {
return get(0).used();
}
int getMegaByteSize() {
return get(1).used();
}
ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
return tryAcquire(1, messageSizeMb);
}
void releaseExtraMb(int extraMb) {
release(0, extraMb);
}
void release(int diffMb) {
release(1, diffMb);
}
}
private static class RequestMap {
private final Object name;
private final ConcurrentMap<Long, PendingRequest> map = new ConcurrentHashMap<>();
private final RaftServerMetricsImpl raftServerMetrics;
/** Permits to put new requests, always synchronized. */
private final Map<Permit, Permit> permits = new HashMap<>();
/** Track and limit the number of requests and the total message size. */
private final RequestLimits resource;
/** The size (in byte) of all the requests in this map. */
private final AtomicLong requestSize = new AtomicLong();
RequestMap(Object name, int elementLimit, int megabyteLimit, RaftServerMetricsImpl raftServerMetrics) {
this.name = name;
this.resource = new RequestLimits(elementLimit, megabyteLimit);
this.raftServerMetrics = raftServerMetrics;
raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
raftServerMetrics.addNumPendingRequestsMegaByteSize(resource::getMegaByteSize);
}
Permit tryAcquire(Message message) {
final int messageSize = Message.getSize(message);
final int messageSizeMb = roundUpMb(messageSize );
final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
raftServerMetrics.onRequestQueueLimitHit();
raftServerMetrics.onResourceLimitHit();
return null;
} else if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT) {
raftServerMetrics.onRequestByteSizeLimitHit();
raftServerMetrics.onResourceLimitHit();
return null;
}
// release extra MB
final long oldSize = requestSize.getAndAdd(messageSize);
final long newSize = oldSize + messageSize;
final int diffMb = roundUpMb(newSize) - roundUpMb(oldSize);
if (messageSizeMb > diffMb) {
resource.releaseExtraMb(messageSizeMb - diffMb);
}
return putPermit();
}
private synchronized Permit putPermit() {
if (resource.isClosed()) {
return null;
}
final Permit permit = new Permit();
permits.put(permit, permit);
return permit;
}
synchronized PendingRequest put(Permit permit, long index, PendingRequest p) {
LOG.debug("{}: PendingRequests.put {} -> {}", name, index, p);
final Permit removed = permits.remove(permit);
if (removed == null) {
return null;
}
Preconditions.assertTrue(removed == permit);
final PendingRequest previous = map.put(index, p);
Preconditions.assertTrue(previous == null);
return p;
}
PendingRequest get(long index) {
final PendingRequest r = map.get(index);
LOG.debug("{}: PendingRequests.get {} returns {}", name, index, r);
return r;
}
PendingRequest remove(long index) {
final PendingRequest r = map.remove(index);
LOG.debug("{}: PendingRequests.remove {} returns {}", name, index, r);
if (r == null) {
return null;
}
final int messageSize = Message.getSize(r.getRequest().getMessage());
final long oldSize = requestSize.getAndAdd(-messageSize);
final long newSize = oldSize - messageSize;
final int diffMb = roundUpMb(oldSize) - roundUpMb(newSize);
resource.release(diffMb);
LOG.trace("release {} MB", diffMb);
return r;
}
Collection<TransactionContext> setNotLeaderException(NotLeaderException nle,
Collection<CommitInfoProto> commitInfos) {
synchronized (this) {
resource.close();
permits.clear();
}
LOG.debug("{}: PendingRequests.setNotLeaderException", name);
final List<TransactionContext> transactions = new ArrayList<>(map.size());
for(;;) {
final Iterator<Long> i = map.keySet().iterator();
if (!i.hasNext()) { // the map is empty
return transactions;
}
final PendingRequest pending = map.remove(i.next());
if (pending != null) {
transactions.add(pending.setNotLeaderException(nle, commitInfos));
}
}
}
void close() {
if (raftServerMetrics != null) {
raftServerMetrics.removeNumPendingRequestsGauge();
raftServerMetrics.removeNumPendingRequestsByteSize();
}
}
}
private PendingRequest pendingSetConf;
private final String name;
private final RequestMap pendingRequests;
PendingRequests(RaftGroupMemberId id, RaftProperties properties, RaftServerMetricsImpl raftServerMetrics) {
this.name = id + "-" + JavaUtils.getClassSimpleName(getClass());
this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties),
Math.toIntExact(
RaftServerConfigKeys.Write.byteLimit(properties).getSize()
/ SizeInBytes.ONE_MB.getSize()), //round down
raftServerMetrics);
}
Permit tryAcquire(Message message) {
return pendingRequests.tryAcquire(message);
}
PendingRequest add(Permit permit, RaftClientRequest request, TransactionContext entry) {
// externally synced for now
final long index = entry.getLogEntry().getIndex();
LOG.debug("{}: addPendingRequest at index={}, request={}", name, index, request);
final PendingRequest pending = new PendingRequest(index, request, entry);
return pendingRequests.put(permit, index, pending);
}
PendingRequest addConfRequest(SetConfigurationRequest request) {
Preconditions.assertTrue(pendingSetConf == null);
pendingSetConf = new PendingRequest(request);
return pendingSetConf;
}
void replySetConfiguration(Function<RaftClientRequest, RaftClientReply> newSuccessReply) {
// we allow the pendingRequest to be null in case that the new leader
// commits the new configuration while it has not received the retry
// request from the client
if (pendingSetConf != null) {
final RaftClientRequest request = pendingSetConf.getRequest();
LOG.debug("{}: sends success for {}", name, request);
// for setConfiguration we do not need to wait for statemachine. send back
// reply after it's committed.
pendingSetConf.setReply(newSuccessReply.apply(request));
pendingSetConf = null;
}
}
void failSetConfiguration(RaftException e) {
Preconditions.assertTrue(pendingSetConf != null);
pendingSetConf.setException(e);
pendingSetConf = null;
}
TransactionContext getTransactionContext(long index) {
PendingRequest pendingRequest = pendingRequests.get(index);
// it is possible that the pendingRequest is null if this peer just becomes
// the new leader and commits transactions received by the previous leader
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
void replyPendingRequest(long index, RaftClientReply reply) {
final PendingRequest pending = pendingRequests.remove(index);
if (pending != null) {
Preconditions.assertTrue(pending.getIndex() == index);
pending.setReply(reply);
}
}
/**
* The leader state is stopped. Send NotLeaderException to all the pending
* requests since they have not got applied to the state machine yet.
*/
Collection<TransactionContext> sendNotLeaderResponses(NotLeaderException nle,
Collection<CommitInfoProto> commitInfos) {
LOG.info("{}: sendNotLeaderResponses", name);
final Collection<TransactionContext> transactions = pendingRequests.setNotLeaderException(nle, commitInfos);
if (pendingSetConf != null) {
pendingSetConf.setNotLeaderException(nle, commitInfos);
}
return transactions;
}
void close() {
if (pendingRequests != null) {
pendingRequests.close();
}
}
}