IGNITE-5322 - WAL iterator improvements
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java
deleted file mode 100644
index a82f604..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagemem.wal.record;
-
-/**
- *
- */
-public class StoreOperationRecord extends WALRecord {
- /**
- * Store operation type.
- */
- public enum StoreOperationType {
- /** */
- ENTRY_CREATE,
-
- /** */
- INDEX_PUT,
-
- /** */
- INDEX_REMOVE;
-
- /** */
- private static final StoreOperationType[] VALS = StoreOperationType.values();
-
- /** */
- public static StoreOperationType fromOrdinal(int ord) {
- return ord < 0 || ord >= VALS.length ? null : VALS[ord];
- }
- }
-
- /** */
- private StoreOperationType opType;
-
- /** */
- private int cacheId;
-
- /** */
- private long link;
-
- /** */
- private int idxId;
-
- /** {@inheritDoc} */
- @Override public RecordType type() {
- return RecordType.STORE_OPERATION_RECORD;
- }
-
- /**
- * @return Cache ID.
- */
- public int cacheId() {
- return cacheId;
- }
-
- /**
- * @return Link to data.
- */
- public long link() {
- return link;
- }
-
- /**
- * @return Index ID.
- */
- public int indexId() {
- return idxId;
- }
-
- /**
- * @return Operation type.
- */
- public StoreOperationType operationType() {
- return opType;
- }
-
- /**
- * @param opType Operation type.
- */
- public void operationType(StoreOperationType opType) {
- this.opType = opType;
- }
-
- /**
- * @param cacheId Cache ID.
- */
- public void cacheId(int cacheId) {
- this.cacheId = cacheId;
- }
-
- /**
- * @param link Link.
- */
- public void link(long link) {
- this.link = link;
- }
-
- /**
- * @param idxId Index ID.
- */
- public void indexId(int idxId) {
- this.idxId = idxId;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 142f0ee..b76bcc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagemem.wal.record;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -38,9 +39,6 @@
DATA_RECORD,
/** */
- STORE_OPERATION_RECORD,
-
- /** */
CHECKPOINT_RECORD,
/** */
@@ -186,7 +184,7 @@
private WALRecord prev;
/** */
- private long pos;
+ private WALPointer pos;
/**
* @param chainSize Chain size in bytes.
@@ -219,15 +217,15 @@
/**
- * @return Position in file.
+ * @return Pointer to this record in the WAL.
*/
- public long position() {
+ public WALPointer position() {
return pos;
}
/**
- * @param pos Position in file.
+ * @param pos Pointer to this record in the WAL (asserted to be non-null).
*/
- public void position(long pos) {
- assert pos >= 0: pos;
+ public void position(WALPointer pos) {
+ assert pos != null;
this.pos = pos;
}
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index a78ba27..c57f9cb 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -1232,7 +1232,7 @@
}
}
- ByteBuffer buf = ByteBuffer.allocate(16);
+ ByteBuffer buf = ByteBuffer.allocate(20);
buf.order(ByteOrder.nativeOrder());
if (startFile != null)
@@ -1260,7 +1260,7 @@
buf.flip();
- return new FileWALPointer(buf.getInt(), buf.getInt(), buf.getInt());
+ return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " +
@@ -1728,7 +1728,7 @@
tmpWriteBuf.rewind();
- tmpWriteBuf.putInt(filePtr.index());
+ tmpWriteBuf.putLong(filePtr.index());
tmpWriteBuf.putInt(filePtr.fileOffset());
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
index 1102054..36df2e7 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java
@@ -25,7 +25,7 @@
*/
public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
/** */
- private final int idx;
+ private final long idx;
/** */
private final int fileOffset;
@@ -37,7 +37,7 @@
* @param idx File timestamp index.
* @param fileOffset Offset in file, from the beginning.
*/
- public FileWALPointer(int idx, int fileOffset, int len) {
+ public FileWALPointer(long idx, int fileOffset, int len) {
this.idx = idx;
this.fileOffset = fileOffset;
this.len = len;
@@ -46,7 +46,7 @@
/**
* @return Timestamp index.
*/
- public int index() {
+ public long index() {
return idx;
}
@@ -64,6 +64,13 @@
return len;
}
+ /**
+ * @param len Record length; overwrites the length this pointer was constructed with.
+ */
+ public void length(int len) {
+ this.len = len;
+ }
+
/** {@inheritDoc} */
@Override public WALPointer next() {
if (len == 0)
@@ -89,7 +96,7 @@
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = idx;
+ int result = (int)(idx ^ (idx >>> 32));
result = 31 * result + fileOffset;
@@ -98,7 +105,7 @@
/** {@inheritDoc} */
@Override public int compareTo(FileWALPointer o) {
- int res = Integer.compare(idx, o.idx);
+ int res = Long.compare(idx, o.idx);
return res == 0 ? Integer.compare(fileOffset, o.fileOffset) : res;
}
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index f8b18ef..4b79308 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -192,7 +192,6 @@
assert dbCfg != null : "WAL should not be created if persistence is disabled.";
this.dbCfg = dbCfg;
- this.igCfg = igCfg;
maxWalSegmentSize = dbCfg.getWalSegmentSize();
@@ -443,7 +442,7 @@
archiver0.release(((FileWALPointer)start).index());
}
- private boolean hasIndex(int absIdx) {
+ private boolean hasIndex(long absIdx) {
String name = FileDescriptor.fileName(absIdx, serializer.version());
boolean inArchive = new File(walArchiveDir, name).exists();
@@ -509,7 +508,7 @@
* @param consId Local node consistent ID.
* @param msg File description to print out on successful initialization.
* @return Initialized directory.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed to initialize directory.
*/
private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
File dir;
@@ -568,11 +567,11 @@
* @throws IgniteCheckedException If failed to initialize WAL write handle.
*/
private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
- int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
+ long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
archiver.currentWalIndex(absIdx);
- int segNo = absIdx % dbCfg.getWalSegments();
+ long segNo = absIdx % dbCfg.getWalSegments();
File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version()));
@@ -625,7 +624,7 @@
* @throws StorageException If IO exception occurred.
* @throws IgniteCheckedException If failed.
*/
- private FileWriteHandle initNextWriteHandle(int curIdx) throws StorageException, IgniteCheckedException {
+ private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException {
try {
File nextFile = pollNextFile(curIdx);
@@ -756,11 +755,11 @@
* @return File ready for use as new WAL segment.
* @throws IgniteCheckedException If failed.
*/
- private File pollNextFile(int curIdx) throws IgniteCheckedException {
+ private File pollNextFile(long curIdx) throws IgniteCheckedException {
// Signal to archiver that we are done with the segment and it can be archived.
- int absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
+ long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
- int segmentIdx = absNextIdx % dbCfg.getWalSegments();
+ long segmentIdx = absNextIdx % dbCfg.getWalSegments();
return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version()));
}
@@ -827,22 +826,22 @@
* Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>.
* Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
*/
- private int curAbsWalIdx = -1;
+ private long curAbsWalIdx = -1;
/** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
- private int lastAbsArchivedIdx = -1;
+ private long lastAbsArchivedIdx = -1;
/** current thread stopping advice */
private volatile boolean stopped;
/** */
- private NavigableMap<Integer, Integer> reserved = new TreeMap<>();
+ private NavigableMap<Long, Integer> reserved = new TreeMap<>();
/**
* Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
* come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
*/
- private Map<Integer, Integer> locked = new HashMap<>();
+ private Map<Long, Integer> locked = new HashMap<>();
/**
*
@@ -869,7 +868,7 @@
/**
* @param curAbsWalIdx Current absolute WAL segment index.
*/
- private void currentWalIndex(int curAbsWalIdx) {
+ private void currentWalIndex(long curAbsWalIdx) {
synchronized (this) {
this.curAbsWalIdx = curAbsWalIdx;
@@ -880,7 +879,7 @@
/**
* @param absIdx Index for reservation.
*/
- private synchronized void reserve(int absIdx) {
+ private synchronized void reserve(long absIdx) {
Integer cur = reserved.get(absIdx);
if (cur == null)
@@ -893,14 +892,14 @@
* @param absIdx Index for reservation.
* @return {@code True} if index is reserved.
*/
- private synchronized boolean reserved(int absIdx) {
+ private synchronized boolean reserved(long absIdx) {
return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
}
/**
* @param absIdx Reserved index.
*/
- private synchronized void release(int absIdx) {
+ private synchronized void release(long absIdx) {
Integer cur = reserved.get(absIdx);
assert cur != null && cur >= 1 : cur;
@@ -937,7 +936,7 @@
}
while (!Thread.currentThread().isInterrupted() && !stopped) {
- int toArchive;
+ long toArchive;
synchronized (this) {
assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
@@ -991,7 +990,7 @@
* @return Next index (curIdx+1) when it is ready to be written.
* @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
*/
- private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException {
+ private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
try {
synchronized (this) {
if (cleanException != null)
@@ -1022,7 +1021,7 @@
* @return {@code True} if can read, {@code false} if work segment
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- private boolean checkCanReadArchiveOrReserveWorkSegment(int absIdx) {
+ private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
synchronized (this) {
if (lastAbsArchivedIdx >= absIdx)
return true;
@@ -1044,7 +1043,7 @@
* @param absIdx Segment absolute index.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- private void releaseWorkSegment(int absIdx) {
+ private void releaseWorkSegment(long absIdx) {
synchronized (this) {
Integer cur = locked.get(absIdx);
@@ -1070,8 +1069,8 @@
/**
* @param absIdx Absolute index to archive.
*/
- private File archiveSegment(int absIdx) throws IgniteCheckedException {
- int segIdx = absIdx % dbCfg.getWalSegments();
+ private File archiveSegment(long absIdx) throws IgniteCheckedException {
+ long segIdx = absIdx % dbCfg.getWalSegments();
File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version()));
@@ -1187,7 +1186,7 @@
protected final File file;
/** Absolute WAL segment file index */
- protected final int idx;
+ protected final long idx;
/** */
protected final int ver;
@@ -1203,7 +1202,7 @@
* @param file File.
* @param idx Absolute WAL segment file index.
*/
- private FileDescriptor(File file, Integer idx) {
+ private FileDescriptor(File file, Long idx) {
this.file = file;
String fileName = file.getName();
@@ -1218,7 +1217,7 @@
int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
if (idx == null)
- this.idx = Integer.parseInt(fileName.substring(0, v));
+ this.idx = Long.parseLong(fileName.substring(0, v));
else
this.idx = idx;
@@ -1280,7 +1279,7 @@
/** {@inheritDoc} */
@Override public int hashCode() {
- return idx;
+ return (int)(idx ^ (idx >>> 32));
}
}
@@ -1295,7 +1294,7 @@
protected FileChannel ch;
/** */
- protected final int idx;
+ protected final long idx;
/** */
protected String gridName;
@@ -1304,7 +1303,7 @@
* @param file File.
* @param idx Index.
*/
- private FileHandle(RandomAccessFile file, int idx, String gridName) {
+ private FileHandle(RandomAccessFile file, long idx, String gridName) {
this.file = file;
this.idx = idx;
this.gridName = gridName;
@@ -1333,7 +1332,7 @@
*/
private ReadFileHandle(
RandomAccessFile file,
- int idx,
+ long idx,
String gridName,
RecordSerializer ser,
FileInput in
@@ -1413,7 +1412,7 @@
*/
private FileWriteHandle(
RandomAccessFile file,
- int idx,
+ long idx,
String gridName,
long pos,
long maxSegmentSize,
@@ -1428,7 +1427,7 @@
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
- head.set(new FakeRecord(pos));
+ head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
written = pos;
lastFsyncPos = pos;
}
@@ -1469,10 +1468,13 @@
rec.chainSize(newChainSize);
rec.previous(h);
- rec.position(nextPos);
+
+ FileWALPointer ptr = new FileWALPointer(idx, (int)nextPos, rec.size());
+
+ rec.position(ptr);
if (head.compareAndSet(h, rec))
- return new FileWALPointer(idx, (int)rec.position(), rec.size());
+ return ptr;
}
}
@@ -1481,7 +1483,7 @@
* @return Position for the next record.
*/
private long nextPosition(WALRecord rec) {
- return rec.position() + rec.size();
+ return recordOffset(rec) + rec.size();
}
/**
@@ -1501,7 +1503,7 @@
expWritten = ptr.fileOffset();
}
else // We read head position before the flush because otherwise we can get wrong position.
- expWritten = head.get().position();
+ expWritten = recordOffset(head.get());
if (flush(ptr))
return;
@@ -1565,7 +1567,7 @@
* @return Chain begin position.
*/
private long chainBeginPosition(WALRecord h) {
- return h.position() + h.size() - h.chainSize();
+ return recordOffset(h) + h.size() - h.chainSize();
}
/**
@@ -1583,7 +1585,7 @@
// Fail-fast before CAS.
checkEnvironment();
- if (!head.compareAndSet(expHead, new FakeRecord(nextPosition(expHead))))
+ if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
return false;
// At this point we grabbed the piece of WAL chain.
@@ -1658,7 +1660,7 @@
buf.rewind();
buf.limit(limit);
- return head.position();
+ return recordOffset(head);
}
/**
@@ -1951,6 +1953,20 @@
}
/**
+ * Gets WAL record offset relative to the WAL segment file beginning.
+ *
+ * @param rec WAL record.
+ * @return Offset of the record from the beginning of its WAL segment file.
+ */
+ private static int recordOffset(WALRecord rec) {
+ FileWALPointer ptr = (FileWALPointer)rec.position();
+
+ assert ptr != null;
+
+ return ptr.fileOffset();
+ }
+
+ /**
* Fake record is zero-sized record, which is not stored into file.
* Fake record is used for storing position in file {@link WALRecord#position()}.
* Fake record is allowed to have no previous record.
@@ -1959,7 +1975,7 @@
/**
* @param pos Position.
*/
- FakeRecord(long pos) {
+ FakeRecord(FileWALPointer pos) {
position(pos);
}
@@ -2009,7 +2025,7 @@
private IgniteBiTuple<WALPointer, WALRecord> curRec;
/** */
- private int curIdx = -1;
+ private long curIdx = -1;
/** */
private ReadFileHandle curHandle;
@@ -2115,7 +2131,7 @@
}
if (curIdx == -1) {
- int lastArchived = descs[descs.length - 1].idx;
+ long lastArchived = descs[descs.length - 1].idx;
if (lastArchived > start.index())
throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start);
@@ -2159,9 +2175,9 @@
}
/**
- * @throws IgniteCheckedException If failed.
+ * Reads the next record from {@code curHandle} into {@code curRec}; sets {@code curRec} to {@code null} if reading fails (a {@code SegmentEofException} is not logged).
*/
- private void advanceRecord() throws IgniteCheckedException {
+ private void advanceRecord() {
try {
ReadFileHandle hnd = curHandle;
@@ -2170,15 +2186,21 @@
int pos = (int)hnd.in.position();
- WALRecord rec = ser.readRecord(hnd.in);
+ FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0);
- WALPointer ptr = new FileWALPointer(hnd.idx, pos, rec.size());
+ WALRecord rec = ser.readRecord(hnd.in, ptr);
- curRec = new IgniteBiTuple<>(ptr, rec);
+ ptr.length(rec.size());
+
+ curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec);
}
}
catch (IOException | IgniteCheckedException e) {
- // TODO: verify that wrapped IntegrityException is acceptable in this case.
+ if (!(e instanceof SegmentEofException)) {
+ if (log.isInfoEnabled())
+ log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
+ }
+
curRec = null;
}
}
@@ -2213,7 +2235,7 @@
FileDescriptor.fileName(curIdx, serializer.version())));
}
else {
- int workIdx = curIdx % dbCfg.getWalSegments();
+ long workIdx = curIdx % dbCfg.getWalSegments();
fd = new FileDescriptor(
new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())),
@@ -2257,9 +2279,12 @@
try {
RecordSerializer ser = forVersion(cctx, desc.ver);
- FileInput in = new FileInput(rf.getChannel(), buf);
+ FileChannel channel = rf.getChannel();
+ FileInput in = new FileInput(channel, buf);
- WALRecord rec = ser.readRecord(in);
+ // Use the logical FileInput position (as advanceRecord does); the raw channel position may differ once data is buffered.
+ WALRecord rec = ser.readRecord(in,
+ new FileWALPointer(desc.idx, (int)in.position(), 0));
if (rec == null)
return null;
@@ -2314,14 +2339,14 @@
* archived yet. In this case the corresponding work segment is reserved (will not be deleted until
* release).
*/
- private boolean canReadArchiveOrReserveWork(int absIdx) {
+ private boolean canReadArchiveOrReserveWork(long absIdx) {
return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
}
/**
* @param absIdx Absolute index to release.
*/
- private void releaseWorkSegment(int absIdx) {
+ private void releaseWorkSegment(long absIdx) {
if (archiver != null)
archiver.releaseWorkSegment(absIdx);
}
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
index e3a972a..c929789 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
/**
@@ -47,5 +48,6 @@
* @param in Data input to read data from.
+ * @param expPtr Expected WAL pointer of the record to read; implementations may reject a record whose stored pointer differs (e.g. after segment rollover).
* @return Read entry.
*/
- public WALRecord readRecord(FileInput in) throws IOException, IgniteCheckedException;
+ public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
}
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
index f67f617..442c08d 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
@@ -30,6 +30,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -37,8 +38,6 @@
import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord;
-import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord.StoreOperationType;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
@@ -97,6 +96,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.F;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
@@ -140,6 +140,8 @@
buf.put((byte)(record.type().ordinal() + 1));
+ putPosition(buf, (FileWALPointer)record.position());
+
switch (record.type()) {
case PAGE_RECORD:
PageSnapshot snap = (PageSnapshot)record;
@@ -150,16 +152,6 @@
break;
- case STORE_OPERATION_RECORD:
- StoreOperationRecord storeRec = (StoreOperationRecord)record;
-
- buf.put((byte)storeRec.operationType().ordinal());
- buf.putInt(storeRec.cacheId());
- buf.putLong(storeRec.link());
- buf.putInt(storeRec.indexId());
-
- break;
-
case MEMORY_RECOVERY:
MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
@@ -217,7 +209,7 @@
buf.put(walPtr == null ? (byte)0 : 1);
if (walPtr != null) {
- buf.putInt(walPtr.index());
+ buf.putLong(walPtr.index());
buf.putInt(walPtr.fileOffset());
buf.putInt(walPtr.length());
}
@@ -651,13 +643,13 @@
}
/** {@inheritDoc} */
- @Override public WALRecord readRecord(FileInput in0) throws IOException, IgniteCheckedException {
+ @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException {
long startPos = -1;
try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
startPos = in0.position();
- WALRecord res = readRecord(in);
+ WALRecord res = readRecord(in, expPtr);
assert res != null;
@@ -676,12 +668,18 @@
/**
* @param in In.
*/
- private WALRecord readRecord(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+ private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
int type = in.readUnsignedByte();
if (type == 0)
throw new SegmentEofException("Reached logical end of the segment", null);
+ FileWALPointer ptr = readPosition(in);
+
+ if (!F.eq(ptr, expPtr))
+ throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
+ ", readPtr=" + ptr + ']', null);
+
RecordType recType = RecordType.fromOrdinal(type - 1);
if (recType == null)
@@ -702,18 +700,6 @@
break;
- case STORE_OPERATION_RECORD:
- StoreOperationRecord storeRec = new StoreOperationRecord();
-
- storeRec.operationType(StoreOperationType.fromOrdinal(in.readByte() & 0xFF));
- storeRec.cacheId(in.readInt());
- storeRec.link(in.readLong());
- storeRec.indexId(in.readInt());
-
- res = storeRec;
-
- break;
-
case CHECKPOINT_RECORD:
long msb = in.readLong();
long lsb = in.readLong();
@@ -1217,16 +1203,15 @@
/** {@inheritDoc} */
@SuppressWarnings("CastConflictsWithInstanceof")
@Override public int size(WALRecord record) throws IgniteCheckedException {
+ int commonFields = /* Type */1 + /* Pointer: segment index (8) + file offset (4) */12 + /*CRC*/4;
+
switch (record.type()) {
case PAGE_RECORD:
assert record instanceof PageSnapshot;
PageSnapshot pageRec = (PageSnapshot)record;
- return pageRec.pageData().length + 12 + 1 + 4;
-
- case STORE_OPERATION_RECORD:
- return 18 + 4;
+ return commonFields + pageRec.pageData().length + 12;
case CHECKPOINT_RECORD:
CheckpointRecord cpRec = (CheckpointRecord)record;
@@ -1238,147 +1223,146 @@
FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
- return 19 + cacheStatesSize + (walPtr == null ? 0 : 12) + 4;
+ return commonFields + 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
case META_PAGE_INIT:
- return 1 + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8 + /*CRC*/4;
+ return commonFields + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8;
case PARTITION_META_PAGE_UPDATE_COUNTERS:
- return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1
- + /*allocatedIdxCandidate*/ 4 + /*CRC*/4;
+ return commonFields + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1
+ + /*allocatedIdxCandidate*/ 4;
case MEMORY_RECOVERY:
- return 1 + 8 + 4;
+ return commonFields + 8;
case PARTITION_DESTROY:
- return 1 + /*cacheId*/4 + /*partId*/4 + /*CRC*/4;
+ return commonFields + /*cacheId*/4 + /*partId*/4;
case DATA_RECORD:
DataRecord dataRec = (DataRecord)record;
- return 5 + dataSize(dataRec) + 4;
+ return commonFields + 4 + dataSize(dataRec);
case HEADER_RECORD:
- return 13 + 4;
+ return commonFields + 12;
case DATA_PAGE_INSERT_RECORD:
DataPageInsertRecord diRec = (DataPageInsertRecord)record;
- return 1 + 4 + 8 + 2 +
- diRec.payload().length + 4;
+ return commonFields + 4 + 8 + 2 + diRec.payload().length;
case DATA_PAGE_UPDATE_RECORD:
DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
- return 1 + 4 + 8 + 2 + 4 +
- uRec.payload().length + 4;
+ return commonFields + 4 + 8 + 2 + 4 +
+ uRec.payload().length;
case DATA_PAGE_INSERT_FRAGMENT_RECORD:
final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
- return 1 + 4 + 8 + 8 + 4 + difRec.payloadSize() + 4;
+ return commonFields + 4 + 8 + 8 + 4 + difRec.payloadSize();
case DATA_PAGE_REMOVE_RECORD:
- return 1 + 4 + 8 + 1 + 4;
+ return commonFields + 4 + 8 + 1;
case DATA_PAGE_SET_FREE_LIST_PAGE:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case INIT_NEW_PAGE_RECORD:
- return 1 + 4 + 8 + 2 + 2 + 8 + 4;
+ return commonFields + 4 + 8 + 2 + 2 + 8;
case BTREE_META_PAGE_INIT_ROOT:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case BTREE_META_PAGE_INIT_ROOT2:
- return 1 + 4 + 8 + 8 + 4 + 2;
+ return commonFields + 4 + 8 + 8 + 2;
case BTREE_META_PAGE_ADD_ROOT:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case BTREE_META_PAGE_CUT_ROOT:
- return 1 + 4 + 8 + 4;
+ return commonFields + 4 + 8;
case BTREE_INIT_NEW_ROOT:
NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
- return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize() + 4;
+ return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize();
case BTREE_PAGE_RECYCLE:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case BTREE_PAGE_INSERT:
InsertRecord<?> inRec = (InsertRecord<?>)record;
- return 1 + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize() + 4;
+ return commonFields + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize();
case BTREE_FIX_LEFTMOST_CHILD:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case BTREE_FIX_COUNT:
- return 1 + 4 + 8 + 2 + 4;
+ return commonFields + 4 + 8 + 2;
case BTREE_PAGE_REPLACE:
ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
- return 1 + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize() + 4;
+ return commonFields + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize();
case BTREE_PAGE_REMOVE:
- return 1 + 4 + 8 + 2 + 2 + 4;
+ return commonFields + 4 + 8 + 2 + 2;
case BTREE_PAGE_INNER_REPLACE:
- return 1 + 4 + 8 + 2 + 8 + 2 + 8 + 4;
+ return commonFields + 4 + 8 + 2 + 8 + 2 + 8;
case BTREE_FORWARD_PAGE_SPLIT:
- return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2 + 4;
+ return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2;
case BTREE_EXISTING_PAGE_SPLIT:
- return 1 + 4 + 8 + 2 + 8 + 4;
+ return commonFields + 4 + 8 + 2 + 8;
case BTREE_PAGE_MERGE:
- return 1 + 4 + 8 + 8 + 2 + 8 + 1 + 4;
+ return commonFields + 4 + 8 + 8 + 2 + 8 + 1;
case BTREE_FIX_REMOVE_ID:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case PAGES_LIST_SET_NEXT:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case PAGES_LIST_SET_PREVIOUS:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case PAGES_LIST_INIT_NEW_PAGE:
- return 1 + 4 + 8 + 4 + 4 + 8 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 4 + 4 + 8 + 8 + 8;
case PAGES_LIST_ADD_PAGE:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case PAGES_LIST_REMOVE_PAGE:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case TRACKING_PAGE_DELTA:
- return 1 + 4 + 8 + 8 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8 + 8 + 8;
case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
- return 1 + 4 + 8 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8 + 8;
case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
- return 1 + 4 + 8 + 8 + 4;
+ return commonFields + 4 + 8 + 8;
case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
- return 1 + 4 + 8 + 4 + 4;
+ return commonFields + 4 + 8 + 4;
case PART_META_UPDATE_STATE:
- return /*Type*/ 1 + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8 + /*CRC*/4;
+ return commonFields + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8;
case PAGE_LIST_META_RESET_COUNT_RECORD:
- return /*Type*/ 1 + /*cacheId*/ 4 + /*pageId*/ 8 + /*CRC*/4;
+ return commonFields + /*cacheId*/ 4 + /*pageId*/ 8;
case SWITCH_SEGMENT_RECORD:
- return /*Type*/ 1 + /*CRC*/4;
+ return commonFields;
default:
throw new UnsupportedOperationException("Type: " + record.type());
@@ -1386,6 +1370,27 @@
}
/**
+ * @param buf Byte buffer to write the pointer to (segment index as {@code long}, then file offset as {@code int}).
+ * @param ptr File WAL pointer to write; its length is not written.
+ */
+ private void putPosition(ByteBuffer buf, FileWALPointer ptr) {
+ buf.putLong(ptr.index());
+ buf.putInt(ptr.fileOffset());
+ }
+
+ /**
+ * @param in Data input to read pointer from.
+ * @return Read file WAL pointer; length is not stored in the record, so it is {@code 0}.
+ * @throws IOException If failed to read.
+ */
+ private FileWALPointer readPosition(DataInput in) throws IOException {
+ long idx = in.readLong();
+ int fileOffset = in.readInt();
+
+ return new FileWALPointer(idx, fileOffset, 0);
+ }
+
+ /**
* @param dataRec Data record to serialize.
* @return Full data record size.
* @throws IgniteCheckedException If failed to obtain the length of one of the entries.
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
index bdf333c..225a9d2 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
@@ -464,7 +464,7 @@
walSegmentSize = 2 * 1024 * 1024;
- final long endTime = System.currentTimeMillis() + 3 * 60 * 1000;
+ final long endTime = System.currentTimeMillis() + 2 * 60 * 1000;
try {
IgniteEx ignite = startGrid(1);