blob: 65568326f4ca1faf3dc25e3c120255f2eda6e476 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.util.Collections.reverse;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
/**
 * RocksDb implementation of {@link TxStateTableStorage}.
 *
 * <p>Keeps one {@link TxStateRocksDbStorage} per partition in a fixed-size atomic array. All partitions of all tables
 * live in the database exposed by {@link TxStateRocksDbSharedStorage}; keys of this table start with the table ID
 * encoded as {@link #TABLE_PREFIX_SIZE_BYTES} bytes in Big Endian.
 */
public class TxStateRocksDbTableStorage implements TxStateTableStorage {
    /** Prefix length for the payload within a table. Consists of tableId (4 bytes) in Big Endian. */
    static final int TABLE_PREFIX_SIZE_BYTES = Integer.BYTES;

    /** Partition storages, indexed by partition ID. A {@code null} slot means the partition storage does not exist. */
    private final AtomicReferenceArray<TxStateRocksDbStorage> storages;

    /** Prevents double stopping the storage. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /**
     * Busy lock to stop synchronously.
     *
     * <p>NOTE(review): the lock is only blocked in {@link #stop()}; no operation of this class enters it, so it does
     * not currently reject calls made after the storage has been stopped.
     */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Table ID. */
    final int id;

    /** Shared storage; its database holds the data of this table (see {@link #destroy()}). */
    final TxStateRocksDbSharedStorage sharedStorage;

    /**
     * Constructor.
     *
     * @param id Table ID.
     * @param partitions Count of partitions.
     * @param sharedStorage Shared storage that provides access to the underlying database.
     */
    public TxStateRocksDbTableStorage(
            int id,
            int partitions,
            TxStateRocksDbSharedStorage sharedStorage
    ) {
        this.id = id;
        this.storages = new AtomicReferenceArray<>(partitions);
        this.sharedStorage = sharedStorage;
    }

    /**
     * Checks that a passed partition id is within the proper bounds.
     *
     * @param partitionId Partition id.
     * @throws IllegalArgumentException If the partition id is negative or not less than the count of partitions.
     */
    private void checkPartitionId(int partitionId) {
        if (partitionId < 0 || partitionId >= storages.length()) {
            throw new IllegalArgumentException(S.toString(
                    "Unable to access partition with id outside of configured range",
                    "tableId", id, false,
                    "partitionId", partitionId, false,
                    "partitions", storages.length(), false
            ));
        }
    }

    /**
     * Returns the storage of the given partition, creating and starting it if it does not exist yet.
     *
     * <p>A newly created storage is published with a compare-and-set, so concurrent callers observe a single instance
     * per partition and a storage that has been concurrently destroyed is never written back into its slot.
     *
     * @param partitionId Partition id.
     * @return Existing or newly created partition storage.
     * @throws IllegalArgumentException If the partition id is out of the configured range.
     * @throws IgniteInternalException If a redundant storage instance created by a losing concurrent call could not
     *      be closed.
     */
    @Override
    public TxStateStorage getOrCreateTxStateStorage(int partitionId) {
        checkPartitionId(partitionId);

        while (true) {
            TxStateRocksDbStorage existing = storages.get(partitionId);

            if (existing != null) {
                return existing;
            }

            // As before, the storage is started before it becomes visible to other threads.
            TxStateRocksDbStorage created = new TxStateRocksDbStorage(
                    partitionId,
                    this
            );

            created.start();

            if (storages.compareAndSet(partitionId, null, created)) {
                return created;
            }

            // Another thread has published its own instance in the meantime: release ours and re-read the slot.
            // NOTE(review): assumes that closing a started but never published storage does not affect shared data.
            closeRedundantStorage(created, partitionId);
        }
    }

    /**
     * Closes a partition storage instance that was created but lost the publication race.
     *
     * @param storage Storage to close.
     * @param partitionId Partition id, used for the error message only.
     * @throws IgniteInternalException If closing fails.
     */
    private void closeRedundantStorage(TxStateRocksDbStorage storage, int partitionId) {
        try {
            storage.close();
        } catch (Exception e) {
            throw new IgniteInternalException(
                    "Failed to close a redundant transaction state storage: [tableId=" + id
                            + ", partitionId=" + partitionId + ']',
                    e
            );
        }
    }

    /**
     * Returns the storage of the given partition if it exists.
     *
     * @param partitionId Partition id.
     * @return Partition storage, or {@code null} if it has not been created or has been destroyed.
     * @throws IllegalArgumentException If the partition id is out of the configured range.
     */
    @Override
    public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
        // Validate the same way as the other partition accessors to get a descriptive error for a bad id.
        checkPartitionId(partitionId);

        return storages.get(partitionId);
    }

    /**
     * Removes the storage of the given partition from this table storage and destroys it. Does nothing if the
     * partition storage does not exist.
     *
     * @param partitionId Partition id.
     * @throws IllegalArgumentException If the partition id is out of the configured range.
     */
    @Override
    public void destroyTxStateStorage(int partitionId) {
        checkPartitionId(partitionId);

        // Clear the slot first so that only one caller gets to destroy a given instance.
        TxStateStorage storage = storages.getAndSet(partitionId, null);

        if (storage != null) {
            storage.destroy();
        }
    }

    /** No-op: partition storages are started when they are created. */
    @Override
    public void start() {
    }

    /**
     * Stops the table storage by closing every existing partition storage. Only the first call has an effect.
     *
     * @throws IgniteInternalException If closing the partition storages fails.
     */
    @Override
    public void stop() {
        if (!stopGuard.compareAndSet(false, true)) {
            return;
        }

        busyLock.block();

        try {
            List<AutoCloseable> resources = new ArrayList<>(storages.length());

            for (int i = 0; i < storages.length(); i++) {
                TxStateStorage storage = storages.get(i);

                if (storage != null) {
                    resources.add(storage::close);
                }
            }

            // Partition storages are closed in descending partition id order.
            reverse(resources);

            closeAll(resources);
        } catch (Exception e) {
            throw new IgniteInternalException("Failed to stop transaction state storage of the table: " + id, e);
        }
    }

    /**
     * Closes the table storage and removes all of its data from the shared database.
     *
     * @throws IgniteInternalException If closing the storage or removing the data fails.
     */
    @Override
    public void destroy() {
        // Range bounds are the Big Endian prefixes of this table ID and of the next one.
        // NOTE(review): for id == -1 the end prefix wraps to all zero bytes; presumably such a table ID is never used.
        byte[] start = ByteBuffer.allocate(TABLE_PREFIX_SIZE_BYTES).order(BIG_ENDIAN).putInt(id).array();
        byte[] end = ByteBuffer.allocate(TABLE_PREFIX_SIZE_BYTES).order(BIG_ENDIAN).putInt(id + 1).array();

        try {
            close();

            sharedStorage.db().deleteRange(start, end);
        } catch (Exception e) {
            throw new IgniteInternalException("Failed to destroy the transaction state storage of the table: " + id, e);
        }
    }

    /** Closes the table storage; equivalent to {@link #stop()}. */
    @Override
    public void close() {
        stop();
    }
}