blob: 823ec149e9525ae281eb67bc8dacbc83f7b6f01a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.pagememory.mv;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteCursor;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
/**
* Abstract implementation of {@link MvPartitionStorage} using Page Memory.
*
* @see MvPartitionStorage
*/
public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitionStorage {
private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
private static final Predicate<Timestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
private final int partitionId;
private final int groupId;
private final VersionChainTree versionChainTree;
protected final RowVersionFreeList rowVersionFreeList;
private final DataPageReader rowVersionDataPageReader;
/**
* Constructor.
*
* @param partitionId Partition id.
* @param tableView Table configuration.
* @param pageMemory Page memory.
* @param rowVersionFreeList Free list for {@link RowVersion}.
* @param versionChainTree Table tree for {@link VersionChain}.
*/
protected AbstractPageMemoryMvPartitionStorage(
int partitionId,
TableView tableView,
PageMemory pageMemory,
RowVersionFreeList rowVersionFreeList,
VersionChainTree versionChainTree
) {
this.partitionId = partitionId;
this.rowVersionFreeList = rowVersionFreeList;
this.versionChainTree = versionChainTree;
groupId = tableView.tableId();
rowVersionDataPageReader = new DataPageReader(pageMemory, groupId, IoStatisticsHolderNoOp.INSTANCE);
}
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
return null;
}
return findLatestRowVersion(versionChain, txId, MATCH_ALL);
}
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
return null;
}
return findRowVersionByTimestamp(versionChain, timestamp);
}
private @Nullable VersionChain findVersionChain(RowId rowId) {
try {
return versionChainTree.findOne(new VersionChainKey(rowId));
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Version chain lookup failed", e);
}
}
private @Nullable ByteBufferRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
if (!keyFilter.test(row)) {
return null;
}
throwIfChainBelongsToAnotherTx(versionChain, txId);
return row;
}
private RowVersion readRowVersion(long nextLink, Predicate<Timestamp> loadValue) {
ReadRowVersion read = new ReadRowVersion(partitionId);
try {
rowVersionDataPageReader.traverse(nextLink, read, loadValue);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Row version lookup failed");
}
return read.result();
}
private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId) {
if (versionChain.transactionId() != null && !txId.equals(versionChain.transactionId())) {
throw new TxIdMismatchException(txId, versionChain.transactionId());
}
}
private @Nullable ByteBufferRow rowVersionToBinaryRow(RowVersion rowVersion) {
if (rowVersion.isTombstone()) {
return null;
}
return new ByteBufferRow(rowVersion.value());
}
private @Nullable ByteBufferRow findRowVersionInChain(
VersionChain versionChain,
@Nullable UUID transactionId,
@Nullable Timestamp timestamp,
Predicate<BinaryRow> keyFilter
) {
assert transactionId != null ^ timestamp != null;
if (transactionId != null) {
return findLatestRowVersion(versionChain, transactionId, keyFilter);
} else {
ByteBufferRow row = findRowVersionByTimestamp(versionChain, timestamp);
return keyFilter.test(row) ? row : null;
}
}
private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, Timestamp timestamp) {
if (!versionChain.hasCommittedVersions()) {
return null;
}
long newestCommittedLink = versionChain.newestCommittedLink();
ScanVersionChainByTimestamp scanByTimestamp = new ScanVersionChainByTimestamp(partitionId);
try {
rowVersionDataPageReader.traverse(newestCommittedLink, scanByTimestamp, timestamp);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot search for a row version", e);
}
return scanByTimestamp.result();
}
/** {@inheritDoc} */
@Override
public RowId insert(BinaryRow row, UUID txId) throws StorageException {
RowId rowId = new RowId(partitionId);
addWrite(rowId, row, txId);
return rowId;
}
private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
// TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
byte[] rowBytes = row == null ? TOMBSTONE_PAYLOAD : row.bytes();
RowVersion rowVersion = new RowVersion(partitionId, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
insertRowVersion(rowVersion);
return rowVersion;
}
private void insertRowVersion(RowVersion rowVersion) {
try {
rowVersionFreeList.insertDataRow(rowVersion);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot store a row version", e);
}
}
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException {
VersionChain currentChain = findVersionChain(rowId);
if (currentChain == null) {
RowVersion newVersion = insertRowVersion(row, RowVersion.NULL_LINK);
VersionChain versionChain = new VersionChain(rowId, txId, newVersion.link(), RowVersion.NULL_LINK);
updateVersionChain(versionChain);
return null;
}
throwIfChainBelongsToAnotherTx(currentChain, txId);
RowVersion newVersion = insertRowVersion(row, currentChain.newestCommittedLink());
BinaryRow res = null;
if (currentChain.isUncommitted()) {
RowVersion currentVersion = readRowVersion(currentChain.headLink(), ALWAYS_LOAD_VALUE);
res = rowVersionToBinaryRow(currentVersion);
// as we replace an uncommitted version with new one, we need to remove old uncommitted version
removeRowVersion(currentVersion);
}
VersionChain chainReplacement = new VersionChain(rowId, txId, newVersion.link(), newVersion.nextLink());
updateVersionChain(chainReplacement);
return res;
}
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
// Row doesn't exist or the chain doesn't contain an uncommitted write intent.
return null;
}
RowVersion latestVersion = readRowVersion(currentVersionChain.headLink(), ALWAYS_LOAD_VALUE);
assert latestVersion.isUncommitted();
removeRowVersion(latestVersion);
if (latestVersion.hasNextLink()) {
// Next can be safely replaced with any value (like 0), because this field is only used when there
// is some uncommitted value, but when we add an uncommitted value, we 'fix' such placeholder value
// (like 0) by replacing it with a valid value.
VersionChain versionChainReplacement = new VersionChain(rowId, null, latestVersion.nextLink(), RowVersion.NULL_LINK);
updateVersionChain(versionChainReplacement);
} else {
// it was the only version, let's remove the chain as well
removeVersionChain(currentVersionChain);
}
return rowVersionToBinaryRow(latestVersion);
}
private void removeVersionChain(VersionChain currentVersionChain) {
try {
versionChainTree.remove(currentVersionChain);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot remove chain version", e);
}
}
/** {@inheritDoc} */
@Override
public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException {
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
// Row doesn't exist or the chain doesn't contain an uncommitted write intent.
return;
}
long chainLink = currentVersionChain.headLink();
try {
rowVersionFreeList.updateTimestamp(chainLink, timestamp);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot update timestamp", e);
}
try {
VersionChain updatedVersionChain = new VersionChain(
currentVersionChain.rowId(),
null,
currentVersionChain.headLink(),
currentVersionChain.nextLink()
);
versionChainTree.putx(updatedVersionChain);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot update transaction ID", e);
}
}
private void removeRowVersion(RowVersion currentVersion) {
try {
rowVersionFreeList.removeDataRowByLink(currentVersion.link());
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot update row version", e);
}
}
private void updateVersionChain(VersionChain newVersionChain) {
try {
versionChainTree.putx(newVersionChain);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot update version chain");
}
}
/** {@inheritDoc} */
@Override
public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
return internalScan(keyFilter, txId, null);
}
/** {@inheritDoc} */
@Override
public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
return internalScan(keyFilter, null, timestamp);
}
private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID transactionId, @Nullable Timestamp timestamp) {
assert transactionId != null ^ timestamp != null;
IgniteCursor<VersionChain> treeCursor;
try {
treeCursor = versionChainTree.find(null, null);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Find failed", e);
}
return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
}
/** {@inheritDoc} */
@Override
public long rowsCount() {
try {
return versionChainTree.size();
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error occurred while fetching the size.", e);
}
}
/** {@inheritDoc} */
@Override
public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
// No-op. Nothing to recover for a volatile storage. See usages and a comment about PK index rebuild.
}
/** {@inheritDoc} */
@Override
public void close() {
versionChainTree.close();
}
/**
* Removes all data from this storage and frees all associated resources.
*
* @throws StorageException If failed to destroy the data or storage is already stopped.
*/
public void destroy() {
// TODO: IGNITE-17132 Implement it
}
private class ScanCursor implements Cursor<BinaryRow> {
private final IgniteCursor<VersionChain> treeCursor;
private final Predicate<BinaryRow> keyFilter;
private final @Nullable UUID transactionId;
private final @Nullable Timestamp timestamp;
private BinaryRow nextRow = null;
private boolean iterationExhausted = false;
public ScanCursor(
IgniteCursor<VersionChain> treeCursor,
Predicate<BinaryRow> keyFilter,
@Nullable UUID transactionId,
@Nullable Timestamp timestamp
) {
this.treeCursor = treeCursor;
this.keyFilter = keyFilter;
this.transactionId = transactionId;
this.timestamp = timestamp;
}
/** {@inheritDoc} */
@Override
public boolean hasNext() {
if (nextRow != null) {
return true;
}
if (iterationExhausted) {
return false;
}
while (true) {
boolean positionedToNext = tryAdvanceTreeCursor();
if (!positionedToNext) {
iterationExhausted = true;
return false;
}
VersionChain chain = getCurrentChainFromTreeCursor();
ByteBufferRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
if (row != null) {
nextRow = row;
return true;
}
}
}
private boolean tryAdvanceTreeCursor() {
try {
return treeCursor.next();
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error when trying to advance tree cursor", e);
}
}
private VersionChain getCurrentChainFromTreeCursor() {
try {
return treeCursor.get();
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Failed to get element from tree cursor", e);
}
}
/** {@inheritDoc} */
@Override
public BinaryRow next() {
if (!hasNext()) {
throw new NoSuchElementException("The cursor is exhausted");
}
assert nextRow != null;
BinaryRow row = nextRow;
nextRow = null;
return row;
}
/** {@inheritDoc} */
@Override
public void close() {
// no-op
}
}
}