/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** Manage all asynchronous replication threads and corresponding async clients */
public class LogDispatcher {
private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
private final MultiLeaderServerImpl impl;
private final List<LogDispatcherThread> threads;
private final String selfPeerId;
private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
private ExecutorService executorService;
private boolean stopped = false;
public LogDispatcher(
MultiLeaderServerImpl impl,
IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
this.impl = impl;
this.selfPeerId = impl.getThisNode().getEndpoint().toString();
this.clientManager = clientManager;
this.threads =
impl.getConfiguration().stream()
.filter(x -> !Objects.equals(x, impl.getThisNode()))
.map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
// We use a cached thread pool here because each LogDispatcherThread occupies one thread,
// and no LogDispatcherThread releases its thread back to the pool, since it won't stop
// unless the LogDispatcher stops.
// Thus, the size of this thread pool will equal the number of LogDispatcherThreads.
this.executorService =
IoTDBThreadPoolFactory.newCachedThreadPool(
"LogDispatcher-" + impl.getThisNode().getGroupId());
}
}
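/** Start replication by submitting every dispatcher thread to the shared executor. */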
public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
}
}
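/**
* Stop all dispatcher threads and shut down the executor, waiting up to 10 seconds for
* termination before logging an error.
*/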
public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
int timeout = 10;
try {
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
logger.error("Unable to shutdown LogDispatcher service after {} seconds", timeout);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Unexpected Interruption when closing LogDispatcher service ");
}
}
stopped = true;
}
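/** Register and start a dispatcher for a newly added peer, beginning at initialSyncIndex. */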
public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
if (stopped) {
return;
}
LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
threads.add(thread);
executorService.submit(thread);
}
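/** Stop the dispatcher for a removed peer and clean up its persisted index files. */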
public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
if (stopped) {
return;
}
int threadIndex = -1;
for (int i = 0; i < threads.size(); i++) {
if (threads.get(i).peer.equals(peer)) {
threadIndex = i;
break;
}
}
if (threadIndex == -1) {
return;
}
threads.get(threadIndex).stop();
threads.get(threadIndex).cleanup();
threads.remove(threadIndex);
}
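/** Return the minimum sync index across all dispatcher threads, or empty if there are none. */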
public synchronized OptionalLong getMinSyncIndex() {
return threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
}
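/**
* Serialize the request once and offer it to every dispatcher queue. If a queue is full,
* the request is dropped for that peer; the resulting gap is repaired later by reading
* from the WAL in getBatch().
*/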
public void offer(IndexedConsensusRequest request) {
List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
// we put the serialization step outside the synchronized block because it is stateless and
// time-consuming
synchronized (this) {
threads.forEach(
thread -> {
logger.debug(
"{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPeer().getEndpoint().getIp(),
thread.getPendingRequest().size());
if (!thread
.getPendingRequest()
.offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
logger.debug(
"{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
impl.getThisNode().getGroupId(),
thread.getPeer(),
request.getSearchIndex());
}
});
}
}
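/**
* A dispatcher thread bound to a single peer: it pulls requests from its pending queue
* (falling back to the WAL), accumulates them into batches, and replicates them through
* the sliding-window SyncStatus.
*/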
public class LogDispatcherThread implements Runnable {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
private static final long START_INDEX = 1;
private final MultiLeaderConfig config;
private final Peer peer;
private final IndexController controller;
// A sliding window class that manages the batches pending asynchronous replication
private final SyncStatus syncStatus;
// A queue used to receive asynchronous replication requests
private final BlockingQueue<IndexedConsensusRequest> pendingRequest;
// A container used to cache requests, whose size changes dynamically
private final List<IndexedConsensusRequest> bufferedRequest = new LinkedList<>();
// A reader management class that gets requests from the DataRegion
private final ConsensusReqReader reader =
(ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
private volatile boolean stopped = false;
private ConsensusReqReader.ReqIterator walEntryIterator;
public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
this.pendingRequest =
new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
this.controller =
new IndexController(
impl.getStorageDir(),
Utils.fromTEndPointToString(peer.getEndpoint()),
initialSyncIndex,
config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config);
this.walEntryIterator = reader.getReqIterator(START_INDEX);
}
public IndexController getController() {
return controller;
}
public long getCurrentSyncIndex() {
return controller.getCurrentIndex();
}
public Peer getPeer() {
return peer;
}
public MultiLeaderConfig getConfig() {
return config;
}
public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
return pendingRequest;
}
public void stop() {
stopped = true;
}
public void cleanup() throws IOException {
this.controller.cleanupVersionFiles();
}
public boolean isStopped() {
return stopped;
}
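// The main loop: accumulate a batch from the queue (or the WAL), wait for a slot in the
// sliding window, and then dispatch the batch asynchronously.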
@Override
public void run() {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
try {
PendingBatch batch;
while (!Thread.interrupted() && !stopped) {
while ((batch = getBatch()).isEmpty()) {
// we may block here if there are no requests in the queue
IndexedConsensusRequest request =
pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
if (request != null) {
bufferedRequest.add(request);
// If write pressure is low, we simply sleep a little to reduce the number of RPCs
if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
}
}
}
// we may block here if the synchronization pipeline is full
syncStatus.addNextBatch(batch);
// sends batch asynchronously and migrates the retry logic into the callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
}
logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
}
public void updateSafelyDeletedSearchIndex() {
// update safely deleted search index to delete outdated info,
// indicating that insert nodes whose search index is before this value can be deleted
// safely
long currentSafelyDeletedSearchIndex = impl.getCurrentSafelyDeletedSearchIndex();
reader.setSafelyDeletedSearchIndex(
currentSafelyDeletedSearchIndex
- currentSafelyDeletedSearchIndex % config.getReplication().getCheckpointGap());
// notify waiting writers if writes can be unblocked
if (impl.unblockWrite()) {
impl.signal();
}
}
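/**
* Assemble the next batch to send. Requests are drained from the pending queue into
* bufferedRequest; any request whose searchIndex precedes the next sending index is
* discarded, and gaps (or an empty buffer) are filled by reading from the WAL.
*/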
public PendingBatch getBatch() {
PendingBatch batch;
List<TLogBatch> logBatches = new ArrayList<>();
long startIndex = syncStatus.getNextSendingIndex();
long maxIndexWhenBufferedRequestEmpty = startIndex;
logger.debug("[GetBatch] startIndex: {}", startIndex);
long endIndex;
if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
// Use drainTo instead of poll to reduce lock overhead
logger.debug(
"{} : pendingRequest Size: {}, bufferedRequest size: {}",
impl.getThisNode().getGroupId(),
pendingRequest.size(),
bufferedRequest.size());
synchronized (impl.getIndexObject()) {
pendingRequest.drainTo(
bufferedRequest,
config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
}
// remove all requests whose searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
while (iterator.hasNext()) {
IndexedConsensusRequest request = iterator.next();
if (request.getSearchIndex() < startIndex) {
iterator.remove();
} else {
break;
}
}
}
// This branch is executed in several scenarios:
// 1. restart
// 2. getBatch() is invoked at the very moment the pending requests have been fully
// consumed. To prevent inconsistency here, we compute the value of
// `maxIndexWhenBufferedRequestEmpty` inside the synchronized block above
if (bufferedRequest.isEmpty()) {
endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
"{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
} else {
// Notice that prev searchIndex >= startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
// Prevent gaps between logs. For example, some requests may not have been written into
// the queue when it was full; in that case, they need to be loaded from the WAL
endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
return batch;
}
constructBatchIndexedFromConsensusRequest(prev, logBatches);
endIndex = prev.getSearchIndex();
iterator.remove();
while (iterator.hasNext()
&& logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
IndexedConsensusRequest current = iterator.next();
// Prevent gaps between logs. For example, some logs may not have been written into
// the queue when it was full; in that case, requests need to be loaded from the WAL
if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
endIndex =
constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
"gap {} : accumulated a {} from queue and wal when gap",
impl.getThisNode().getGroupId(),
batch);
return batch;
}
}
constructBatchIndexedFromConsensusRequest(current, logBatches);
endIndex = current.getSearchIndex();
prev = current;
// We might not be able to remove all the elements in bufferedRequest within this
// call, but that's fine: we'll continue processing the remaining elements the next
// time we enter this method, so they are never lost
iterator.remove();
}
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
"{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
}
return batch;
}
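/**
* Borrow an async client for the peer and send the batch; retries on failure are handled
* by the DispatchLogHandler callback.
*/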
public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
try {
AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
TSyncLogReq req =
new TSyncLogReq(
selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
logger.debug(
"Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
batch.getStartIndex(),
batch.getEndIndex(),
peer.getGroupId().convertToTConsensusGroupId());
client.syncLog(req, handler);
} catch (IOException | TException e) {
logger.error("Can not sync logs to peer {} because", peer, e);
}
}
public SyncStatus getSyncStatus() {
return syncStatus;
}
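/**
* Fill logBatches with WAL entries whose search indexes lie in [currentIndex, maxIndex),
* stopping early once the batch is full. Returns the search index of the last entry in
* logBatches, or currentIndex if the list is empty.
*/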
private long constructBatchFromWAL(
long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
logger.debug(
String.format(
"DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
peer.getGroupId().getId(), peer.getEndpoint().getIp(), currentIndex, maxIndex));
// targetIndex is the index of the request that we need to find
long targetIndex = currentIndex;
// Even if there are no WAL files, this code won't produce an error.
walEntryIterator.skipTo(targetIndex);
while (targetIndex < maxIndex
&& logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
logger.debug("construct from WAL for one Entry, index : {}", targetIndex);
try {
walEntryIterator.waitForNextReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("wait for next WAL entry is interrupted");
}
IndexedConsensusRequest data = walEntryIterator.next();
if (targetIndex > data.getSearchIndex()) {
// if the index of this entry is smaller than targetIndex, skip it and keep searching
logger.warn(
"search for one Entry which index is {}, but find a smaller one, index : {}",
targetIndex,
data.getSearchIndex());
continue;
} else if (targetIndex < data.getSearchIndex()) {
logger.warn(
"search for one Entry which index is {}, but find a larger one, index : {}",
targetIndex,
data.getSearchIndex());
if (data.getSearchIndex() >= maxIndex) {
// if the index of this entry is not smaller than maxIndex, then finish
break;
}
}
targetIndex = data.getSearchIndex() + 1;
// construct request from wal
for (IConsensusRequest innerRequest : data.getRequests()) {
logBatches.add(
new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true));
}
}
return logBatches.isEmpty()
? currentIndex
: logBatches.get(logBatches.size() - 1).searchIndex;
}
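/**
* Append the already-serialized buffers of a queued request to logBatches, marking them
* as not coming from the WAL.
*/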
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
for (ByteBuffer innerRequest : request.getSerializedRequests()) {
logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
}
}
}
}