| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.service.accord; |
| |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Stream; |
| |
| import accord.api.Agent; |
| import accord.api.DataStore; |
| import accord.api.Journal; |
| import accord.api.LocalListeners; |
| import accord.api.ProgressLog; |
| import accord.local.CommandStores; |
| import accord.local.NodeCommandStoreService; |
| import accord.local.SequentialAsyncExecutor; |
| import accord.local.ShardDistributor; |
| import accord.utils.RandomSource; |
| |
| import org.apache.cassandra.cache.CacheSize; |
| import org.apache.cassandra.concurrent.ScheduledExecutors; |
| import org.apache.cassandra.concurrent.Shutdownable; |
| import org.apache.cassandra.config.AccordSpec.QueueShardModel; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory; |
| |
| import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD; |
| import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount; |
| import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel; |
| import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK; |
| import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK; |
| import static org.apache.cassandra.service.accord.AccordExecutor.constant; |
| import static org.apache.cassandra.utils.Clock.Global.nanoTime; |
| |
| public class AccordCommandStores extends CommandStores implements CacheSize, Shutdownable |
| { |
| private final AccordExecutor[] executors; |
| private final int mask; |
| |
| private long cacheSize, workingSetSize; |
| private int maxQueuedLoads, maxQueuedRangeLoads; |
| private boolean shrinkingOn; |
| |
| AccordCommandStores(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, |
| ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory, |
| Journal journal, AccordExecutor[] executors) |
| { |
| super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenerFactory, |
| AccordCommandStore.factory(id -> executors[id % executors.length])); |
| this.executors = executors; |
| this.mask = Integer.highestOneBit(executors.length) - 1; |
| cacheSize = DatabaseDescriptor.getAccordCacheSizeInMiB() << 20; |
| workingSetSize = DatabaseDescriptor.getAccordWorkingSetSizeInMiB() << 20; |
| maxQueuedLoads = DatabaseDescriptor.getAccordMaxQueuedLoadCount(); |
| maxQueuedRangeLoads = DatabaseDescriptor.getAccordMaxQueuedRangeLoadCount(); |
| shrinkingOn = DatabaseDescriptor.getAccordCacheShrinkingOn(); |
| refreshCapacities(); |
| ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> { |
| for (AccordExecutor executor : executors) |
| { |
| executor.executeDirectlyWithLock(() -> { |
| executor.cacheExclusive().processNoEvictQueue(); |
| }); |
| } |
| }, 1L, 1L, TimeUnit.SECONDS); |
| } |
| |
| static Factory factory() |
| { |
| return (NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) -> { |
| AccordExecutor[] executors = new AccordExecutor[getAccordQueueShardCount()]; |
| AccordExecutorFactory factory; |
| int maxThreads = Integer.MAX_VALUE; |
| switch (getAccordQueueSubmissionModel()) |
| { |
| default: throw new AssertionError("Unhandled QueueSubmissionModel: " + getAccordQueueSubmissionModel()); |
| case SYNC: factory = AccordExecutorSyncSubmit::new; break; |
| case SEMI_SYNC: factory = AccordExecutorSemiSyncSubmit::new; break; |
| case ASYNC: factory = AccordExecutorAsyncSubmit::new; break; |
| case EXEC_ST: |
| factory = AccordExecutorSimple::new; |
| maxThreads = 1; |
| break; |
| } |
| |
| for (int id = 0; id < executors.length; id++) |
| { |
| QueueShardModel shardModel = DatabaseDescriptor.getAccordQueueShardModel(); |
| String baseName = AccordExecutor.class.getSimpleName() + '[' + id; |
| int threads = Math.min(maxThreads, Math.max(DatabaseDescriptor.getAccordConcurrentOps() / getAccordQueueShardCount(), 1)); |
| switch (shardModel) |
| { |
| case THREAD_PER_SHARD: |
| case THREAD_PER_SHARD_SYNC_QUEUE: |
| executors[id] = factory.get(id, shardModel == THREAD_PER_SHARD ? RUN_WITHOUT_LOCK : RUN_WITH_LOCK, 1, constant(baseName + ']'), agent); |
| break; |
| case THREAD_POOL_PER_SHARD: |
| executors[id] = factory.get(id, RUN_WITHOUT_LOCK, threads, num -> baseName + ',' + num + ']', agent); |
| break; |
| } |
| } |
| |
| return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, journal, executors); |
| }; |
| } |
| |
| @Override |
| public SequentialAsyncExecutor someSequentialExecutor() |
| { |
| int idx = ((int) Thread.currentThread().getId()) & mask; |
| return executors[idx].newSequentialExecutor(); |
| } |
| |
| public synchronized void setCapacity(long bytes) |
| { |
| cacheSize = bytes; |
| refreshCapacities(); |
| } |
| |
| public synchronized void setWorkingSetSize(long bytes) |
| { |
| workingSetSize = bytes; |
| refreshCapacities(); |
| } |
| |
| public synchronized void setCapacityAndWorkingSetSize(long newCacheSize, long newWorkingSetSize) |
| { |
| cacheSize = newCacheSize; |
| workingSetSize = newWorkingSetSize; |
| refreshCapacities(); |
| } |
| |
| public synchronized void setMaxQueuedLoads(int total, int range) |
| { |
| maxQueuedLoads = total; |
| maxQueuedRangeLoads = range; |
| refreshCapacities(); |
| } |
| |
| public synchronized void setShrinking(boolean on) |
| { |
| shrinkingOn = on; |
| } |
| |
| @Override |
| public long capacity() |
| { |
| return cacheSize; |
| } |
| |
| @Override |
| public int size() |
| { |
| int size = 0; |
| for (AccordExecutor executor : executors) |
| size += executor.size(); |
| return size; |
| } |
| |
| @Override |
| public long weightedSize() |
| { |
| long size = 0; |
| for (AccordExecutor executor : executors) |
| size += executor.weightedSize(); |
| return size; |
| } |
| |
| synchronized void refreshCapacities() |
| { |
| long capacityPerExecutor = cacheSize / executors.length; |
| long workingSetPerExecutor = workingSetSize < 0 ? Long.MAX_VALUE : workingSetSize / executors.length; |
| int maxLoadsPerExecutor = (maxQueuedLoads + executors.length - 1) / executors.length; |
| int maxRangeLoadsPerExecutor = (maxQueuedRangeLoads + executors.length - 1) / executors.length; |
| for (AccordExecutor executor : executors) |
| { |
| executor.executeDirectlyWithLock(() -> { |
| executor.setCapacity(capacityPerExecutor); |
| executor.setWorkingSetSize(workingSetPerExecutor); |
| executor.setMaxQueuedLoads(maxLoadsPerExecutor, maxRangeLoadsPerExecutor); |
| executor.cacheExclusive().setShrinkingOn(shrinkingOn); |
| }); |
| } |
| } |
| |
| public List<AccordExecutor> executors() |
| { |
| return Arrays.asList(executors.clone()); |
| } |
| |
| public void waitForQuiescence() |
| { |
| for (AccordExecutor executor : this.executors) |
| executor.waitForQuiescence(); |
| } |
| |
| @Override |
| public boolean isTerminated() |
| { |
| return Stream.of(executors).allMatch(AccordExecutor::isTerminated); |
| } |
| |
| @Override |
| public synchronized void shutdown() |
| { |
| super.shutdown(); |
| for (AccordExecutor executor : executors) |
| executor.shutdown(); |
| } |
| |
| @Override |
| public Object shutdownNow() |
| { |
| shutdown(); |
| return null; |
| } |
| |
| @Override |
| public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException |
| { |
| long deadline = nanoTime() + units.toNanos(timeout); |
| for (AccordExecutor executor : executors) |
| { |
| long wait = Math.max(1, deadline - nanoTime()); |
| if (!executor.awaitTermination(wait, TimeUnit.NANOSECONDS)) |
| return false; |
| } |
| return true; |
| } |
| } |