| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.mvcc.txlog; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.pagemem.PageIdUtils; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; |
| import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; |
| import org.apache.ignite.internal.util.IgniteTree; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; |
| import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA; |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA; |
| |
| /** |
| * |
| */ |
| public class TxLog implements DbCheckpointListener { |
| /** */ |
| public static final String TX_LOG_CACHE_NAME = "TxLog"; |
| |
| /** */ |
| public static final int TX_LOG_CACHE_ID = CU.cacheId(TX_LOG_CACHE_NAME); |
| |
| /** */ |
| private static final TxKey LOWEST = new TxKey(0, 0); |
| |
| /** */ |
| private final IgniteCacheDatabaseSharedManager mgr; |
| |
| /** */ |
| private ReuseListImpl reuseList; |
| |
| /** */ |
| private TxLogTree tree; |
| |
| /** */ |
| private ConcurrentMap<TxKey, Sync> keyMap = new ConcurrentHashMap<>(); |
| |
| /** |
| * |
| * @param ctx Kernal context. |
| * @param mgr Database shared manager. |
| */ |
| public TxLog(GridKernalContext ctx, IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { |
| this.mgr = mgr; |
| |
| init(ctx); |
| } |
| |
| /** |
| * |
| * @param ctx Kernal context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void init(GridKernalContext ctx) throws IgniteCheckedException { |
| if (CU.isPersistenceEnabled(ctx.config())) { |
| mgr.checkpointReadLock(); |
| |
| try { |
| IgniteWriteAheadLogManager wal = ctx.cache().context().wal(); |
| PageMemoryEx pageMemory = (PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory(); |
| |
| long metaId = pageMemory.metaPageId(TX_LOG_CACHE_ID); |
| long metaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, metaId); |
| |
| long treeRoot, reuseListRoot; |
| |
| boolean isNew = false; |
| |
| try { |
| long pageAddr = pageMemory.writeLock(TX_LOG_CACHE_ID, metaId, metaPage); |
| |
| try { |
| if (PageIO.getType(pageAddr) != PageIO.T_META) { |
| // Initialize new page. |
| PageMetaIO io = PageMetaIO.VERSIONS.latest(); |
| |
| io.initNewPage(pageAddr, metaId, pageMemory.pageSize()); |
| |
| treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, PageMemory.FLAG_IDX); |
| reuseListRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, PageMemory.FLAG_IDX); |
| |
| assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_IDX; |
| assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_IDX; |
| |
| io.setTreeRoot(pageAddr, treeRoot); |
| io.setReuseListRoot(pageAddr, reuseListRoot); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMemory, TX_LOG_CACHE_ID, metaId, metaPage, wal, null)) |
| wal.log(new MetaPageInitRecord( |
| TX_LOG_CACHE_ID, |
| metaId, |
| io.getType(), |
| io.getVersion(), |
| treeRoot, |
| reuseListRoot |
| )); |
| |
| isNew = true; |
| } |
| else { |
| PageMetaIO io = PageIO.getPageIO(pageAddr); |
| |
| treeRoot = io.getTreeRoot(pageAddr); |
| reuseListRoot = io.getReuseListRoot(pageAddr); |
| |
| assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_IDX : |
| U.hexLong(treeRoot) + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID; |
| assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_IDX : |
| U.hexLong(reuseListRoot) + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID; |
| } |
| } |
| finally { |
| pageMemory.writeUnlock(TX_LOG_CACHE_ID, metaId, metaPage, null, isNew); |
| } |
| } |
| finally { |
| pageMemory.releasePage(TX_LOG_CACHE_ID, metaId, metaPage); |
| } |
| |
| reuseList = new ReuseListImpl( |
| TX_LOG_CACHE_ID, |
| TX_LOG_CACHE_NAME, |
| pageMemory, |
| wal, |
| reuseListRoot, |
| isNew); |
| |
| tree = new TxLogTree(pageMemory, wal, treeRoot, reuseList, ctx.failure(), isNew); |
| |
| ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this); |
| } |
| finally { |
| mgr.checkpointReadUnlock(); |
| } |
| } |
| else { |
| PageMemory pageMemory = mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory(); |
| ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME); |
| |
| long treeRoot; |
| |
| if ((treeRoot = reuseList1.takeRecycledPage()) == 0L) |
| treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, FLAG_IDX); |
| |
| tree = new TxLogTree(pageMemory, null, treeRoot, reuseList1, ctx.failure(), true); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { |
| Executor executor = ctx.executor(); |
| |
| if (executor == null) |
| reuseList.saveMetadata(); |
| else { |
| executor.execute(() -> { |
| try { |
| reuseList.saveMetadata(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * |
| * @param major Major version. |
| * @param minor Minor version. |
| * @return Transaction state for given version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public byte get(long major, long minor) throws IgniteCheckedException { |
| return get(new TxKey(major, minor)); |
| } |
| |
| /** |
| * |
| * @param key Transaction key. |
| * @return Transaction state for given version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public byte get(TxKey key) throws IgniteCheckedException { |
| TxRow row = tree.findOne(key); |
| |
| return row == null ? TxState.NA : row.state(); |
| } |
| |
| /** |
| * |
| * @param key TxKey. |
| * @param state Transaction state for given version. |
| * @param primary Flag if this is a primary node. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException { |
| assert mgr.checkpointLockIsHeldByThread(); |
| |
| Sync sync = syncObject(key); |
| |
| try { |
| synchronized (sync) { |
| tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary)); |
| } |
| } finally { |
| evict(key, sync); |
| } |
| } |
| |
| /** |
| * Removes all records less or equals to the given version. |
| * |
| * @param major Major version. |
| * @param minor Minor version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void removeUntil(long major, long minor) throws IgniteCheckedException { |
| TraversingClosure clo = new TraversingClosure(major, minor); |
| |
| tree.iterate(LOWEST, clo, clo); |
| |
| if (clo.rows != null) { |
| mgr.checkpointReadLock(); |
| |
| try { |
| for (TxKey row : clo.rows) |
| remove(row); |
| } |
| finally { |
| mgr.checkpointReadUnlock(); |
| } |
| } |
| } |
| |
| /** */ |
| private void remove(TxKey key) throws IgniteCheckedException { |
| Sync sync = syncObject(key); |
| |
| try { |
| synchronized (sync) { |
| tree.removex(key); |
| } |
| } |
| finally { |
| evict(key, sync); |
| } |
| } |
| |
| /** */ |
| private Sync syncObject(TxKey key) { |
| Sync sync = keyMap.get(key); |
| |
| while (true) { |
| if (sync == null) { |
| Sync old = keyMap.putIfAbsent(key, sync = new Sync()); |
| |
| if (old == null) |
| return sync; |
| else |
| sync = old; |
| } |
| else { |
| int cntr = sync.counter; |
| |
| while (cntr > 0) { |
| if (sync.casCounter(cntr, cntr + 1)) |
| return sync; |
| |
| cntr = sync.counter; |
| } |
| |
| sync = keyMap.get(key); |
| } |
| } |
| } |
| |
| /** */ |
| private void evict(TxKey key, Sync sync) { |
| assert sync != null; |
| |
| int cntr = sync.counter; |
| |
| while (true) { |
| assert cntr > 0; |
| |
| if (!sync.casCounter(cntr, cntr - 1)) { |
| cntr = sync.counter; |
| |
| continue; |
| } |
| |
| if (cntr == 1) { |
| boolean removed = keyMap.remove(key, sync); |
| |
| assert removed; |
| } |
| |
| break; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TraversingClosure extends TxKey implements BPlusTree.TreeRowClosure<TxKey, TxRow> { |
| /** */ |
| private List<TxKey> rows; |
| |
| /** |
| * |
| * @param major Major version. |
| * @param minor Minor version. |
| */ |
| TraversingClosure(long major, long minor) { |
| super(major, minor); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(BPlusTree<TxKey, TxRow> tree, BPlusIO<TxKey> io, long pageAddr, |
| int idx) throws IgniteCheckedException { |
| |
| if (rows == null) |
| rows = new ArrayList<>(); |
| |
| TxLogIO logIO = (TxLogIO)io; |
| int offset = io.offset(idx); |
| |
| rows.add(new TxKey(logIO.getMajor(pageAddr, offset), logIO.getMinor(pageAddr, offset))); |
| |
| return true; |
| } |
| } |
| |
| /** */ |
| private static class Sync { |
| /** */ |
| private static final AtomicIntegerFieldUpdater<Sync> UPD = AtomicIntegerFieldUpdater.newUpdater(Sync.class, "counter"); |
| |
| /** */ |
| volatile int counter = 1; |
| |
| /** */ |
| boolean casCounter(int old, int upd) { |
| return UPD.compareAndSet(this, old, upd); |
| } |
| } |
| |
| /** |
| * TxLog update closure. |
| */ |
| private static final class TxLogUpdateClosure implements IgniteTree.InvokeClosure<TxRow> { |
| /** */ |
| private final long major; |
| |
| /** */ |
| private final long minor; |
| |
| /** */ |
| private final byte newState; |
| |
| /** */ |
| private final boolean primary; |
| |
| /** */ |
| private IgniteTree.OperationType treeOp; |
| |
| /** |
| * |
| * @param major Coordinator version. |
| * @param minor Counter. |
| * @param newState New Tx newState. |
| * @param primary Flag if this is primary node. |
| */ |
| TxLogUpdateClosure(long major, long minor, byte newState, boolean primary) { |
| assert major > MVCC_CRD_COUNTER_NA && minor > MVCC_COUNTER_NA && newState != TxState.NA; |
| this.major = major; |
| this.minor = minor; |
| this.newState = newState; |
| this.primary = primary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void call(@Nullable TxRow row) { |
| if (row == null) { |
| valid(); |
| |
| return; |
| } |
| |
| byte currState = row.state(); |
| |
| switch (currState) { |
| case TxState.NA: |
| checkNa(currState); |
| |
| break; |
| |
| case TxState.PREPARED: |
| checkPrepared(currState); |
| |
| break; |
| |
| case TxState.COMMITTED: |
| checkCommitted(currState); |
| |
| break; |
| |
| case TxState.ABORTED: |
| checkAborted(currState); |
| |
| break; |
| |
| default: |
| throw new IllegalStateException("Unknown tx state: " + currState); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public TxRow newRow() { |
| return treeOp == IgniteTree.OperationType.PUT ? new TxRow(major, minor, newState) : null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteTree.OperationType operationType() { |
| return treeOp; |
| } |
| |
| /** |
| * Checks update possibility for {@code TxState.NA} tx status. |
| * |
| * @param currState Current tx state. |
| */ |
| private void checkNa(byte currState) { |
| switch (newState) { |
| case TxState.ABORTED: |
| case TxState.PREPARED: |
| valid(); |
| |
| break; |
| |
| case TxState.COMMITTED: |
| invalid(currState); // TODO IGNITE-8445 |
| |
| break; |
| |
| default: |
| invalid(currState); |
| } |
| } |
| |
| /** |
| * Checks update possibility for {@code TxState.PREPARED} status. |
| * |
| * @param currState Current tx state. |
| */ |
| private void checkPrepared(byte currState) { |
| switch (newState) { |
| case TxState.ABORTED: |
| case TxState.COMMITTED: |
| valid(); |
| |
| break; |
| |
| case TxState.PREPARED: |
| ignore(); |
| |
| break; |
| |
| default: |
| invalid(currState); |
| } |
| } |
| |
| /** |
| * Checks update possibility for {@code TxState.COMMITTED} status. |
| * |
| * @param currState Current tx state. |
| */ |
| private void checkCommitted(byte currState) { |
| switch (newState) { |
| case TxState.COMMITTED: |
| ignore(); |
| |
| break; |
| |
| case TxState.PREPARED: |
| if (primary) |
| ignore(); // In case when remote tx has updated the current state before. |
| else |
| invalid(currState); |
| |
| break; |
| |
| default: |
| invalid(currState); |
| } |
| } |
| |
| /** |
| * Checks update possibility for {@code TxState.ABORTED} status. |
| * |
| * @param currState Current tx state. |
| */ |
| private void checkAborted(byte currState) { |
| switch (newState) { |
| case TxState.ABORTED: |
| ignore(); |
| |
| break; |
| |
| case TxState.PREPARED: |
| if (primary) |
| ignore(); // In case when remote tx has updated the current state before. |
| else |
| invalid(currState); |
| |
| break; |
| |
| default: |
| invalid(currState); |
| } |
| } |
| |
| /** |
| * Action for valid tx status update. |
| */ |
| private void valid() { |
| assert treeOp == null; |
| |
| treeOp = IgniteTree.OperationType.PUT; |
| } |
| |
| /** |
| * Action for invalid tx status update. |
| */ |
| private void invalid(byte currState) { |
| assert treeOp == null; |
| |
| throw new IllegalStateException("Unexpected new transaction state. [currState=" + |
| currState + ", newState=" + newState + ", cntr=" + minor +']'); |
| } |
| |
| /** |
| * Action for ignoring tx status update. |
| */ |
| private void ignore() { |
| assert treeOp == null; |
| |
| treeOp = IgniteTree.OperationType.NOOP; |
| } |
| } |
| } |