IGNITE-19675 [IEP-104] Implement WALIterator over ByteBuffer (#10798)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
index e077e5a..915353e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
@@ -16,12 +16,9 @@
 */
 package org.apache.ignite.internal.pagemem.wal.record;
 
-import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
-
 /**
  * Special type of WAL record. Shouldn't be stored in file.
- * Returned by deserializer if next record is not matched by filter. Automatically handled by
- * {@link AbstractWalRecordsIterator}.
+ * Returned by deserializer if next record is not matched by filter.
  */
 public class FilteredRecord extends WALRecord {
     /** Instance. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java
similarity index 91%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java
index 926bce8..4dcb55e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java
@@ -25,7 +25,6 @@
 import java.util.Optional;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -36,7 +35,6 @@
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -49,23 +47,11 @@
  * Iterator over WAL segments. This abstract class provides most functionality for reading records in log. Subclasses
  * are to override segment switching functionality
  */
-public abstract class AbstractWalRecordsIterator
-    extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator {
+public abstract class AbstractFileWalRecordsIterator extends AbstractWalRecordsIteratorAdapter {
     /** */
     private static final long serialVersionUID = 0L;
 
     /**
-     * Current record preloaded, to be returned on next()<br> Normally this should be not null because advance() method
-     * should already prepare some value<br>
-     */
-    protected IgniteBiTuple<WALPointer, WALRecord> curRec;
-
-    /**
-     * The exception which can be thrown during reading next record. It holds until the next calling of next record.
-     */
-    private IgniteCheckedException curException;
-
-    /**
      * Current WAL segment absolute index. <br> Determined as lowest number of file at start, is changed during advance
      * segment
      */
@@ -108,7 +94,7 @@
      * @param initialReadBufferSize buffer for reading records size.
      * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files.
      */
-    protected AbstractWalRecordsIterator(
+    protected AbstractFileWalRecordsIterator(
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
         @NotNull final RecordSerializerFactory serializerFactory,
@@ -125,31 +111,6 @@
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
-        if (curException != null)
-            throw curException;
-
-        IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
-
-        try {
-            advance();
-        }
-        catch (IgniteCheckedException e) {
-            curException = e;
-        }
-
-        return ret;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean onHasNext() throws IgniteCheckedException {
-        if (curException != null)
-            throw curException;
-
-        return curRec != null;
-    }
-
-    /** {@inheritDoc} */
     @Override protected void onClose() throws IgniteCheckedException {
         try {
             buf.close();
@@ -167,7 +128,7 @@
      *
      * @throws IgniteCheckedException If failed.
      */
-    protected void advance() throws IgniteCheckedException {
+    @Override protected void advance() throws IgniteCheckedException {
         if (curRec != null)
             lastRead = curRec.get1();
 
@@ -192,8 +153,6 @@
                 }
             }
             catch (WalSegmentTailReachedException e) {
-                AbstractReadFileHandle currWalSegment = this.currWalSegment;
-
                 IgniteCheckedException e0 = validateTailReachedException(e, currWalSegment);
 
                 if (e0 != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java
new file mode 100644
index 0000000..96b19eb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Iterator over WAL segments. This abstract class provides most functionality for reading records.
+ */
+public abstract class AbstractWalRecordsIteratorAdapter
+    extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Current record preloaded, to be returned on next()<br> Normally this should be not null because advance() method
+     * should already prepare some value<br>
+     */
+    protected IgniteBiTuple<WALPointer, WALRecord> curRec;
+
+    /**
+     * The exception which can be thrown during reading next record. It holds until the next calling of next record.
+     */
+    private IgniteCheckedException curErr;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
+        if (curErr != null)
+            throw curErr;
+
+        IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
+
+        try {
+            advance();
+        }
+        catch (IgniteCheckedException e) {
+            curErr = e;
+        }
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onHasNext() throws IgniteCheckedException {
+        if (curErr != null)
+            throw curErr;
+
+        return curRec != null;
+    }
+
+    /**
+     * Switches records iterator to the next record. <ul> <li>{@link #curRec} will be updated.</li> </ul>
+     *
+     * {@code advance()} runs a step ahead {@link #next()}
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract void advance() throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java
index 6384032..6efa610 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java
@@ -36,4 +36,14 @@
      * @throws IOException If failed.
      */
     public void ensure(int requested) throws IOException;
+
+    /**
+     * @return Position in the stream.
+     */
+    public long position();
+
+    /**
+     * Size.
+     */
+    public long size() throws IOException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java
index 2351ea7..a68adfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java
@@ -170,4 +170,18 @@
     @Override public String readUTF() throws IOException {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public long position() {
+        return buf.position();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public long size() throws IOException {
+        return buf.limit();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java
new file mode 100644
index 0000000..2e11c86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.HEADER_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+
+/** Byte Buffer WAL Iterator */
+public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final RecordSerializer serializer;
+
+    /** */
+    private final ByteBufferBackedDataInputImpl dataInput;
+
+    /** */
+    private WALPointer expWalPtr;
+
+    /** */
+    public ByteBufferWalIterator(
+        GridCacheSharedContext<?, ?> cctx,
+        ByteBuffer byteBuf,
+        int ver,
+        WALPointer walPointer
+    ) throws IgniteCheckedException {
+        this(cctx, byteBuf, ver, walPointer, null);
+    }
+
+    /** */
+    public ByteBufferWalIterator(
+        GridCacheSharedContext<?, ?> cctx,
+        ByteBuffer byteBuf,
+        int ver,
+        WALPointer expWalPtr,
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
+    ) throws IgniteCheckedException {
+        serializer = new RecordSerializerFactoryImpl(cctx, readTypeFilter).createSerializer(ver);
+
+        dataInput = new ByteBufferBackedDataInputImpl();
+
+        dataInput.buffer(byteBuf);
+
+        this.expWalPtr = expWalPtr;
+
+        advance();
+    }
+
+    /** */
+    private IgniteBiTuple<WALPointer, WALRecord> advanceRecord() throws IgniteCheckedException {
+        if (!dataInput.buffer().hasRemaining())
+            return null;
+
+        IgniteBiTuple<WALPointer, WALRecord> result;
+
+        try {
+            if (curRec == null)
+                skipHeader();
+
+            WALRecord rec = serializer.readRecord(dataInput, expWalPtr);
+
+            result = new IgniteBiTuple<>(rec.position(), rec);
+
+            expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + rec.size(), 0);
+        }
+        catch (SegmentEofException e) {
+            return null;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return result;
+    }
+
+    /** */
+    private void skipHeader() throws IOException {
+        int position = dataInput.buffer().position();
+
+        int type = dataInput.readUnsignedByte();
+
+        WALRecord.RecordType recType = WALRecord.RecordType.fromIndex(type - 1);
+
+        if (recType == HEADER_RECORD) {
+            dataInput.buffer().position(position + HEADER_RECORD_SIZE);
+
+            expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + HEADER_RECORD_SIZE, 0);
+        }
+        else
+            dataInput.buffer().position(position);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void advance() throws IgniteCheckedException {
+        do
+            curRec = advanceRecord();
+        while (curRec != null && curRec.get2().type() == null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Optional<WALPointer> lastRead() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java
new file mode 100644
index 0000000..b55a461
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+
+/**
+ * Checking of CRC32.
+ */
+public class Crc32CheckingDataInput extends ByteBufferBackedDataInputImpl implements AutoCloseable {
+    /** */
+    private final FastCrc crc = new FastCrc();
+
+    /**
+     * Last calc position.
+     */
+    private int lastCalcPosition;
+
+    /**
+     * Skip crc check.
+     */
+    private final boolean skipCheck;
+
+    /** */
+    private final ByteBufferBackedDataInput delegate;
+
+    /** */
+    public Crc32CheckingDataInput(ByteBufferBackedDataInput delegate, boolean skipCheck) {
+        this.delegate = delegate;
+
+        lastCalcPosition = buffer().position();
+
+        this.skipCheck = skipCheck;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void ensure(int requested) throws IOException {
+        int available = buffer().remaining();
+
+        if (available >= requested)
+            return;
+
+        updateCrc();
+
+        delegate.ensure(requested);
+
+        lastCalcPosition = 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void close() throws Exception {
+        updateCrc();
+
+        int val = crc.getValue();
+
+        int writtenCrc = readInt();
+
+        if ((val ^ writtenCrc) != 0 && !skipCheck) {
+            // If it is a last message we will skip it (EOF will be thrown).
+            ensure(5);
+
+            throw new IgniteDataIntegrityViolationException(
+                "val: " + val + " writtenCrc: " + writtenCrc
+            );
+        }
+    }
+
+    /** */
+    private void updateCrc() {
+        if (skipCheck)
+            return;
+
+        int oldPos = buffer().position();
+
+        buffer().position(lastCalcPosition);
+
+        crc.update(delegate.buffer(), oldPos - lastCalcPosition);
+
+        lastCalcPosition = oldPos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int skipBytes(int n) throws IOException {
+        ensure(n);
+
+        int skipped = Math.min(buffer().remaining(), n);
+
+        buffer().position(buffer().position() + skipped);
+
+        return skipped;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(byte[] b) throws IOException {
+        ensure(b.length);
+
+        buffer().get(b);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(byte[] b, int off, int len) throws IOException {
+        ensure(len);
+
+        buffer().get(b, off, len);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public byte readByte() throws IOException {
+        ensure(1);
+
+        return buffer().get();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public short readShort() throws IOException {
+        ensure(2);
+
+        return buffer().getShort();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public char readChar() throws IOException {
+        ensure(2);
+
+        return buffer().getChar();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readInt() throws IOException {
+        ensure(4);
+
+        return buffer().getInt();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public long readLong() throws IOException {
+        ensure(8);
+
+        return buffer().getLong();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public float readFloat() throws IOException {
+        ensure(4);
+
+        return buffer().getFloat();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public double readDouble() throws IOException {
+        ensure(8);
+
+        return buffer().getDouble();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer buffer() {
+        return delegate.buffer();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index 2f34b57..26fb01d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -32,7 +32,7 @@
 /**
  * WAL file descriptor.
  */
-public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRecordsIterator.AbstractFileDescriptor {
+public class FileDescriptor implements Comparable<FileDescriptor>, AbstractFileWalRecordsIterator.AbstractFileDescriptor {
     /** file extension of WAL segment. */
     private static final String WAL_SEGMENT_FILE_EXT = ".wal";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5470e23..4e5a935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2857,7 +2857,7 @@
     /**
      *
      */
-    public static class ReadFileHandle extends AbstractFileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle {
+    public static class ReadFileHandle extends AbstractFileHandle implements AbstractFileWalRecordsIterator.AbstractReadFileHandle {
         /** Entry serializer. */
         RecordSerializer ser;
 
@@ -2923,7 +2923,7 @@
     /**
      * Iterator over WAL-log.
      */
-    private static class RecordsIterator extends AbstractWalRecordsIterator {
+    private static class RecordsIterator extends AbstractFileWalRecordsIterator {
         /** */
         private static final long serialVersionUID = 0L;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
index fcb2d43..6eaa8b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
@@ -39,7 +39,7 @@
  * Iterates over logical records of one WAL segment from archive. Used for WAL archive compression.
  * Doesn't deserialize actual record data, returns {@link MarshalledRecord} instances instead.
  */
-public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsIterator {
+public class SingleSegmentLogicalRecordsIterator extends AbstractFileWalRecordsIterator {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
index 07c4917..33b5b7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
@@ -18,13 +18,9 @@
 package org.apache.ignite.internal.processors.cache.persistence.wal.io;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * File input, backed by byte buffer file input.
@@ -41,219 +37,8 @@
      */
     void seek(long pos) throws IOException;
 
-    /**
-     * @return Position in the stream.
-     */
-    long position();
-
-    /**
-     * @param skipCheck If CRC check should be skipped.
-     * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one
-     */
-    SimpleFileInput.Crc32CheckingFileInput startRead(boolean skipCheck);
-
-    /**
-     * Checking of CRC32.
-     */
-    public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, AutoCloseable {
-        /** */
-        private final FastCrc crc = new FastCrc();
-
-        /** Last calc position. */
-        private int lastCalcPosition;
-
-        /** Skip crc check. */
-        private boolean skipCheck;
-
-        /** */
-        private FileInput delegate;
-
-        /**
-         */
-        public Crc32CheckingFileInput(FileInput delegate, boolean skipCheck) {
-            this.delegate = delegate;
-            this.lastCalcPosition = delegate.buffer().position();
-            this.skipCheck = skipCheck;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void ensure(int requested) throws IOException {
-            int available = buffer().remaining();
-
-            if (available >= requested)
-                return;
-
-            updateCrc();
-
-            delegate.ensure(requested);
-
-            lastCalcPosition = 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws Exception {
-            updateCrc();
-
-            int val = crc.getValue();
-
-            int writtenCrc = this.readInt();
-
-            if ((val ^ writtenCrc) != 0 && !skipCheck) {
-                // If it last message we will skip it (EOF will be thrown).
-                ensure(5);
-
-                throw new IgniteDataIntegrityViolationException(
-                    "val: " + val + " writtenCrc: " + writtenCrc
-                );
-            }
-        }
-
-        /**
-         *
-         */
-        private void updateCrc() {
-            if (skipCheck)
-                return;
-
-            int oldPos = buffer().position();
-
-            buffer().position(lastCalcPosition);
-
-            crc.update(delegate.buffer(), oldPos - lastCalcPosition);
-
-            lastCalcPosition = oldPos;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int skipBytes(int n) throws IOException {
-            ensure(n);
-
-            int skipped = Math.min(buffer().remaining(), n);
-
-            buffer().position(buffer().position() + skipped);
-
-            return skipped;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public void readFully(@NotNull byte[] b) throws IOException {
-            ensure(b.length);
-
-            buffer().get(b);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException {
-            ensure(len);
-
-            buffer().get(b, off, len);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public boolean readBoolean() throws IOException {
-            return readByte() == 1;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public byte readByte() throws IOException {
-            ensure(1);
-
-            return buffer().get();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readUnsignedByte() throws IOException {
-            return readByte() & 0xFF;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public short readShort() throws IOException {
-            ensure(2);
-
-            return buffer().getShort();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readUnsignedShort() throws IOException {
-            return readShort() & 0xFFFF;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public char readChar() throws IOException {
-            ensure(2);
-
-            return buffer().getChar();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readInt() throws IOException {
-            ensure(4);
-
-            return buffer().getInt();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public long readLong() throws IOException {
-            ensure(8);
-
-            return buffer().getLong();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public float readFloat() throws IOException {
-            ensure(4);
-
-            return buffer().getFloat();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public double readDouble() throws IOException {
-            ensure(8);
-
-            return buffer().getDouble();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public String readLine() throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public String readUTF() throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ByteBuffer buffer() {
-            return delegate.buffer();
-        }
+    /** {@inheritDoc} */
+    @Override public default long size() throws IOException {
+        return io().size();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
index 57b8589..9c847ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
@@ -262,12 +262,4 @@
     @Override public String readUTF() throws IOException {
         throw new UnsupportedOperationException();
     }
-
-    /**
-     * @param skipCheck If CRC check should be skipped.
-     * @return autoclosable fileInput, after its closing crc will be calculated and compared with saved one
-     */
-    @Override public Crc32CheckingFileInput startRead(boolean skipCheck) {
-        return new Crc32CheckingFileInput(this, skipCheck);
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index df6ca77..ea49988 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -40,7 +40,7 @@
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
@@ -70,7 +70,7 @@
  * WAL reader iterator, for creation in standalone WAL reader tool Operates over one directory, does not provide start
  * and end boundaries
  */
-class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
+class StandaloneWalRecordsIterator extends AbstractFileWalRecordsIterator {
     /** Record buffer size */
     public static final int DFLT_BUF_SIZE = 2 * 1024 * 1024;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
index 9227df1..d6396fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
@@ -21,8 +21,8 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 
 /**
  * Record serializer.
@@ -54,7 +54,7 @@
      * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
      * @return Read entry.
      */
-    public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
+    public WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
 
     /**
      * Flag to write (or not) wal pointer to record
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index 295e174..39f86e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -31,14 +31,13 @@
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import org.apache.ignite.internal.processors.cache.persistence.wal.Crc32CheckingDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -226,7 +225,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException {
+    @Override public WALRecord readRecord(ByteBufferBackedDataInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException {
         return readWithCrc(in0, expPtr, recordIO);
     }
 
@@ -362,13 +361,13 @@
      * @throws IgniteCheckedException If it's unable to read record.
      */
     static WALRecord readWithCrc(
-        FileInput in0,
+        ByteBufferBackedDataInput in0,
         WALPointer expPtr,
         RecordIO reader
     ) throws EOFException, IgniteCheckedException {
         long startPos = -1;
 
-        try (SimpleFileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
+        try (Crc32CheckingDataInput in = new Crc32CheckingDataInput(in0, skipCrc)) {
             startPos = in0.position();
 
             WALRecord res = reader.readWithHeaders(in, expPtr);
@@ -386,7 +385,7 @@
             long size = -1;
 
             try {
-                size = in0.io().size();
+                size = in0.size();
             }
             catch (IOException ignore) {
                 // It just for information. Fail calculate file size.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 60c68c1..e0c8ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -30,7 +30,6 @@
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
@@ -245,7 +244,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
+    @Override public WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
         return RecordV1Serializer.readWithCrc(in, expPtr, recordIO);
     }
 
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 9909ea7..c360034 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1233,8 +1233,8 @@
 org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException
 org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO$EntryPart
 org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag
-org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator
-org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator$StartSeekingFilter
+org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator
+org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator$StartSeekingFilter
 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$1
 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$FileCompressorWorker$1
 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$RecordsIterator
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index 5326f17..7e0666b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -26,9 +26,9 @@
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import org.apache.ignite.internal.processors.cache.persistence.wal.Crc32CheckingDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
 import org.junit.After;
 import org.junit.Before;
@@ -209,7 +209,7 @@
         fileInput.io().position(0);
 
         for (int i = 0; i < 1024 / 16; i++) {
-            try (FileInput.Crc32CheckingFileInput in = fileInput.startRead(false)) {
+            try (Crc32CheckingDataInput in = new Crc32CheckingDataInput(fileInput, false)) {
                 in.readInt();
                 in.readInt();
                 in.readInt();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java
new file mode 100644
index 0000000..cd6fd9f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.wal.record.RecordUtils;
+import org.apache.ignite.testframework.wal.record.UnsupportedWalRecord;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = GridCommonAbstractTest.DEFAULT_CACHE_NAME;
+
+    /** */
+    private IgniteEx ig;
+
+    /** */
+    private GridCacheSharedContext<Object, Object> sharedCtx;
+
+    /** */
+    private GridCacheContext<Object, Object> cctx;
+
+    /** */
+    private RecordSerializer serializer;
+
+    /** */
+    private int idx;
+
+    /** */
+    private @Nullable IgniteInternalCache<Object, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        idx = new Random().nextInt();
+
+        ig = startGrid(0);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        sharedCtx = ig.context().cache().context();
+
+        cache = sharedCtx.cache().cache(CACHE_NAME);
+
+        cctx = cache.context();
+
+        RecordSerializerFactory serializerFactory = new RecordSerializerFactoryImpl(sharedCtx);
+
+        serializer = serializerFactory.createSerializer(RecordSerializerFactory.LATEST_SERIALIZER_VERSION);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    private void writeRecord(ByteBuffer byteBuf,
+        WALRecord walRecord) throws IgniteCheckedException {
+        log.info("Writing " + walRecord.type());
+
+        int segment = idx;
+
+        int fileOff = byteBuf.position();
+
+        int size = serializer.size(walRecord);
+
+        walRecord.size(size);
+
+        WALPointer walPointer = new WALPointer(segment, fileOff, size);
+
+        walRecord.position(walPointer);
+
+        serializer.writeRecord(walRecord, byteBuf);
+    }
+
+    /** */
+    private static boolean dataEntriesEqual(DataEntry x, DataEntry y) {
+        if (x == y)
+            return true;
+
+        if (x == null || y == null)
+            return false;
+
+        return x.cacheId() == y.cacheId()
+            && x.op() == y.op()
+            && Objects.equals(x.key(), y.key());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testDataRecordsRead() throws Exception {
+        ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
+
+        final int cnt = 10;
+
+        List<DataEntry> entries = generateEntries(cctx, cnt);
+
+        for (int i = 0; i < entries.size(); i++)
+            writeRecord(byteBuf, new DataRecord(entries.get(i)));
+
+        byteBuf.flip();
+
+        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+
+        Iterator<DataEntry> dataEntriesIter = entries.iterator();
+
+        while (walIter.hasNext()) {
+            assertTrue(dataEntriesIter.hasNext());
+
+            WALRecord record = walIter.next().get2();
+
+            assertTrue(record instanceof DataRecord);
+
+            DataEntry dataEntry = dataEntriesIter.next();
+
+            assertTrue(dataEntriesEqual(
+                ((DataRecord)record).get(0),
+                dataEntry));
+        }
+
+        assertFalse(dataEntriesIter.hasNext());
+    }
+
+    /** */
+    @Test
+    public void testWalRecordsRead() throws Exception {
+        ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
+
+        List<WALRecord> records = Arrays.stream(WALRecord.RecordType.values())
+            .filter(t -> t != WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
+            .map(RecordUtils::buildWalRecord)
+            .filter(Objects::nonNull)
+            .filter(r -> !(r instanceof UnsupportedWalRecord))
+            .collect(Collectors.toList());
+
+        for (WALRecord record : records)
+            writeRecord(byteBuf, record);
+
+        byteBuf.flip();
+
+        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+
+        Iterator<WALRecord> recordsIter = records.iterator();
+
+        while (walIter.hasNext()) {
+            assertTrue(recordsIter.hasNext());
+
+            WALRecord actualRec = walIter.next().get2();
+
+            WALRecord expectedRec = recordsIter.next();
+
+            assertTrue("Records of type " + expectedRec.type() + " are different:\n" +
+                    "\tExpected:\t" + expectedRec + "\n" +
+                    "\tActual  :\t" + actualRec,
+                recordsEqual(
+                    expectedRec,
+                    actualRec));
+        }
+
+        assertFalse(recordsIter.hasNext());
+    }
+
+    /** */
+    private boolean recordsEqual(WALRecord x, WALRecord y) {
+        if (x == y)
+            return true;
+
+        if (x == null || y == null)
+            return false;
+
+        log.info("Comparing " + x.type() + " and " + y.type());
+
+        return x.type() == y.type()
+            && Objects.equals(x.position(), y.position())
+            && x.size() == y.size()
+            && (!(x instanceof TimeStampRecord) || ((TimeStampRecord)x).timestamp() == ((TimeStampRecord)y).timestamp());
+    }
+
+    /** */
+    @Test
+    public void testReadFiltered() throws Exception {
+        ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
+
+        List<WALRecord> physicalRecords = Arrays.stream(WALRecord.RecordType.values())
+            .filter(t -> t.purpose() == WALRecord.RecordPurpose.PHYSICAL)
+            .map(RecordUtils::buildWalRecord)
+            .filter(Objects::nonNull)
+            .filter(r -> !(r instanceof UnsupportedWalRecord))
+            .collect(Collectors.toList());
+
+        final int cnt = physicalRecords.size();
+
+        List<DataEntry> entries = generateEntries(cctx, cnt);
+
+        for (int i = 0; i < entries.size(); i++) {
+            writeRecord(byteBuf, new DataRecord(entries.get(i)));
+
+            writeRecord(byteBuf, physicalRecords.get(i));
+        }
+
+        byteBuf.flip();
+
+        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0),
+            (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL);
+
+        Iterator<DataEntry> dataEntriesIter = entries.iterator();
+
+        while (walIter.hasNext()) {
+            assertTrue(dataEntriesIter.hasNext());
+
+            WALRecord record = walIter.next().get2();
+
+            assertTrue(record instanceof DataRecord);
+
+            DataEntry dataEntry = dataEntriesIter.next();
+
+            assertTrue(dataEntriesEqual(
+                ((DataRecord)record).get(0),
+                dataEntry));
+        }
+
+        assertFalse(dataEntriesIter.hasNext());
+    }
+
+    /** */
+    private List<DataEntry> generateEntries(GridCacheContext<Object, Object> cctx, int cnt) {
+        List<DataEntry> entries = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            GridCacheOperation op = i % 2 == 0 ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+
+            KeyCacheObject key = cctx.toCacheKeyObject(i);
+
+            CacheObject val = null;
+
+            if (op != GridCacheOperation.DELETE)
+                val = cctx.toCacheObject("value-" + i);
+
+            entries.add(
+                new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(),
+                    0L,
+                    cctx.affinity().partition(i), i, DataEntry.EMPTY_FLAGS));
+        }
+        return entries;
+    }
+
+    /** */
+    @Test
+    public void testBrokenTail() throws Exception {
+        ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
+
+        List<DataEntry> entries = generateEntries(cctx, 3);
+
+        for (int i = 0; i < 2; i++)
+            writeRecord(byteBuf, new DataRecord(entries.get(i)));
+
+        int position1 = byteBuf.position();
+
+        writeRecord(byteBuf, new DataRecord(entries.get(2)));
+
+        int position2 = byteBuf.position();
+
+        byteBuf.flip();
+
+        byteBuf.limit((position1 + position2) >> 1);
+
+        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+
+        assertTrue(walIter.hasNext());
+
+        walIter.next();
+
+        assertTrue(walIter.hasNext());
+
+        walIter.next();
+
+        try {
+            walIter.hasNext();
+
+            fail("hasNext() expected to fail");
+        }
+        catch (IgniteException e) {
+            assertTrue(X.hasCause(e, IOException.class));
+        }
+    }
+
+    /** */
+    @Test
+    public void testEmptyBuffer() throws Exception {
+        ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
+
+        byteBuf.flip();
+
+        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+
+        assertFalse(walIter.hasNext());
+
+        try {
+            walIter.next();
+
+            fail("next() expected to fail");
+        }
+        catch (NoSuchElementException ignored) {
+            // This is expected.
+        }
+    }
+
+    /** */
+    @Test
+    public void testWalSegmentReadFromDisk() throws Exception {
+        FileDescriptor[] archiveFiles = generateWalFiles(20, 10_000);
+
+        for (int i = 0; i < archiveFiles.length; i++)
+            checkIteratorFromDisk(archiveFiles[i]);
+    }
+
+    /** */
+    private void checkIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException {
+        log.info("Checking " + fd.file());
+
+        ByteBuffer byteBuf = loadFile(fd);
+
+        checkByteBuffer(byteBuf, false, true, (int)fd.idx(), 0);
+    }
+
+    /** */
+    private void checkByteBuffer(ByteBuffer byteBuf, boolean adaptTest, boolean hasHdr, int idx, int pos) throws IgniteCheckedException {
+        log.info("Bytes count " + byteBuf.limit());
+
+        int p0 = hasHdr ? 29 : 0;
+
+        int shift = adaptTest ? -1 : 0;
+
+        ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, pos, 0));
+
+        Map<WALRecord.RecordType, Integer> counts = new EnumMap<>(WALRecord.RecordType.class);
+
+        while (walIter.hasNext()) {
+            int p1 = byteBuf.position();
+
+            IgniteBiTuple<WALPointer, WALRecord> next = walIter.next();
+
+            if (log.isDebugEnabled())
+                log.debug("Got " + next.get2().type() + " at " + next.get1());
+
+            if (shift >= 0)
+                assertEquals("WalPointer offset check failed", p0 + shift, next.get1().fileOffset());
+            else
+                shift = next.get1().fileOffset() - p0;
+
+            assertEquals("WalPointer length check failed", p1 - p0, next.get1().length());
+
+            assertEquals("WalPointers comparison failed", next.get1(), next.get2().position());
+
+            assertEquals("WalPointers length comparison failed", next.get1().length(), next.get2().position().length());
+
+            p0 = p1;
+
+            counts.merge(next.get2().type(), 1, Integer::sum);
+
+            assertTrue(next != null);
+        }
+
+        assertFalse("ByteBuffer has some unprocessed bytes", byteBuf.hasRemaining());
+
+        printStats(counts);
+    }
+
+    /** */
+    private void printStats(Map<WALRecord.RecordType, Integer> counts) {
+        if (counts.isEmpty()) {
+            log.info("No record");
+            return;
+        }
+
+        ArrayList<WALRecord.RecordType> types = new ArrayList<>(counts.keySet());
+
+        types.sort((x, y) -> -counts.get(x).compareTo(counts.get(y)));
+
+        int len = types.stream().map(x -> x.toString().length()).max(Integer::compare).orElse(0);
+
+        char[] spaces = new char[len];
+
+        Arrays.fill(spaces, ' ');
+
+        StringBuilder sb = new StringBuilder("Statistics:");
+
+        types.forEach(x -> sb.append("\n\t")
+            .append(x)
+            .append(spaces, 0, len - x.toString().length())
+            .append("\t")
+            .append(counts.get(x)));
+
+        log.info(sb.toString());
+    }
+
+    /** */
+    private ByteBuffer loadFile(FileDescriptor fd) throws IOException {
+        File file = fd.file();
+
+        int size = (int)file.length();
+
+        FileInputStream fileInputStream = new FileInputStream(file);
+
+        final byte[] bytes = new byte[size];
+
+        int length = fileInputStream.read(bytes);
+
+        assertTrue(length == size);
+
+        ByteBuffer byteBuf = ByteBuffer.wrap(bytes);
+
+        byteBuf.order(ByteOrder.nativeOrder());
+
+        return byteBuf;
+    }
+
+    /** */
+    private FileDescriptor[] generateWalFiles(int files, int size) throws IgniteCheckedException {
+        Random random = new Random();
+
+        IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        for (int fileNo = 0; fileNo < files; fileNo++) {
+            for (int i = 0; i < size; i++) {
+                switch (random.nextInt(2)) {
+                    case 0:
+                        cache.put(random.nextInt(100), "Cache value " + random.nextInt());
+                        break;
+                    case 1:
+                        cache.remove(random.nextInt(100));
+                        break;
+                }
+            }
+
+            sharedMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(new SnapshotRecord(fileNo, false), RolloverType.NEXT_SEGMENT);
+            }
+            finally {
+                sharedMgr.checkpointReadUnlock();
+            }
+        }
+
+        while (true) {
+            FileDescriptor[] archiveFiles = ((FileWriteAheadLogManager)walMgr).walArchiveFiles();
+
+            if (archiveFiles.length >= files)
+                return archiveFiles;
+
+            LockSupport.parkNanos(10_000_000);
+        }
+    }
+
+    /** */
+    @Test
+    public void testPartialWalSegmentReadFromDisk() throws Exception {
+        FileDescriptor[] archiveFiles = generateWalFiles(1, 100);
+
+        for (int i = 0; i < archiveFiles.length; i++)
+            checkPartialIteratorFromDisk(archiveFiles[i]);
+    }
+
+    /** */
+    private void checkPartialIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException {
+        log.info("Checking " + fd.file());
+
+        ByteBuffer byteBuf = loadFile(fd);
+
+        log.info("Bytes count " + byteBuf.limit());
+
+        List<Integer> positions = new ArrayList<>();
+
+        positions.add(byteBuf.position());
+
+        ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
+            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer((int)fd.idx(), 0, 0));
+
+        positions.add(byteBuf.position());
+
+        positions.addAll(
+            StreamSupport.stream(walIter.spliterator(), false)
+                .map(x -> byteBuf.position())
+                .collect(Collectors.toList()));
+
+        Random random = new Random();
+
+        int size = positions.size();
+
+        assertTrue("Size shouild be at least 10 for this test", size >= 10);
+
+        int n1 = (int)((0.1 + 0.4 * random.nextDouble()) * size);
+
+        int n2 = (int)((0.5 + 0.4 * random.nextDouble()) * size);
+
+        // With header.
+        checkByteBufferPart(byteBuf, positions, 0, n1, true, (int)fd.idx());
+
+        // Middle part.
+        checkByteBufferPart(byteBuf, positions, n1, n2, false, (int)fd.idx());
+
+        // Empty buffer.
+        checkByteBufferPart(byteBuf, positions, n2, n2, false, (int)fd.idx());
+
+        // With tail.
+        checkByteBufferPart(byteBuf, positions, n2, size - 1, false, (int)fd.idx());
+    }
+
+    /** */
+    private void checkByteBufferPart(ByteBuffer byteBuf, List<Integer> positions, int fromRec, int toRec,
+        boolean hasHdr, int idx)
+        throws IgniteCheckedException {
+        int fromPos = positions.get(fromRec);
+
+        int toPos = positions.get(toRec);
+
+        log.info(("Checking ByteBuffer from " + fromRec + "(" + fromPos + ") to " + toRec + "(" + toPos + ")"));
+
+        int len = toPos - fromPos;
+
+        byteBuf.position(fromPos).limit(toPos);
+
+        byte[] arr = byteBuf.array();
+
+        byteBuf = ByteBuffer.allocate(len).order(ByteOrder.nativeOrder());
+
+        System.arraycopy(arr, fromPos, byteBuf.array(), 0, len);
+
+        int pos = 0;
+
+        if (byteBuf.limit() > 12) {
+            byteBuf.position(9);
+
+            pos = byteBuf.getInt();
+
+            byteBuf.position(0);
+        }
+
+        checkByteBuffer(byteBuf, true, hasHdr, idx, pos);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index c79381a..2ba0c57 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -42,6 +42,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTestPersistence;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIOFreeSizeTest;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferWalIteratorTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWalDeltaConsistencyTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManagerSelfTest;
@@ -124,6 +125,8 @@
 
         GridTestUtils.addTestIfNeeded(suite, WalCompactionNotificationsTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, ByteBufferWalIteratorTest.class, ignoredTests);
+
         return suite;
     }
 }