blob: d84794ab60a59645b38594fe3f5c340a25d125e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import accord.api.*;
import accord.api.ProgressLog;
import accord.api.DataStore;
import accord.local.CommandStores.RangesForEpochHolder;
import accord.utils.async.AsyncChain;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
/**
 * Single threaded internal shard of accord transaction metadata.
 *
 * <p>A thread-local records which {@code CommandStore}, if any, the calling thread is
 * currently marked as running in. It is written by {@link #register(CommandStore)} and the
 * {@code unsafeRunIn} overloads, and read by {@link #maybeCurrent()}, {@link #current()},
 * {@link #checkInStore()} and {@link #checkNotInStore()}.
 */
public abstract class CommandStore implements AgentExecutor
{
    /**
     * Creates {@link CommandStore} instances from the same components the
     * {@link CommandStore} constructor accepts.
     */
    public interface Factory
    {
        CommandStore create(int id,
                            NodeTimeService time,
                            Agent agent,
                            DataStore store,
                            ProgressLog.Factory progressLogFactory,
                            RangesForEpochHolder rangesForEpoch);
    }

    /** The store the calling thread is currently marked as running in; unset when there is none. */
    private static final ThreadLocal<CommandStore> CURRENT_STORE = new ThreadLocal<>();

    protected final int id;
    protected final NodeTimeService time;
    protected final Agent agent;
    protected final DataStore store;
    protected final ProgressLog progressLog;
    protected final RangesForEpochHolder rangesForEpochHolder;

    /**
     * Stores the supplied components and builds this store's progress log.
     *
     * <p>{@code progressLogFactory.create(this)} hands a reference to this instance to the
     * factory while construction is still in progress. It is therefore invoked last, once
     * every other field declared by this class has been assigned, so the factory never
     * observes a {@code null} {@link #rangesForEpochHolder}. State declared by subclasses is
     * still uninitialised at that point.
     *
     * @param id                   identifier returned by {@link #id()}
     * @param time                 time service retained by this store
     * @param agent                agent returned by {@link #agent()}
     * @param store                data store retained by this store
     * @param progressLogFactory   factory used once, here, to create {@link #progressLog}
     * @param rangesForEpochHolder ranges holder retained by this store
     */
    protected CommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpochHolder)
    {
        this.id = id;
        this.time = time;
        this.agent = agent;
        this.store = store;
        this.rangesForEpochHolder = rangesForEpochHolder;
        // Must stay last: "this" escapes to the factory here.
        this.progressLog = progressLogFactory.create(this);
    }

    /** @return the identifier this store was constructed with */
    public int id()
    {
        return id;
    }

    /** @return the agent this store was constructed with */
    @Override
    public Agent agent()
    {
        return agent;
    }

    /**
     * Implementation-defined check used by {@link #register(CommandStore)}.
     * NOTE(review): presumably reports whether the calling thread is executing inside this
     * store - confirm against the concrete implementations.
     */
    public abstract boolean inStore();

    /**
     * NOTE(review): presumably runs {@code consumer} against this store once {@code context}
     * is available; scheduling and threading are defined by the implementation - confirm.
     */
    public abstract AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer);

    /**
     * NOTE(review): presumably the value-returning counterpart of
     * {@link #execute(PreLoadContext, Consumer)} - confirm against the implementations.
     */
    public abstract <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply);

    /** Implementation-defined shutdown hook. */
    public abstract void shutdown();

    /**
     * Runs {@code fn} on the calling thread with this store installed as the thread's current
     * store. Whatever was current before the call is reinstated afterwards (or the
     * thread-local is cleared if nothing was), including when {@code fn} throws.
     *
     * @param fn the work to run
     */
    protected void unsafeRunIn(Runnable fn)
    {
        CommandStore prev = maybeCurrent();
        CURRENT_STORE.set(this);
        try
        {
            fn.run();
        }
        finally
        {
            restoreCurrent(prev);
        }
    }

    /**
     * Value-returning variant of {@link #unsafeRunIn(Runnable)}.
     *
     * @param fn the work to run
     * @return whatever {@code fn} returns
     * @throws Exception whatever {@code fn} throws
     */
    protected <T> T unsafeRunIn(Callable<T> fn) throws Exception
    {
        CommandStore prev = maybeCurrent();
        CURRENT_STORE.set(this);
        try
        {
            return fn.call();
        }
        finally
        {
            restoreCurrent(prev);
        }
    }

    /**
     * Puts the calling thread's current-store marker back to {@code prev}, removing the
     * thread-local entry entirely when there was no previous store.
     */
    private static void restoreCurrent(CommandStore prev)
    {
        if (prev == null) CURRENT_STORE.remove();
        else CURRENT_STORE.set(prev);
    }

    @Override
    public String toString()
    {
        return getClass().getSimpleName() + "{" +
               "id=" + id +
               '}';
    }

    /** @return the store the calling thread is marked as running in, or {@code null} if none */
    @Nullable
    public static CommandStore maybeCurrent()
    {
        return CURRENT_STORE.get();
    }

    /**
     * @return the store the calling thread is marked as running in
     * @throws IllegalStateException if the calling thread has no current store
     */
    public static CommandStore current()
    {
        CommandStore cs = maybeCurrent();
        if (cs == null)
            throw new IllegalStateException("Attempted to access current CommandStore, but not running in a CommandStore");
        return cs;
    }

    /**
     * Marks {@code store} as the calling thread's current store. Unlike the
     * {@code unsafeRunIn} overloads, nothing here restores the previous value.
     *
     * @param store the store to register
     * @throws IllegalStateException if {@code store.inStore()} returns {@code false}
     */
    protected static void register(CommandStore store)
    {
        if (!store.inStore())
            throw new IllegalStateException("Unable to register a CommandStore when not running in it; store " + store);
        CURRENT_STORE.set(store);
    }

    /**
     * Asserts that the calling thread has a current store.
     *
     * @throws IllegalStateException if it does not
     */
    public static void checkInStore()
    {
        CommandStore store = maybeCurrent();
        if (store == null) throw new IllegalStateException("Expected to be running in a CommandStore but is not");
    }

    /**
     * Asserts that the calling thread has no current store.
     *
     * @throws IllegalStateException if it does; the message names the offending store
     */
    public static void checkNotInStore()
    {
        CommandStore store = maybeCurrent();
        if (store != null)
            throw new IllegalStateException("Expected to not be running in a CommandStore, but running in " + store);
    }
}