/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.consensus.iot.logdispatcher;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesReq;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.metrics.utils.MetricLevel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* Manages all asynchronous replication threads and their corresponding async clients: one
* LogDispatcherThread per remote peer, each with its own bounded queue of pending requests.
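*
* <p>A rough usage sketch (the calls are this class's public API; {@code impl},
* {@code clientManager} and {@code request} are assumed to be supplied by the caller):
*
* <pre>{@code
* LogDispatcher dispatcher = new LogDispatcher(impl, clientManager);
* dispatcher.start();        // submit one dispatcher thread per remote peer
* dispatcher.offer(request); // fan the request out to every peer's queue
* // ...
* dispatcher.stop();         // stop the threads and shut the pool down
* }</pre>
*/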
public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
private final IoTConsensusServerImpl impl;
private final List<LogDispatcherThread> threads;
private final String selfPeerId;
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private ExecutorService executorService;
private boolean stopped = false;
private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
public LogDispatcher(
IoTConsensusServerImpl impl,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
this.impl = impl;
this.selfPeerId = impl.getThisNode().getEndpoint().toString();
this.clientManager = clientManager;
this.threads =
impl.getConfiguration().stream()
.filter(x -> !Objects.equals(x, impl.getThisNode()))
.map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
initLogSyncThreadPool();
}
}
private void initLogSyncThreadPool() {
// We use a cached thread pool here because each LogDispatcherThread permanently occupies one
// thread: a LogDispatcherThread never returns its thread to the pool, since it only stops when
// the LogDispatcher itself stops. Thus, the pool size always equals the number of
// LogDispatcherThreads.
this.executorService =
IoTDBThreadPoolFactory.newCachedThreadPool(
"LogDispatcher-" + impl.getThisNode().getGroupId());
}
public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
}
}
public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
int timeout = 10;
try {
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
logger.error("Unable to shutdown LogDispatcher service after {} seconds", timeout);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Unexpected Interruption when closing LogDispatcher service ");
}
}
stopped = true;
}
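/**
* Adds and starts a dispatcher thread for a newly added peer, replicating from initialSyncIndex
* onward. (Assumption: the consensus layer calls this when the group's membership grows, e.g.
* during region migration.)
*/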
public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
if (stopped) {
return;
}
LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
threads.add(thread);
// If the initial replica count is 1, the executorService is never initialized in the
// constructor, so it must be initialized lazily here when the first dispatcher thread is added
if (this.executorService == null) {
initLogSyncThreadPool();
}
executorService.submit(thread);
}
public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
if (stopped) {
return;
}
int threadIndex = -1;
for (int i = 0; i < threads.size(); i++) {
if (threads.get(i).peer.equals(peer)) {
threadIndex = i;
break;
}
}
if (threadIndex == -1) {
return;
}
threads.get(threadIndex).stop();
threads.get(threadIndex).cleanup();
threads.remove(threadIndex);
}
public synchronized OptionalLong getMinSyncIndex() {
return threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
}
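/**
* Fans a request out to every dispatcher thread's queue. A full queue drops the entry for that
* peer only; the missing index range is recovered from the WAL when that peer's thread builds
* its next batch (see LogDispatcherThread#getBatch).
*/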
public void offer(IndexedConsensusRequest request) {
// We don't need to serialize and offer the request when replicaNum is 1.
if (!threads.isEmpty()) {
request.buildSerializedRequests();
synchronized (this) {
threads.forEach(
thread -> {
logger.debug(
"{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPeer().getEndpoint().getIp(),
thread.getPendingEntriesSize());
if (!thread.offer(request)) {
logger.debug(
"{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
impl.getThisNode().getGroupId(),
thread.getPeer(),
request.getSearchIndex());
}
});
}
}
}
public long getLogEntriesFromWAL() {
return logEntriesFromWAL.get();
}
public long getLogEntriesFromQueue() {
return logEntriesFromQueue.get();
}
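/**
* A dispatcher thread bound to a single remote peer: it buffers incoming requests in a bounded
* queue, assembles them into gap-free batches (falling back to the WAL for entries that never
* made it into the queue), and ships the batches asynchronously while IndexController and
* SyncStatus track replication progress.
*/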
public class LogDispatcherThread implements Runnable {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
private static final long START_INDEX = 1;
private final IoTConsensusConfig config;
private final Peer peer;
private final IndexController controller;
// A sliding window class that manages asynchronous pendingBatches
private final SyncStatus syncStatus;
// A queue used to receive asynchronous replication requests
private final BlockingQueue<IndexedConsensusRequest> pendingEntries;
// A container used to cache requests, whose size changes dynamically
private final List<IndexedConsensusRequest> bufferedEntries = new LinkedList<>();
// A reader management class that gets requests from the DataRegion
private final ConsensusReqReader reader =
(ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
private final IoTConsensusMemoryManager iotConsensusMemoryManager =
IoTConsensusMemoryManager.getInstance();
private volatile boolean stopped = false;
private final ConsensusReqReader.ReqIterator walEntryIterator;
private final LogDispatcherThreadMetrics metrics;
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
this.controller =
new IndexController(
impl.getStorageDir(),
peer,
initialSyncIndex,
config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config, impl::getSearchIndex);
this.walEntryIterator = reader.getReqIterator(START_INDEX);
this.metrics = new LogDispatcherThreadMetrics(this);
}
public IndexController getController() {
return controller;
}
public long getCurrentSyncIndex() {
return controller.getCurrentIndex();
}
public Peer getPeer() {
return peer;
}
public IoTConsensusConfig getConfig() {
return config;
}
public int getPendingEntriesSize() {
return pendingEntries.size();
}
public int getBufferRequestSize() {
return bufferedEntries.size();
}
/** Try to offer a request into the queue, under memory control. */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
return false;
}
boolean success;
try {
success = pendingEntries.offer(indexedConsensusRequest);
} catch (Throwable t) {
// If an exception occurs while offering the request, release the reserved memory
iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
throw t;
}
if (!success) {
// If the offer failed, release the reserved memory
iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
}
return success;
}
/** Release the memory reserved for a request that is leaving the queue. */
private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
}
public void stop() {
stopped = true;
long requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
requestSize += indexedConsensusRequest.getSerializedSize();
}
pendingEntries.clear();
iotConsensusMemoryManager.free(requestSize, true);
requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
requestSize += indexedConsensusRequest.getSerializedSize();
}
iotConsensusMemoryManager.free(requestSize, true);
syncStatus.free();
MetricService.getInstance().removeMetricSet(metrics);
}
public void cleanup() throws IOException {
this.controller.cleanupVersionFiles();
}
public boolean isStopped() {
return stopped;
}
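// Main loop: block until a non-empty batch can be built, record the construction latency,
// register the batch with the sliding-window SyncStatus (which may block while the pipeline is
// full), then ship it asynchronously.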
@Override
public void run() {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
MetricService.getInstance().addMetricSet(metrics);
try {
Batch batch;
while (!Thread.interrupted() && !stopped) {
long startTime = System.nanoTime();
while ((batch = getBatch()).isEmpty()) {
// we may block here if there are no requests in the queue
IndexedConsensusRequest request =
pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
if (request != null) {
bufferedEntries.add(request);
}
}
MetricService.getInstance()
.getOrCreateHistogram(
Metric.STAGE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
Metric.IOT_CONSENSUS.toString(),
Tag.TYPE.toString(),
"constructBatch",
Tag.REGION.toString(),
peer.getGroupId().toString())
.update((System.nanoTime() - startTime) / batch.getLogEntries().size());
// we may block here if the synchronization pipeline is full
syncStatus.addNextBatch(batch);
logEntriesFromWAL.addAndGet(batch.getLogEntriesNumFromWAL());
logEntriesFromQueue.addAndGet(
batch.getLogEntries().size() - batch.getLogEntriesNumFromWAL());
// send the batch asynchronously; the retry logic lives in the callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
}
logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
}
public void updateSafelyDeletedSearchIndex() {
// Update the safely-deleted search index so outdated info can be removed: insert nodes whose
// search index is before this value can be deleted safely
long currentSafelyDeletedSearchIndex = impl.getCurrentSafelyDeletedSearchIndex();
reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
// notify any writers that may have been blocked
if (impl.unblockWrite()) {
impl.signal();
}
}
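/**
* Assembles the next batch to send, roughly in three steps as implemented below: (1) drain
* pendingEntries into bufferedEntries while holding the index lock, (2) drop entries that were
* already replicated (searchIndex < startIndex), (3) walk the buffer and pull any missing index
* ranges from the WAL so the resulting batch contains no gaps.
*/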
public Batch getBatch() {
long startIndex = syncStatus.getNextSendingIndex();
long maxIndex;
synchronized (impl.getIndexObject()) {
maxIndex = impl.getSearchIndex() + 1;
logger.debug(
"{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
impl.getThisNode().getGroupId(),
startIndex,
maxIndex,
getPendingEntriesSize(),
bufferedEntries.size());
// Use drainTo instead of poll to reduce lock overhead
pendingEntries.drainTo(
bufferedEntries,
config.getReplication().getMaxLogEntriesNumPerBatch() - bufferedEntries.size());
}
// remove all requests whose searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedEntries.iterator();
while (iterator.hasNext()) {
IndexedConsensusRequest request = iterator.next();
if (request.getSearchIndex() < startIndex) {
iterator.remove();
releaseReservedMemory(request);
} else {
break;
}
}
Batch batches = new Batch(config);
// This branch is taken in several scenarios:
// 1. restart
// 2. getBatch() is invoked at the exact moment the pendingEntries have been consumed up.
// To prevent inconsistency here, `maxIndex` is computed inside the synchronized block above
if (bufferedEntries.isEmpty()) {
constructBatchFromWAL(startIndex, maxIndex, batches);
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches);
} else {
// Note that prev.searchIndex >= startIndex here
iterator = bufferedEntries.iterator();
IndexedConsensusRequest prev = iterator.next();
// Prevent gaps between logs: some requests may not have been written into the queue when it
// was full, in which case they need to be loaded from the WAL
if (startIndex != prev.getSearchIndex()) {
constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches);
if (!batches.canAccumulate()) {
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batches);
return batches;
}
}
constructBatchIndexedFromConsensusRequest(prev, batches);
iterator.remove();
releaseReservedMemory(prev);
if (!batches.canAccumulate()) {
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batches);
return batches;
}
while (iterator.hasNext() && batches.canAccumulate()) {
IndexedConsensusRequest current = iterator.next();
// Prevent gaps between logs: some logs may not have been written into the queue when it was
// full, in which case they need to be loaded from the WAL
if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
constructBatchFromWAL(prev.getSearchIndex() + 1, current.getSearchIndex(), batches);
if (!batches.canAccumulate()) {
batches.buildIndex();
logger.debug(
"gap {} : accumulated a {} from queue and wal when gap",
impl.getThisNode().getGroupId(),
batches);
return batches;
}
}
constructBatchIndexedFromConsensusRequest(current, batches);
prev = current;
// We might not be able to remove all the elements of bufferedEntries in the current call,
// but that's fine: the remaining elements will be processed the next time we enter this
// method, so they are never lost
iterator.remove();
releaseReservedMemory(current);
}
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batches);
}
return batches;
}
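/**
* Sends the batch to the peer through a borrowed async client. Any failure, including failing
* to borrow the client itself, is routed to DispatchLogHandler#onError, where the retry logic
* mentioned in run() lives.
*/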
public void sendBatchAsync(Batch batch, DispatchLogHandler handler) {
try {
AsyncIoTConsensusServiceClient client = clientManager.borrowClient(peer.getEndpoint());
TSyncLogEntriesReq req =
new TSyncLogEntriesReq(
selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getLogEntries());
logger.debug(
"Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
batch.getStartIndex(),
batch.getEndIndex(),
peer.getGroupId().convertToTConsensusGroupId());
client.syncLogEntries(req, handler);
} catch (Exception e) {
logger.error("Can not sync logs to peer {} because", peer, e);
handler.onError(e);
}
}
public SyncStatus getSyncStatus() {
return syncStatus;
}
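/**
* Fills logBatches with WAL entries whose searchIndex lies in [currentIndex, maxIndex),
* stopping early once the batch can no longer accumulate. The `true` passed to TLogEntry below
* marks an entry as WAL-sourced, mirroring the `false` used for queue entries in
* constructBatchIndexedFromConsensusRequest.
*/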
private void constructBatchFromWAL(long currentIndex, long maxIndex, Batch logBatches) {
logger.debug(
String.format(
"DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
peer.getGroupId().getId(), peer.getEndpoint().getIp(), currentIndex, maxIndex));
// targetIndex is the index of the request that we need to find
long targetIndex = currentIndex;
// Even if there are no WAL files, this code won't produce an error.
walEntryIterator.skipTo(targetIndex);
while (targetIndex < maxIndex && logBatches.canAccumulate()) {
logger.debug("construct from WAL for one Entry, index : {}", targetIndex);
try {
walEntryIterator.waitForNextReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("wait for next WAL entry is interrupted");
}
IndexedConsensusRequest data = walEntryIterator.next();
if (data.getSearchIndex() < targetIndex) {
// if the request's index is smaller than targetIndex, skip it and keep searching
logger.warn(
"searched for an entry with index {}, but found a smaller one: {}",
targetIndex,
data.getSearchIndex());
continue;
} else if (data.getSearchIndex() > targetIndex) {
logger.warn(
"searched for an entry with index {}, but found a larger one: {}",
targetIndex,
data.getSearchIndex());
if (data.getSearchIndex() >= maxIndex) {
// if the request's index reaches maxIndex, stop constructing this batch
break;
}
}
targetIndex = data.getSearchIndex() + 1;
data.buildSerializedRequests();
// construct request from wal
logBatches.addTLogEntry(
new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
}
}
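// Wraps an already-serialized request taken from the queue into a TLogEntry; the boolean is
// false here, mirroring the true passed for WAL-sourced entries in constructBatchFromWAL.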
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, Batch logBatches) {
logBatches.addTLogEntry(
new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false));
}
}
}