blob: 1d9d55bfbfbb69259c4826672e09edeff310b4e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This class maintains the map of the commitIndexes to be watched for
* successful replication in the datanodes in a given pipeline. It also releases
* the buffers associated with the user data back to {@link BufferPool} once
* the minimum replication criteria are met during an ozone key write.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
/**
 * This class executes watchForCommit on a ratis pipeline and releases
 * buffers back to the {@link BufferPool} once the log index they were
 * flushed under has been reported as committed.
 *
 * <p>Flushed buffers are registered per log index through
 * {@link #updateCommitInfoMap(long, List)}. Each successful (or failed) watch
 * releases every registered index that is less than or equal to the commit
 * index obtained from the client and advances
 * {@link #getTotalAckDataLength()} by the number of bytes held in the released
 * buffers.
 */
public class CommitWatcher {

  private static final Logger LOG =
      LoggerFactory.getLogger(CommitWatcher.class);

  // A reference to the pool of buffers holding the data
  private final BufferPool bufferPool;

  // The map should maintain the keys (logIndexes) in order so that while
  // removing we always end up updating incremented data flushed length.
  // Also, corresponding to the logIndex, the corresponding list of buffers will
  // be released from the buffer pool.
  // Not final: cleanup() drops the reference after clearing the map.
  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
      commitIndex2flushedDataMap;

  // future Map to hold up all putBlock futures. releaseBuffers() removes the
  // entry whose key equals the running acknowledged length; the map itself is
  // populated by callers through getFutureMap().
  private final ConcurrentHashMap<Long,
      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
      futureMap;

  private final XceiverClientSpi xceiverClient;

  private final long watchTimeout;

  // total data which has been successfully flushed and acknowledged
  // by all servers
  private long totalAckDataLength;

  /**
   * Creates a watcher with empty bookkeeping maps and an acknowledged length
   * of zero.
   *
   * @param bufferPool pool that released buffers are handed back to
   * @param xceiverClient client used to issue the watchForCommit calls
   * @param watchTimeout timeout value passed unchanged to every
   *                     {@code xceiverClient.watchForCommit} call
   */
  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
      long watchTimeout) {
    this.bufferPool = bufferPool;
    this.xceiverClient = xceiverClient;
    this.watchTimeout = watchTimeout;
    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
    totalAckDataLength = 0;
    futureMap = new ConcurrentHashMap<>();
  }

  /**
   * Releases the buffers registered under the given log indexes and updates
   * the totalAckDataLength. In case of failure, we will read the data starting
   * from totalAckDataLength.
   *
   * <p>The indexes must be supplied in ascending order: after each index the
   * running acknowledged length is used as the key of the future to drop from
   * the future map.
   *
   * @param indexes log indexes whose buffers can be released
   * @return the updated total acknowledged data length
   * @throws IllegalArgumentException if no index is currently registered
   * @throws IllegalStateException if one of the indexes is not registered
   * @throws NullPointerException if no future is stored under the
   *                              acknowledged length reached after an index
   */
  private long releaseBuffers(List<Long> indexes) {
    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
    for (long index : indexes) {
      // Remove and validate in a single lookup instead of containsKey+remove.
      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
      Preconditions.checkState(buffers != null);
      // The position of each buffer is the amount of data written into it.
      long length = buffers.stream().mapToLong(ByteBuffer::position).sum();
      totalAckDataLength += length;
      // clear the future object from the future Map
      Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
      for (ByteBuffer byteBuffer : buffers) {
        bufferPool.releaseBuffer(byteBuffer);
      }
    }
    return totalAckDataLength;
  }

  /**
   * Registers the buffers that were flushed under the given log index,
   * replacing any list previously stored for that index.
   *
   * @param index log index the buffers were flushed under
   * @param byteBufferList buffers to release once the index is committed
   */
  public void updateCommitInfoMap(long index, List<ByteBuffer> byteBufferList) {
    commitIndex2flushedDataMap.put(index, byteBufferList);
  }

  /**
   * @return number of log indexes that are still waiting to be released
   */
  int getCommitInfoMapSize() {
    return commitIndex2flushedDataMap.size();
  }

  /**
   * Calls watch for commit for the first (smallest) index in
   * commitIndex2flushedDataMap to the Ratis client.
   *
   * @return reply from raft client, or null if no index is registered
   * @throws IOException in case watchForCommit fails
   */
  public XceiverClientReply watchOnFirstIndex() throws IOException {
    if (commitIndex2flushedDataMap.isEmpty()) {
      return null;
    }
    // wait for the first commit index in the commitIndex2flushedDataMap
    // to get committed to all or majority of nodes in case timeout
    // happens. The map is sorted, so the first key is the minimum.
    long index = commitIndex2flushedDataMap.firstKey();
    if (LOG.isDebugEnabled()) {
      LOG.debug("waiting for first index " + index + " to catch up");
    }
    return watchForCommit(index);
  }

  /**
   * Calls watch for commit for the last (largest) index in
   * commitIndex2flushedDataMap to the Ratis client.
   *
   * @return reply from raft client, or null if no index is registered
   * @throws IOException in case watchForCommit fails
   */
  public XceiverClientReply watchOnLastIndex()
      throws IOException {
    if (commitIndex2flushedDataMap.isEmpty()) {
      return null;
    }
    // wait for the commit index in the commitIndex2flushedDataMap
    // to get committed to all or majority of nodes in case timeout
    // happens. The map is sorted, so the last key is the maximum.
    long index = commitIndex2flushedDataMap.lastKey();
    if (LOG.isDebugEnabled()) {
      LOG.debug("waiting for last flush Index " + index + " to catch up");
    }
    return watchForCommit(index);
  }

  /**
   * Releases the buffers of every registered log index that is less than or
   * equal to the given commit index. Does nothing if there is no such index.
   *
   * @param commitIndex highest log index whose buffers may be released
   */
  private void adjustBuffers(long commitIndex) {
    // Snapshot the keys first: releaseBuffers removes them from the map.
    // headMap iterates in ascending order, which releaseBuffers relies on.
    List<Long> keyList = commitIndex2flushedDataMap
        .headMap(commitIndex, true).keySet().stream()
        .collect(Collectors.toList());
    if (!keyList.isEmpty()) {
      releaseBuffers(keyList);
    }
  }

  // It may happen that once the exception is encountered , we still might
  // have successfully flushed up to a certain index. Make sure the buffers
  // only contain data which have not been sufficiently replicated
  void releaseBuffersOnException() {
    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
  }

  /**
   * Calls the watchForCommit API of the client and releases the buffers of all
   * registered indexes up to the log index carried by the reply. A null reply
   * is treated as log index 0. The original note on this method states that
   * for a Standalone client the call is a no op.
   *
   * <p>If the watch fails, buffers up to the client's replicated minimum
   * commit index are released before the failure is rethrown. When the failure
   * is an interruption, the interrupt status of the calling thread is restored.
   *
   * @param commitIndex log index to watch for
   * @return the reply obtained from the client, possibly null
   * @throws IOException if the watch times out, is interrupted or fails while
   *                     executing; the original exception is set as the cause
   */
  public XceiverClientReply watchForCommit(long commitIndex)
      throws IOException {
    long index;
    try {
      XceiverClientReply reply =
          xceiverClient.watchForCommit(commitIndex, watchTimeout);
      if (reply == null) {
        index = 0;
      } else {
        index = reply.getLogIndex();
      }
      adjustBuffers(index);
      return reply;
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
      LOG.warn("watchForCommit failed for index " + commitIndex, e);
      IOException ioException = new IOException(
          "Unexpected Storage Container Exception: " + e.toString(), e);
      try {
        releaseBuffersOnException();
      } finally {
        // The InterruptedException is converted into an IOException, so the
        // interrupt would otherwise be lost for callers. The flag is restored
        // only after logging and buffer release so those steps run with the
        // same interrupt status as before.
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
      throw ioException;
    }
  }

  /**
   * @return the live map from log index to flushed buffers; null once
   *         {@link #cleanup()} has been called
   */
  @VisibleForTesting
  public ConcurrentSkipListMap<Long,
      List<ByteBuffer>> getCommitIndex2flushedDataMap() {
    return commitIndex2flushedDataMap;
  }

  /**
   * @return the live map holding the putBlock futures
   */
  public ConcurrentHashMap<Long,
      CompletableFuture<ContainerProtos.
          ContainerCommandResponseProto>> getFutureMap() {
    return futureMap;
  }

  /**
   * @return total length of data whose buffers have been released so far
   */
  public long getTotalAckDataLength() {
    return totalAckDataLength;
  }

  /**
   * Clears both bookkeeping maps and drops the reference to the commit index
   * map. The buffers still registered are not handed back to the pool here.
   * Methods that access the commit index map must not be used afterwards.
   */
  public void cleanup() {
    if (commitIndex2flushedDataMap != null) {
      commitIndex2flushedDataMap.clear();
    }
    futureMap.clear();
    commitIndex2flushedDataMap = null;
  }
}