blob: b885ae8424f0e97e70f5e372bb115fe7e1b14d50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.accord;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.local.CommandStores;
import accord.local.NodeCommandStoreService;
import accord.local.SequentialAsyncExecutor;
import accord.local.ShardDistributor;
import accord.utils.RandomSource;
import org.apache.cassandra.cache.CacheSize;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.config.AccordSpec.QueueShardModel;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;

import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.constant;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
/**
 * {@link CommandStores} implementation whose command stores run on a fixed array of
 * {@link AccordExecutor}s: the store with a given {@code id} is handed
 * {@code executors[id % executors.length]}.
 *
 * <p>The class also exposes the aggregate cache settings (total cache size, working set size,
 * queued-load limits, shrinking flag). Whenever one of them changes, the totals are split across
 * the executors by {@link #refreshCapacities()}.
 *
 * <p>All setters, {@link #refreshCapacities()} and {@link #shutdown()} are {@code synchronized}
 * on this instance.
 */
public class AccordCommandStores extends CommandStores implements CacheSize, Shutdownable
{
    private final AccordExecutor[] executors;

    /** {@code highestOneBit(executors.length) - 1}; used to map a thread id onto an executor index. */
    private final int mask;

    /** Handle of the once-per-second task scheduled by the constructor, kept so shutdown can cancel it. */
    private final ScheduledFuture<?> noEvictQueueTask;

    // Totals across all executors; divided per executor in refreshCapacities().
    private long cacheSize, workingSetSize;
    private int maxQueuedLoads, maxQueuedRangeLoads;
    private boolean shrinkingOn;

    /**
     * Creates the stores on top of the given executors, reads the initial cache settings from
     * {@link DatabaseDescriptor}, pushes them to every executor, and schedules a task that runs
     * every second and calls {@code processNoEvictQueue()} on each executor's cache while holding
     * that executor's lock.
     *
     * @param executors executors backing the command stores; must be non-empty
     */
    AccordCommandStores(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random,
                        ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory,
                        Journal journal, AccordExecutor[] executors)
    {
        super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenerFactory,
              AccordCommandStore.factory(id -> executors[id % executors.length]));
        this.executors = executors;
        this.mask = Integer.highestOneBit(executors.length) - 1;

        // Widen to long BEFORE shifting: if the accessor returns an int, shifting first would
        // overflow for any configured size of 2048 MiB or more.
        cacheSize = (long) DatabaseDescriptor.getAccordCacheSizeInMiB() << 20;
        workingSetSize = (long) DatabaseDescriptor.getAccordWorkingSetSizeInMiB() << 20;
        maxQueuedLoads = DatabaseDescriptor.getAccordMaxQueuedLoadCount();
        maxQueuedRangeLoads = DatabaseDescriptor.getAccordMaxQueuedRangeLoadCount();
        shrinkingOn = DatabaseDescriptor.getAccordCacheShrinkingOn();
        refreshCapacities();

        noEvictQueueTask = ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> {
            for (AccordExecutor executor : executors)
            {
                executor.executeDirectlyWithLock(() -> {
                    executor.cacheExclusive().processNoEvictQueue();
                });
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Returns a factory that builds one {@link AccordExecutor} per configured queue shard and
     * wraps them in a new {@link AccordCommandStores}.
     *
     * <p>The executor implementation is chosen by the configured queue submission model, and the
     * thread layout of each executor by the configured queue shard model.
     *
     * @throws AssertionError (when the factory is invoked) if the configured submission model or
     *                        shard model is not handled here
     */
    static Factory factory()
    {
        return (NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) -> {
            AccordExecutor[] executors = new AccordExecutor[getAccordQueueShardCount()];
            AccordExecutorFactory factory;
            int maxThreads = Integer.MAX_VALUE;
            switch (getAccordQueueSubmissionModel())
            {
                case SYNC: factory = AccordExecutorSyncSubmit::new; break;
                case SEMI_SYNC: factory = AccordExecutorSemiSyncSubmit::new; break;
                case ASYNC: factory = AccordExecutorAsyncSubmit::new; break;
                case EXEC_ST:
                    factory = AccordExecutorSimple::new;
                    // this model is capped at a single thread per executor
                    maxThreads = 1;
                    break;
                default: throw new AssertionError("Unhandled QueueSubmissionModel: " + getAccordQueueSubmissionModel());
            }

            // The shard model is the same for every executor, so read it once.
            QueueShardModel shardModel = DatabaseDescriptor.getAccordQueueShardModel();
            for (int id = 0; id < executors.length; id++)
            {
                String baseName = AccordExecutor.class.getSimpleName() + '[' + id;
                // Concurrent ops are split evenly across shards, with at least one thread each.
                int threads = Math.min(maxThreads, Math.max(DatabaseDescriptor.getAccordConcurrentOps() / getAccordQueueShardCount(), 1));
                switch (shardModel)
                {
                    case THREAD_PER_SHARD:
                    case THREAD_PER_SHARD_SYNC_QUEUE:
                        executors[id] = factory.get(id, shardModel == THREAD_PER_SHARD ? RUN_WITHOUT_LOCK : RUN_WITH_LOCK, 1, constant(baseName + ']'), agent);
                        break;
                    case THREAD_POOL_PER_SHARD:
                        executors[id] = factory.get(id, RUN_WITHOUT_LOCK, threads, num -> baseName + ',' + num + ']', agent);
                        break;
                    default:
                        // Fail here instead of leaving a null slot that would only surface later as an NPE.
                        throw new AssertionError("Unhandled QueueShardModel: " + shardModel);
                }
            }

            return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, journal, executors);
        };
    }

    /**
     * Returns a new sequential executor created by one of the backing executors, selected by
     * masking the calling thread's id.
     *
     * <p>Because the mask is {@code highestOneBit(executors.length) - 1}, only the first
     * {@code highestOneBit(executors.length)} executors are candidates when the executor count
     * is not a power of two.
     */
    @Override
    public SequentialAsyncExecutor someSequentialExecutor()
    {
        int idx = ((int) Thread.currentThread().getId()) & mask;
        return executors[idx].newSequentialExecutor();
    }

    /** Sets the total cache size in bytes and redistributes it across the executors. */
    public synchronized void setCapacity(long bytes)
    {
        cacheSize = bytes;
        refreshCapacities();
    }

    /**
     * Sets the total working set size in bytes and redistributes it across the executors.
     * A negative value results in each executor being given {@link Long#MAX_VALUE}.
     */
    public synchronized void setWorkingSetSize(long bytes)
    {
        workingSetSize = bytes;
        refreshCapacities();
    }

    /** Sets both the total cache size and the total working set size (bytes) with a single refresh. */
    public synchronized void setCapacityAndWorkingSetSize(long newCacheSize, long newWorkingSetSize)
    {
        cacheSize = newCacheSize;
        workingSetSize = newWorkingSetSize;
        refreshCapacities();
    }

    /**
     * Sets the total queued-load limits and redistributes them across the executors.
     *
     * @param total total limit on queued loads
     * @param range total limit on queued range loads
     */
    public synchronized void setMaxQueuedLoads(int total, int range)
    {
        maxQueuedLoads = total;
        maxQueuedRangeLoads = range;
        refreshCapacities();
    }

    /** Turns cache shrinking on or off and pushes the new setting to every executor's cache. */
    public synchronized void setShrinking(boolean on)
    {
        shrinkingOn = on;
        // The flag only reaches the executor caches through refreshCapacities(); without this
        // call the change would stay dormant until some other setter happened to run.
        refreshCapacities();
    }

    /** Returns the configured total cache size in bytes. */
    @Override
    public long capacity()
    {
        return cacheSize;
    }

    /** Returns the sum of {@code size()} over all executors. */
    @Override
    public int size()
    {
        int size = 0;
        for (AccordExecutor executor : executors)
            size += executor.size();
        return size;
    }

    /** Returns the sum of {@code weightedSize()} over all executors. */
    @Override
    public long weightedSize()
    {
        long size = 0;
        for (AccordExecutor executor : executors)
            size += executor.weightedSize();
        return size;
    }

    /**
     * Divides the current totals by the number of executors and applies the per-executor values
     * to each executor while holding that executor's lock.
     *
     * <p>Byte sizes are divided rounding down and queued-load limits rounding up. A negative
     * working set size is passed on as {@link Long#MAX_VALUE}.
     */
    synchronized void refreshCapacities()
    {
        long capacityPerExecutor = cacheSize / executors.length;
        long workingSetPerExecutor = workingSetSize < 0 ? Long.MAX_VALUE : workingSetSize / executors.length;
        int maxLoadsPerExecutor = (maxQueuedLoads + executors.length - 1) / executors.length;
        int maxRangeLoadsPerExecutor = (maxQueuedRangeLoads + executors.length - 1) / executors.length;
        for (AccordExecutor executor : executors)
        {
            executor.executeDirectlyWithLock(() -> {
                executor.setCapacity(capacityPerExecutor);
                executor.setWorkingSetSize(workingSetPerExecutor);
                executor.setMaxQueuedLoads(maxLoadsPerExecutor, maxRangeLoadsPerExecutor);
                executor.cacheExclusive().setShrinkingOn(shrinkingOn);
            });
        }
    }

    /** Returns a fixed-size list backed by a copy of the executor array. */
    public List<AccordExecutor> executors()
    {
        return Arrays.asList(executors.clone());
    }

    /** Calls {@code waitForQuiescence()} on every executor in turn. */
    public void waitForQuiescence()
    {
        for (AccordExecutor executor : this.executors)
            executor.waitForQuiescence();
    }

    /** Returns {@code true} only if every executor reports that it has terminated. */
    @Override
    public boolean isTerminated()
    {
        return Stream.of(executors).allMatch(AccordExecutor::isTerminated);
    }

    /**
     * Cancels the periodic task scheduled by the constructor, shuts down the parent
     * {@link CommandStores}, then shuts down every executor.
     */
    @Override
    public synchronized void shutdown()
    {
        // Stop the recurring task first so it does not keep firing (and keep the executors
        // reachable) after shutdown; a run already in progress is allowed to finish.
        noEvictQueueTask.cancel(false);
        super.shutdown();
        for (AccordExecutor executor : executors)
            executor.shutdown();
    }

    /** Delegates to {@link #shutdown()}; always returns {@code null}. */
    @Override
    public Object shutdownNow()
    {
        shutdown();
        return null;
    }

    /**
     * Waits for every executor to terminate, sharing one overall deadline between them.
     *
     * @return {@code false} as soon as any executor fails to terminate within the remaining
     *         time, {@code true} if all of them terminated
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
    {
        long deadline = nanoTime() + units.toNanos(timeout);
        for (AccordExecutor executor : executors)
        {
            // Always wait at least 1ns so an expired deadline still gives each executor a check.
            long wait = Math.max(1, deadline - nanoTime());
            if (!executor.awaitTermination(wait, TimeUnit.NANOSECONDS))
                return false;
        }
        return true;
    }
}