blob: 691d5e9dfbc36bdfdc31ea9727188cddb37dbbed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
/**
* Class to track pending replication operations across the cluster. For
* each container with a pending replication or pending delete, there will
* be an entry in this class mapping ContainerID to a list of the pending
* operations.
*/
public class ContainerReplicaPendingOps {
private final ConfigurationSource config;
private final Clock clock;
private final ConcurrentHashMap<ContainerID, List<ContainerReplicaOp>>
pendingOps = new ConcurrentHashMap<>();
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(64);
public ContainerReplicaPendingOps(final ConfigurationSource conf,
Clock clock) {
this.config = conf;
this.clock = clock;
}
/**
* Get all the ContainerReplicaOp's associated with the given ContainerID.
* A new list is created and returned, so it can be modified by the caller,
* but any changes will not be reflected in the internal map.
* @param containerID The ContainerID for which to retrieve the pending
* ops.
* @return Standalone list of ContainerReplica or an empty list if none exist.
*/
public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
Lock lock = readLock(containerID);
lock.lock();
try {
List<ContainerReplicaOp> ops = pendingOps.get(containerID);
if (ops == null) {
return Collections.emptyList();
}
return new ArrayList<>(ops);
} finally {
lock.unlock();
}
}
/**
* Store a ContainerReplicaOp to add a replica for the given ContainerID.
* @param containerID ContainerID for which to add a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
*/
public void scheduleAddReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
addReplica(ADD, containerID, target, replicaIndex);
}
/**
* Store a ContainerReplicaOp to delete a replica for the given ContainerID.
* @param containerID ContainerID for which to delete a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
*/
public void scheduleDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
addReplica(DELETE, containerID, target, replicaIndex);
}
/**
* Remove a stored ContainerReplicaOp from the given ContainerID as it has
* been replicated successfully.
* @param containerID ContainerID for which to complete the replication
* @param target The target Datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
* @return True if a pending replica was found and removed, false otherwise.
*/
public boolean completeAddReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
return completeOp(ADD, containerID, target, replicaIndex);
}
/**
* Remove a stored ContainerReplicaOp from the given ContainerID as it has
* been deleted successfully.
* @param containerID ContainerID for which to complete the deletion
* @param target The target Datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
* @return True if a pending replica was found and removed, false otherwise.
*/
public boolean completeDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
return completeOp(DELETE, containerID, target, replicaIndex);
}
/**
* Remove a stored pending operation from the given ContainerID.
* @param containerID ContainerID for which to remove the op.
* @param op ContainerReplicaOp to remove
* @return True if an element was found and deleted, false otherwise.
*/
public boolean removeOp(ContainerID containerID,
ContainerReplicaOp op) {
return completeOp(op.getOpType(), containerID, op.getTarget(),
op.getReplicaIndex());
}
/**
* Iterate over all pending entries and remove any which have expired, meaning
* they have not completed the operation inside the given time.
* @param expiryMilliSeconds
*/
public void removeExpiredEntries(long expiryMilliSeconds) {
for (ContainerID containerID : pendingOps.keySet()) {
// Rather than use an entry set, we get the map entry again. This is
// to protect against another thread modifying the value after this
// iterator started. Once we lock on the ContainerID object, no other
// changes can occur to the list of ops associated with it.
Lock lock = writeLock(containerID);
lock.lock();
try {
List<ContainerReplicaOp> ops = pendingOps.get(containerID);
if (ops == null) {
// There should not be null entries, but another thread may have
// removed the map entry after the iterator was started.
continue;
}
Iterator<ContainerReplicaOp> iterator = ops.listIterator();
while (iterator.hasNext()) {
ContainerReplicaOp op = iterator.next();
if (op.getScheduledEpochMillis() + expiryMilliSeconds
< clock.millis()) {
iterator.remove();
}
}
if (ops.size() == 0) {
pendingOps.remove(containerID);
}
} finally {
lock.unlock();
}
}
}
private void addReplica(ContainerReplicaOp.PendingOpType opType,
ContainerID containerID, DatanodeDetails target, int replicaIndex) {
Lock lock = writeLock(containerID);
lock.lock();
try {
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
containerID, s -> new ArrayList<>());
ops.add(new ContainerReplicaOp(opType,
target, replicaIndex, clock.millis()));
} finally {
lock.unlock();
}
}
private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
ContainerID containerID, DatanodeDetails target, int replicaIndex) {
boolean found = false;
Lock lock = writeLock(containerID);
lock.lock();
try {
List<ContainerReplicaOp> ops = pendingOps.get(containerID);
if (ops != null) {
Iterator<ContainerReplicaOp> iterator = ops.listIterator();
while (iterator.hasNext()) {
ContainerReplicaOp op = iterator.next();
if (op.getOpType() == opType
&& op.getTarget().equals(target)
&& op.getReplicaIndex() == replicaIndex) {
found = true;
iterator.remove();
}
}
if (ops.size() == 0) {
pendingOps.remove(containerID);
}
}
} finally {
lock.unlock();
}
return found;
}
private Lock writeLock(ContainerID containerID) {
return stripedLock.get(containerID).writeLock();
}
private Lock readLock(ContainerID containerID) {
return stripedLock.get(containerID).readLock();
}
}