blob: 3ed175c099c77c18aecde589946c038db823cc79 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.placementdriver;
import static java.util.Objects.hash;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import org.apache.ignite.internal.placementdriver.leases.LeaseBatch;
import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
import org.apache.ignite.internal.placementdriver.leases.Leases;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverActorMessage;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement;
import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
 * A processor to manage leases. The process is started when the placement driver activates and stopped when it deactivates.
 */
public class LeaseUpdater {
    /** Negative value means that printing statistics is disabled. */
    private static final int LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS = IgniteSystemProperties
            .getInteger("LEASE_STATISTICS_PRINT_ONCE_PER_ITERATIONS", 10);

    /** Ignite logger. */
    private static final IgniteLogger LOG = Loggers.forClass(LeaseUpdater.class);

    /** Update attempts interval in milliseconds. */
    private static final long UPDATE_LEASE_MS = 500L;

    /** Lease holding interval. */
    private static final long LEASE_INTERVAL = 10 * UPDATE_LEASE_MS;

    /**
     * Busy lock that is blocked while the updater is being activated or deactivated. Lease updates and message handling
     * run inside it, so they do not run concurrently with a state change.
     */
    private final IgniteSpinBusyLock stateChangingLock = new IgniteSpinBusyLock();

    /** Whether the updater is active, i.e. the dedicated updater thread is supposed to be running. */
    private final AtomicBoolean active = new AtomicBoolean();

    /** The interval in milliseconds that is used in the beginning of lease granting process. */
    private final long longLeaseInterval;

    /** Cluster service. */
    private final ClusterService clusterService;

    /** Meta storage manager. */
    private final MetaStorageManager msManager;

    /** Assignments tracker. */
    private final AssignmentsTracker assignmentsTracker;

    /** Topology tracker. */
    private final TopologyTracker topologyTracker;

    /** Lease tracker. */
    private final LeaseTracker leaseTracker;

    /** Cluster clock. */
    private final ClockService clockService;

    /** Closure to update leases. */
    private final Updater updater;

    /** Dedicated thread to update leases. */
    private IgniteThread updaterThread;

    /** Processor to communicate with the leaseholder to negotiate the lease; {@code null} while the updater is inactive. */
    private LeaseNegotiator leaseNegotiator;

    /** Node name. */
    private final String nodeName;

    /**
     * Constructor.
     *
     * @param nodeName Node name, used to name the dedicated updater thread.
     * @param clusterService Cluster service.
     * @param msManager Meta storage manager.
     * @param topologyService Topology service.
     * @param leaseTracker Lease tracker.
     * @param clockService Clock service.
     */
    LeaseUpdater(
            String nodeName,
            ClusterService clusterService,
            MetaStorageManager msManager,
            LogicalTopologyService topologyService,
            LeaseTracker leaseTracker,
            ClockService clockService
    ) {
        this.nodeName = nodeName;
        this.clusterService = clusterService;
        this.msManager = msManager;
        this.leaseTracker = leaseTracker;
        this.clockService = clockService;

        this.longLeaseInterval = IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000);
        this.assignmentsTracker = new AssignmentsTracker(msManager);
        this.topologyTracker = new TopologyTracker(topologyService);
        this.updater = new Updater();

        clusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, new PlacementDriverActorMessageHandler());
    }

    /** Initializes the class: starts tracking topology and assignments. */
    public void init() {
        topologyTracker.startTrack();
        assignmentsTracker.startTrack();
    }

    /** De-initializes the class: stops tracking topology and assignments. */
    void deInit() {
        topologyTracker.stopTrack();
        assignmentsTracker.stopTrack();
    }

    /** Activates a lease updater to renew leases. Does nothing if the updater is already active. */
    void activate() {
        if (active()) {
            return;
        }

        stateChangingLock.block();

        try {
            // Re-checked under the lock: another caller may have activated the updater concurrently.
            if (!active.compareAndSet(false, true)) {
                return;
            }

            leaseNegotiator = new LeaseNegotiator(clusterService);

            updaterThread = new IgniteThread(nodeName, "lease-updater", updater);

            updaterThread.start();
        } finally {
            stateChangingLock.unblock();
        }
    }

    /** Stops a dedicated thread to renew or assign leases. Does nothing if the updater is not active. */
    void deactivate() {
        if (!active()) {
            return;
        }

        stateChangingLock.block();

        try {
            // Re-checked under the lock: another caller may have deactivated the updater concurrently.
            if (!active.compareAndSet(true, false)) {
                return;
            }

            leaseNegotiator = null;

            updaterThread.interrupt();

            this.updaterThread = null;
        } finally {
            stateChangingLock.unblock();
        }
    }

    /**
     * Denies a lease and writes the denied one to Meta storage. Must be called under {@link #stateChangingLock}, because
     * it uses the lease negotiator.
     *
     * @param grpId Replication group id.
     * @param lease Lease to deny.
     * @param redirectProposal Consistent id of the cluster node proposed for redirection.
     * @return Future that completes with {@code true} if the conditional Meta storage update with the denied lease was
     *         applied, {@code false} if the stored leases changed concurrently and nothing was written.
     */
    private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, Lease lease, String redirectProposal) {
        Lease deniedLease = lease.denyLease(redirectProposal);

        leaseNegotiator.onLeaseRemoved(grpId);

        Leases leasesCurrent = leaseTracker.leasesCurrent();
        Collection<Lease> leases = leasesCurrent.leaseByGroupId().values();

        List<Lease> renewedLeases = new ArrayList<>();
        for (Lease ls : leases) {
            if (ls.replicationGroupId().equals(grpId)) {
                renewedLeases.add(deniedLease);
            } else {
                renewedLeases.add(ls);
            }
        }

        ByteArray key = PLACEMENTDRIVER_LEASES_KEY;

        // Write only if nobody has changed the leases since they were read (or they were never written).
        return msManager.invoke(
                or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
                put(key, new LeaseBatch(renewedLeases).bytes()),
                noop()
        );
    }

    /**
     * Finds a node that can be the leaseholder.
     *
     * <p>Only assignments whose node is present in the current logical topology are considered. The proposed node wins if
     * it is available; otherwise, the candidate with the lowest {@code hash(consistentId, grpId)} is chosen.
     *
     * @param assignments Replication group assignment.
     * @param grpId Group id.
     * @param proposedConsistentId Proposed consistent id, found out of a lease negotiation. The parameter might be {@code null}.
     * @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
     */
    private @Nullable ClusterNode nextLeaseHolder(
            Set<Assignment> assignments,
            ReplicationGroupId grpId,
            @Nullable String proposedConsistentId
    ) {
        // TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
        ClusterNode primaryCandidate = null;

        for (Assignment assignment : assignments) {
            // Check whether given assignments is actually available in logical topology. It's a best effort check because it's possible
            // for proposed primary candidate to leave the topology at any time. In that case primary candidate will be recalculated.
            ClusterNode candidateNode = topologyTracker.nodeByConsistentId(assignment.consistentId());

            if (candidateNode == null) {
                continue;
            }

            if (assignment.consistentId().equals(proposedConsistentId)) {
                primaryCandidate = candidateNode;

                break;
            } else if (primaryCandidate == null) {
                primaryCandidate = candidateNode;
            } else {
                int candidateHash = hash(primaryCandidate.name(), grpId);
                int assignmentHash = hash(assignment.consistentId(), grpId);

                if (candidateHash > assignmentHash) {
                    primaryCandidate = candidateNode;
                }
            }
        }

        return primaryCandidate;
    }

    /** Returns {@code true} if active. */
    boolean active() {
        return active.get();
    }

    /** Runnable to update lease in Meta storage. */
    private class Updater implements Runnable {
        /** Statistics of the current iteration; recreated at the beginning of every iteration. */
        private LeaseStats leaseUpdateStatistics = new LeaseStats();

        /** This field should be accessed only from updater thread. */
        private int statisticsLogCounter;

        @Override
        public void run() {
            while (active() && !Thread.interrupted()) {
                if (!stateChangingLock.enterBusy()) {
                    // The state is being changed right now; the loop condition re-checks whether to keep going.
                    continue;
                }

                try {
                    if (active()) {
                        updateLeaseBatchInternal();
                    }
                } catch (Throwable e) {
                    LOG.error("Error occurred when updating the leases.", e);

                    if (e instanceof Error) {
                        // TODO IGNITE-20368 The node should be halted in case of an error here.
                        throw (Error) e;
                    }
                } finally {
                    stateChangingLock.leaveBusy();
                }

                try {
                    Thread.sleep(UPDATE_LEASE_MS);
                } catch (InterruptedException e) {
                    LOG.warn("Lease updater is interrupted");

                    // Thread.sleep clears the interrupt flag. Restore it so the loop condition stops this thread even if the
                    // updater was deactivated and re-activated (with a new thread) in the meantime.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Updates leases in Meta storage. This method is supposed to be used in the busy lock. */
        private void updateLeaseBatchInternal() {
            HybridTimestamp now = clockService.now();

            leaseUpdateStatistics = new LeaseStats();

            // Leases that expire before this moment are renewed (or re-granted) in this iteration.
            long outdatedLeaseThreshold = now.getPhysical() + LEASE_INTERVAL / 2;

            // deactivate() resets the field to null, while the invocation callback below may run after that.
            // Capture the instance that is valid under the busy lock.
            LeaseNegotiator negotiator = leaseNegotiator;

            Leases leasesCurrent = leaseTracker.leasesCurrent();
            Map<ReplicationGroupId, Boolean> toBeNegotiated = new HashMap<>();
            Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId());

            Map<ReplicationGroupId, Set<Assignment>> currentAssignments = assignmentsTracker.assignments();
            Set<ReplicationGroupId> currentAssignmentsReplicationGroupIds = currentAssignments.keySet();

            // Remove all expired leases that are no longer present in assignments.
            renewedLeases.entrySet().removeIf(e -> clockService.before(e.getValue().getExpirationTime(), now)
                    && !currentAssignmentsReplicationGroupIds.contains(e.getKey()));

            int currentAssignmentsSize = currentAssignments.size();
            int activeLeasesCount = 0;

            for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : currentAssignments.entrySet()) {
                ReplicationGroupId grpId = entry.getKey();
                Set<Assignment> assignments = entry.getValue();

                Lease lease = leaseTracker.getLease(grpId);

                if (lease.isAccepted() && !isLeaseOutdated(lease)) {
                    activeLeasesCount++;
                }

                if (!lease.isAccepted()) {
                    LeaseAgreement agreement = negotiator.getAndRemoveIfReady(grpId);

                    agreement.checkValid(grpId, topologyTracker.currentTopologySnapshot(), assignments);

                    if (agreement.isAccepted()) {
                        publishLease(grpId, lease, renewedLeases);

                        continue;
                    } else if (agreement.isDeclined()) {
                        // Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
                        ClusterNode candidate = nextLeaseHolder(assignments, grpId, agreement.getRedirectTo());

                        if (candidate == null) {
                            leaseUpdateStatistics.onLeaseWithoutCandidate();

                            continue;
                        }

                        // New lease is granted.
                        writeNewLease(grpId, candidate, renewedLeases);

                        boolean force = Objects.equals(lease.getLeaseholder(), candidate.name());

                        toBeNegotiated.put(grpId, force);

                        continue;
                    }
                }

                // The lease is expired or close to this.
                if (lease.getExpirationTime().getPhysical() < outdatedLeaseThreshold) {
                    String proposedLeaseholder = lease.isProlongable()
                            ? lease.getLeaseholder()
                            : lease.proposedCandidate();

                    ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder);

                    if (candidate == null) {
                        leaseUpdateStatistics.onLeaseWithoutCandidate();

                        continue;
                    }

                    // We can't prolong the expired lease because we already have an interval of time when the lease was not active,
                    // so we must start a negotiation round from the beginning; the same we do for the groups that don't have
                    // leaseholders at all.
                    if (isLeaseOutdated(lease)) {
                        // New lease is granted.
                        writeNewLease(grpId, candidate, renewedLeases);

                        boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;

                        toBeNegotiated.put(grpId, force);
                    } else if (lease.isProlongable() && candidate.id().equals(lease.getLeaseholderId())) {
                        // Old lease is renewed.
                        prolongLease(grpId, lease, renewedLeases);
                    }
                }
            }

            byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes();

            ByteArray key = PLACEMENTDRIVER_LEASES_KEY;

            if (shouldLogLeaseStatistics()) {
                LOG.info(
                        "Leases updated (printed once per {} iteration(s)): [inCurrentIteration={}, active={}, "
                                + "currentAssignmentsSize={}].",
                        LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS,
                        leaseUpdateStatistics,
                        activeLeasesCount,
                        currentAssignmentsSize
                );
            }

            // Write only if nobody has changed the leases since they were read (or they were never written).
            msManager.invoke(
                    or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
                    put(key, renewedValue),
                    noop()
            ).whenComplete((success, e) -> {
                if (e != null) {
                    LOG.error("Lease update invocation failed", e);

                    return;
                }

                if (!success) {
                    LOG.debug("Lease update invocation failed");

                    return;
                }

                if (!active()) {
                    // The updater was deactivated while the invocation was in flight: skip the negotiation.
                    return;
                }

                for (Map.Entry<ReplicationGroupId, Boolean> entry : toBeNegotiated.entrySet()) {
                    Lease lease = renewedLeases.get(entry.getKey());
                    boolean force = entry.getValue();

                    negotiator.negotiate(lease, force);
                }
            });
        }

        /**
         * Writes a new lease that starts now and lasts for {@code longLeaseInterval} milliseconds.
         *
         * @param grpId Replication group id.
         * @param candidate Lease candidate.
         * @param renewedLeases Leases to renew.
         */
        private void writeNewLease(
                ReplicationGroupId grpId,
                ClusterNode candidate,
                Map<ReplicationGroupId, Lease> renewedLeases
        ) {
            HybridTimestamp startTs = clockService.now();

            var expirationTs = new HybridTimestamp(startTs.getPhysical() + longLeaseInterval, 0);

            Lease renewedLease = new Lease(candidate.name(), candidate.id(), startTs, expirationTs, grpId);

            renewedLeases.put(grpId, renewedLease);

            leaseUpdateStatistics.onLeaseCreate();
        }

        /**
         * Prolongs the lease until {@link #LEASE_INTERVAL} milliseconds from now.
         *
         * @param grpId Replication group id.
         * @param lease Lease to prolong.
         * @param renewedLeases Leases to renew.
         */
        private void prolongLease(ReplicationGroupId grpId, Lease lease, Map<ReplicationGroupId, Lease> renewedLeases) {
            var newTs = new HybridTimestamp(clockService.now().getPhysical() + LEASE_INTERVAL, 0);

            Lease renewedLease = lease.prolongLease(newTs);

            renewedLeases.put(grpId, renewedLease);

            leaseUpdateStatistics.onLeaseProlong();
        }

        /**
         * Writes an accepted lease in Meta storage. After the lease will be written to Meta storage,
         * the lease becomes available to all components.
         *
         * @param grpId Replication group id.
         * @param lease Lease to accept.
         * @param renewedLeases Leases to renew.
         */
        private void publishLease(ReplicationGroupId grpId, Lease lease, Map<ReplicationGroupId, Lease> renewedLeases) {
            var newTs = new HybridTimestamp(clockService.now().getPhysical() + LEASE_INTERVAL, 0);

            Lease renewedLease = lease.acceptLease(newTs);

            renewedLeases.put(grpId, renewedLease);

            leaseUpdateStatistics.onLeasePublish();
        }

        /**
         * Checks that the lease is outdated.
         * {@link Lease#emptyLease} is always outdated.
         *
         * @param lease Lease.
         * @return {@code True} if the current time is after the lease expiration time, otherwise {@code false}.
         */
        private boolean isLeaseOutdated(Lease lease) {
            HybridTimestamp now = clockService.now();

            return clockService.after(now, lease.getExpirationTime());
        }

        /**
         * Decides whether the statistics of the current iteration should be logged: returns {@code true} once per
         * {@code LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS} calls, and never if that value is negative.
         *
         * @return {@code True} if the statistics should be logged in this iteration.
         */
        private boolean shouldLogLeaseStatistics() {
            if (LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS < 0) {
                return false;
            }

            // '>=' gives a period of exactly N iterations ('>' used to print once per N + 1 iterations).
            boolean result = ++statisticsLogCounter >= LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS;

            if (result) {
                statisticsLogCounter = 0;
            }

            return result;
        }
    }

    /** Counters of lease operations performed during a single updater iteration. */
    private static class LeaseStats {
        @IgniteToStringInclude
        int leasesCreated;

        @IgniteToStringInclude
        int leasesPublished;

        @IgniteToStringInclude
        int leasesProlonged;

        @IgniteToStringInclude
        int leasesWithoutCandidates;

        private void onLeaseCreate() {
            leasesCreated++;
        }

        private void onLeasePublish() {
            leasesPublished++;
        }

        private void onLeaseProlong() {
            leasesProlonged++;
        }

        private void onLeaseWithoutCandidate() {
            leasesWithoutCandidates++;
        }

        @Override
        public String toString() {
            return S.toString(this);
        }
    }

    /** Message handler to process notification from replica side. */
    private class PlacementDriverActorMessageHandler implements NetworkMessageHandler {
        @Override
        public void onReceived(NetworkMessage msg0, ClusterNode sender, @Nullable Long correlationId) {
            if (!(msg0 instanceof PlacementDriverActorMessage)) {
                return;
            }

            var msg = (PlacementDriverActorMessage) msg0;

            // Messages are ignored when the updater is inactive or its state is being changed.
            if (!active() || !stateChangingLock.enterBusy()) {
                return;
            }

            try {
                processMessageInternal(sender.name(), msg);
            } finally {
                stateChangingLock.leaveBusy();
            }
        }

        /**
         * Processes a placement driver actor message. The method should be invoked under state lock.
         *
         * @param sender Sender node name.
         * @param msg Message.
         */
        private void processMessageInternal(String sender, PlacementDriverActorMessage msg) {
            ReplicationGroupId grpId = msg.groupId();

            Lease lease = leaseTracker.getLease(grpId);

            if (msg instanceof StopLeaseProlongationMessage) {
                // Only the current leaseholder of a prolongable lease may ask to stop its prolongation.
                if (lease.isProlongable() && sender.equals(lease.getLeaseholder())) {
                    StopLeaseProlongationMessage stopLeaseProlongationMessage = (StopLeaseProlongationMessage) msg;

                    denyLease(grpId, lease, stopLeaseProlongationMessage.redirectProposal()).whenComplete((res, th) -> {
                        if (th != null) {
                            LOG.warn("Prolongation denial failed due to exception [groupId={}]", th, grpId);
                        } else {
                            LOG.info("Stop lease prolongation message was handled [groupId={}, sender={}, deny={}]", grpId, sender, res);
                        }
                    });
                }
            } else {
                LOG.warn("Unknown message type [msg={}]", msg.getClass().getSimpleName());
            }
        }
    }
}