blob: 0294826265017dcc601015ff8e819f0236203b94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.tx.storage.state;
import static org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsTo;
import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.worker.ThreadAssertingCursor;
import org.apache.ignite.internal.worker.ThreadAssertions;
import org.jetbrains.annotations.Nullable;
/**
* {@link TxStateStorage} that performs thread assertions when doing read/write operations.
*
* @see ThreadAssertions
*/
public class ThreadAssertingTxStateStorage implements TxStateStorage {
private final TxStateStorage storage;
/** Constructor. */
public ThreadAssertingTxStateStorage(TxStateStorage storage) {
this.storage = storage;
}
@Override
public @Nullable TxMeta get(UUID txId) {
assertThreadAllowsToRead();
return storage.get(txId);
}
@Override
public void put(UUID txId, TxMeta txMeta) {
assertThreadAllowsToWrite();
storage.put(txId, txMeta);
}
@Override
public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex, long commandTerm) {
assertThreadAllowsToWrite();
return storage.compareAndSet(txId, txStateExpected, txMeta, commandIndex, commandTerm);
}
@Override
public void remove(UUID txId, long commandIndex, long commandTerm) {
assertThreadAllowsToWrite();
storage.remove(txId, commandIndex, commandTerm);
}
@Override
public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
assertThreadAllowsTo(TX_STATE_STORAGE_ACCESS);
return new ThreadAssertingCursor<>(storage.scan());
}
@Override
public CompletableFuture<Void> flush() {
assertThreadAllowsToWrite();
return storage.flush();
}
@Override
public long lastAppliedIndex() {
return storage.lastAppliedIndex();
}
@Override
public long lastAppliedTerm() {
return storage.lastAppliedTerm();
}
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
assertThreadAllowsToWrite();
storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
}
@Override
public void close() {
storage.close();
}
@Override
public void destroy() {
assertThreadAllowsToWrite();
storage.destroy();
}
@Override
public CompletableFuture<Void> startRebalance() {
assertThreadAllowsToWrite();
return storage.startRebalance();
}
@Override
public CompletableFuture<Void> abortRebalance() {
assertThreadAllowsToWrite();
return storage.abortRebalance();
}
@Override
public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
assertThreadAllowsToWrite();
return storage.finishRebalance(lastAppliedIndex, lastAppliedTerm);
}
@Override
public CompletableFuture<Void> clear() {
assertThreadAllowsToWrite();
return storage.clear();
}
}