| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.consensus.multileader; |
| |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.client.IClientManager; |
| import org.apache.iotdb.consensus.IStateMachine; |
| import org.apache.iotdb.consensus.common.DataSet; |
| import org.apache.iotdb.consensus.common.Peer; |
| import org.apache.iotdb.consensus.common.request.IConsensusRequest; |
| import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; |
| import org.apache.iotdb.consensus.config.MultiLeaderConfig; |
| import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException; |
| import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient; |
| import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient; |
| import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher; |
| import org.apache.iotdb.consensus.multileader.snapshot.SnapshotFragmentReader; |
| import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes; |
| import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes; |
| import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes; |
| import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes; |
| import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes; |
| import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq; |
| import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes; |
| import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader; |
| import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.tsfile.utils.PublicBAOS; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| public class MultiLeaderServerImpl { |
| |
| private static final String CONFIGURATION_FILE_NAME = "configuration.dat"; |
| private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp"; |
| private static final String SNAPSHOT_DIR_NAME = "snapshot"; |
| |
| private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class); |
| |
| private final Peer thisNode; |
| private final IStateMachine stateMachine; |
| private final Lock stateMachineLock = new ReentrantLock(); |
| private final Condition stateMachineCondition = stateMachineLock.newCondition(); |
| private final String storageDir; |
| private final List<Peer> configuration; |
| private final AtomicLong index; |
| private final LogDispatcher logDispatcher; |
| private final MultiLeaderConfig config; |
| private final ConsensusReqReader reader; |
| private boolean active; |
| private String latestSnapshotId; |
| private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager; |
| |
| public MultiLeaderServerImpl( |
| String storageDir, |
| Peer thisNode, |
| List<Peer> configuration, |
| IStateMachine stateMachine, |
| IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager, |
| IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager, |
| MultiLeaderConfig config) { |
| this.active = true; |
| this.storageDir = storageDir; |
| this.thisNode = thisNode; |
| this.stateMachine = stateMachine; |
| this.syncClientManager = syncClientManager; |
| this.configuration = configuration; |
| if (configuration.isEmpty()) { |
| recoverConfiguration(); |
| } else { |
| persistConfiguration(); |
| } |
| this.config = config; |
| this.logDispatcher = new LogDispatcher(this, clientManager); |
| reader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan()); |
| long currentSearchIndex = reader.getCurrentSearchIndex(); |
| if (1 == configuration.size()) { |
| // only one configuration means single replica. |
| reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE); |
| } |
| this.index = new AtomicLong(currentSearchIndex); |
| } |
| |
| public IStateMachine getStateMachine() { |
| return stateMachine; |
| } |
| |
| public void start() { |
| stateMachine.start(); |
| logDispatcher.start(); |
| } |
| |
| public void stop() { |
| logDispatcher.stop(); |
| stateMachine.stop(); |
| } |
| |
| /** |
| * records the index of the log and writes locally, and then asynchronous replication is performed |
| */ |
| public TSStatus write(IConsensusRequest request) { |
| stateMachineLock.lock(); |
| try { |
| if (needBlockWrite()) { |
| logger.info( |
| "[Throttle Down] index:{}, safeIndex:{}", |
| getIndex(), |
| getCurrentSafelyDeletedSearchIndex()); |
| try { |
| boolean timeout = |
| !stateMachineCondition.await( |
| config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS); |
| if (timeout) { |
| return RpcUtils.getStatus( |
| TSStatusCode.WRITE_PROCESS_REJECT, |
| "Reject write because there are too many requests need to process"); |
| } |
| } catch (InterruptedException e) { |
| logger.error("Failed to throttle down because ", e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| IndexedConsensusRequest indexedConsensusRequest = |
| buildIndexedConsensusRequestForLocalRequest(request); |
| if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) { |
| logger.info( |
| "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}", |
| thisNode.getGroupId(), |
| getCurrentSafelyDeletedSearchIndex(), |
| indexedConsensusRequest.getSearchIndex()); |
| } |
| // TODO wal and memtable |
| TSStatus result = stateMachine.write(indexedConsensusRequest); |
| if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // The index is used when constructing batch in LogDispatcher. If its value |
| // increases but the corresponding request does not exist or is not put into |
| // the queue, the dispatcher will try to find the request in WAL. This behavior |
| // is not expected and will slow down the preparation speed for batch. |
| // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are |
| // in one transaction. |
| synchronized (index) { |
| logDispatcher.offer(indexedConsensusRequest); |
| index.incrementAndGet(); |
| } |
| } else { |
| logger.debug( |
| "{}: write operation failed. searchIndex: {}. Code: {}", |
| thisNode.getGroupId(), |
| indexedConsensusRequest.getSearchIndex(), |
| result.getCode()); |
| } |
| return result; |
| } finally { |
| stateMachineLock.unlock(); |
| } |
| } |
| |
| public DataSet read(IConsensusRequest request) { |
| return stateMachine.read(request); |
| } |
| |
| public void takeSnapshot() throws ConsensusGroupAddPeerException { |
| try { |
| latestSnapshotId = |
| String.format( |
| "%s_%s_%d", |
| SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), System.currentTimeMillis()); |
| File snapshotDir = new File(storageDir, latestSnapshotId); |
| if (snapshotDir.exists()) { |
| FileUtils.deleteDirectory(snapshotDir); |
| } |
| if (!snapshotDir.mkdirs()) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId())); |
| } |
| if (!stateMachine.takeSnapshot(snapshotDir)) { |
| throw new ConsensusGroupAddPeerException("unknown error when taking snapshot"); |
| } |
| } catch (IOException e) { |
| throw new ConsensusGroupAddPeerException("error when taking snapshot", e); |
| } |
| } |
| |
| public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException { |
| File snapshotDir = new File(storageDir, latestSnapshotId); |
| List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir); |
| System.out.println(snapshotPaths); |
| try (SyncMultiLeaderServiceClient client = |
| syncClientManager.borrowClient(targetPeer.getEndpoint())) { |
| for (Path path : snapshotPaths) { |
| SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path); |
| while (reader.hasNext()) { |
| TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq(); |
| req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId()); |
| // receiveSnapshotFragment(latestSnapshotId, req.filePath, req.fileChunk); |
| TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req); |
| if (!isSuccess(res.getStatus())) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when sending snapshot fragment to %s", targetPeer)); |
| } |
| } |
| reader.close(); |
| } |
| } catch (IOException | TException e) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when send snapshot file to %s", targetPeer), e); |
| } |
| } |
| |
| public void receiveSnapshotFragment( |
| String snapshotId, String originalFilePath, ByteBuffer fileChunk) |
| throws ConsensusGroupAddPeerException { |
| try { |
| String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath); |
| File targetFile = new File(storageDir, targetFilePath); |
| Path parentDir = Paths.get(targetFile.getParent()); |
| if (!Files.exists(parentDir)) { |
| Files.createDirectories(parentDir); |
| } |
| Files.write( |
| Paths.get(targetFile.getAbsolutePath()), |
| fileChunk.array(), |
| StandardOpenOption.CREATE, |
| StandardOpenOption.APPEND); |
| } catch (IOException e) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when receiving snapshot %s", snapshotId), e); |
| } |
| } |
| |
| private String calculateSnapshotPath(String snapshotId, String originalFilePath) |
| throws ConsensusGroupAddPeerException { |
| if (!originalFilePath.contains(snapshotId)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format( |
| "invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath)); |
| } |
| return originalFilePath.substring(originalFilePath.indexOf(snapshotId)); |
| } |
| |
| public void loadSnapshot(String snapshotId) { |
| // TODO: (xingtanzjr) throw exception if the snapshot load failed |
| stateMachine.loadSnapshot(new File(storageDir, snapshotId)); |
| } |
| |
| public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException { |
| try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) { |
| TInactivatePeerRes res = |
| client.inactivatePeer( |
| new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId())); |
| if (!isSuccess(res.status)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when inactivating %s. %s", peer, res.getStatus())); |
| } |
| } catch (IOException | TException e) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when inactivating %s", peer), e); |
| } |
| } |
| |
| public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupAddPeerException { |
| try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) { |
| TTriggerSnapshotLoadRes res = |
| client.triggerSnapshotLoad( |
| new TTriggerSnapshotLoadReq( |
| thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId)); |
| if (!isSuccess(res.status)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when triggering snapshot load %s. %s", peer, res.getStatus())); |
| } |
| } catch (IOException | TException e) { |
| throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e); |
| } |
| } |
| |
| public void activePeer(Peer peer) throws ConsensusGroupAddPeerException { |
| try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) { |
| TActivatePeerRes res = |
| client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId())); |
| if (!isSuccess(res.status)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when activating %s. %s", peer, res.getStatus())); |
| } |
| } catch (IOException | TException e) { |
| throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e); |
| } |
| } |
| |
| public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) |
| throws ConsensusGroupAddPeerException { |
| // The configuration will be modified during iterating because we will add the targetPeer to |
| // configuration |
| List<Peer> currentMembers = new ArrayList<>(this.configuration); |
| logger.info( |
| "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}", |
| currentMembers, |
| targetPeer); |
| for (Peer peer : currentMembers) { |
| logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer); |
| if (peer.equals(thisNode)) { |
| // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the |
| // snapshot produced by thisNode |
| buildSyncLogChannel(targetPeer, index.get()); |
| } else { |
| // use RPC to tell other peers to build sync log channel to target peer |
| try (SyncMultiLeaderServiceClient client = |
| syncClientManager.borrowClient(peer.getEndpoint())) { |
| TBuildSyncLogChannelRes res = |
| client.buildSyncLogChannel( |
| new TBuildSyncLogChannelReq( |
| targetPeer.getGroupId().convertToTConsensusGroupId(), |
| targetPeer.getEndpoint())); |
| if (!isSuccess(res.status)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("build sync log channel failed from %s to %s", peer, targetPeer)); |
| } |
| } catch (IOException | TException e) { |
| // We use a simple way to deal with the connection issue when notifying other nodes to |
| // build sync log. If the un-responsible peer is the peer which will be removed, we cannot |
| // suspend the operation and need to skip it. In order to keep the mechanism works fine, |
| // we will skip the peer which cannot be reached. |
| // If following error message appears, the un-responsible peer should be removed manually |
| // after current operation |
| // TODO: (xingtanzjr) design more reliable way for MultiLeaderConsensus |
| logger.error( |
| "cannot notify {} to build sync log channel. Please check the status of this node manually", |
| peer, |
| e); |
| } |
| } |
| } |
| } |
| |
| public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) |
| throws ConsensusGroupAddPeerException { |
| // The configuration will be modified during iterating because we will add the targetPeer to |
| // configuration |
| List<Peer> currentMembers = new ArrayList<>(this.configuration); |
| for (Peer peer : currentMembers) { |
| if (peer.equals(targetPeer)) { |
| // if the targetPeer is the same as current peer, skip it because removing itself is illegal |
| continue; |
| } |
| if (peer.equals(thisNode)) { |
| removeSyncLogChannel(targetPeer); |
| } else { |
| // use RPC to tell other peers to build sync log channel to target peer |
| try (SyncMultiLeaderServiceClient client = |
| syncClientManager.borrowClient(peer.getEndpoint())) { |
| TRemoveSyncLogChannelRes res = |
| client.removeSyncLogChannel( |
| new TRemoveSyncLogChannelReq( |
| targetPeer.getGroupId().convertToTConsensusGroupId(), |
| targetPeer.getEndpoint())); |
| if (!isSuccess(res.status)) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("remove sync log channel failed from %s to %s", peer, targetPeer)); |
| } |
| } catch (IOException | TException e) { |
| throw new ConsensusGroupAddPeerException( |
| String.format("error when removing sync log channel to %s", peer), e); |
| } |
| } |
| } |
| } |
| |
| private boolean isSuccess(TSStatus status) { |
| return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); |
| } |
| |
| /** build SyncLog channel with safeIndex as the default initial sync index */ |
| public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException { |
| buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex()); |
| } |
| |
| public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) |
| throws ConsensusGroupAddPeerException { |
| // step 1, build sync channel in LogDispatcher |
| logger.info( |
| "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}", |
| targetPeer, |
| initialSyncIndex); |
| logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex); |
| // step 2, update configuration |
| configuration.add(targetPeer); |
| logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration); |
| persistConfigurationUpdate(); |
| } |
| |
| public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException { |
| try { |
| logDispatcher.removeLogDispatcherThread(targetPeer); |
| logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer); |
| configuration.remove(targetPeer); |
| persistConfigurationUpdate(); |
| logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration); |
| } catch (IOException e) { |
| throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e); |
| } |
| } |
| |
| public void persistConfiguration() { |
| try (PublicBAOS publicBAOS = new PublicBAOS(); |
| DataOutputStream outputStream = new DataOutputStream(publicBAOS)) { |
| serializeConfigurationTo(outputStream); |
| Files.write( |
| Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()), |
| publicBAOS.getBuf()); |
| } catch (IOException e) { |
| // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't |
| // work expectedly |
| // if the exception occurs |
| logger.error("Unexpected error occurs when persisting configuration", e); |
| } |
| } |
| |
| public void persistConfigurationUpdate() throws ConsensusGroupAddPeerException { |
| try (PublicBAOS publicBAOS = new PublicBAOS(); |
| DataOutputStream outputStream = new DataOutputStream(publicBAOS)) { |
| serializeConfigurationTo(outputStream); |
| Path tmpConfigurationPath = |
| Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath()); |
| Path configurationPath = |
| Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()); |
| Files.write(tmpConfigurationPath, publicBAOS.getBuf()); |
| Files.delete(configurationPath); |
| Files.move(tmpConfigurationPath, configurationPath); |
| } catch (IOException e) { |
| throw new ConsensusGroupAddPeerException( |
| "Unexpected error occurs when update configuration", e); |
| } |
| } |
| |
| private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException { |
| outputStream.writeInt(configuration.size()); |
| for (Peer peer : configuration) { |
| peer.serialize(outputStream); |
| } |
| } |
| |
| public void recoverConfiguration() { |
| ByteBuffer buffer; |
| try { |
| Path tmpConfigurationPath = |
| Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath()); |
| Path configurationPath = |
| Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()); |
| // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is |
| // interrupted |
| // unexpectedly, we need substitute configuration with tmpConfiguration file |
| if (Files.exists(tmpConfigurationPath)) { |
| if (Files.exists(configurationPath)) { |
| Files.delete(configurationPath); |
| } |
| Files.move(tmpConfigurationPath, configurationPath); |
| } |
| buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath)); |
| int size = buffer.getInt(); |
| for (int i = 0; i < size; i++) { |
| configuration.add(Peer.deserialize(buffer)); |
| } |
| logger.info("Recover multiLeader, configuration: {}", configuration); |
| } catch (IOException e) { |
| logger.error("Unexpected error occurs when recovering configuration", e); |
| } |
| } |
| |
| public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( |
| IConsensusRequest request) { |
| return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request)); |
| } |
| |
| public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest( |
| long syncIndex, List<IConsensusRequest> requests) { |
| return new IndexedConsensusRequest( |
| ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests); |
| } |
| |
| /** |
| * In the case of multiple copies, the minimum synchronization index is selected. In the case of |
| * single copies, the current index is selected |
| */ |
| public long getCurrentSafelyDeletedSearchIndex() { |
| return logDispatcher.getMinSyncIndex().orElseGet(index::get); |
| } |
| |
| public String getStorageDir() { |
| return storageDir; |
| } |
| |
| public Peer getThisNode() { |
| return thisNode; |
| } |
| |
| public List<Peer> getConfiguration() { |
| return configuration; |
| } |
| |
| public long getIndex() { |
| return index.get(); |
| } |
| |
| public MultiLeaderConfig getConfig() { |
| return config; |
| } |
| |
| public boolean needBlockWrite() { |
| return reader.getTotalSize() > config.getReplication().getWalThrottleThreshold(); |
| } |
| |
| public boolean unblockWrite() { |
| return reader.getTotalSize() < config.getReplication().getWalThrottleThreshold(); |
| } |
| |
| public void signal() { |
| stateMachineLock.lock(); |
| try { |
| stateMachineCondition.signalAll(); |
| } finally { |
| stateMachineLock.unlock(); |
| } |
| } |
| |
| public AtomicLong getIndexObject() { |
| return index; |
| } |
| |
| public boolean isReadOnly() { |
| return stateMachine.isReadOnly(); |
| } |
| |
| public boolean isActive() { |
| return active; |
| } |
| |
| public void setActive(boolean active) { |
| logger.info("set {} active status to {}", this.thisNode, active); |
| this.active = active; |
| } |
| } |