/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.wal.record.RecordUtils;
import org.apache.ignite.testframework.wal.record.UnsupportedWalRecord;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
 * Tests for {@link ByteBufferWalIterator}.
 */
public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
/** Cache name. */
private static final String CACHE_NAME = GridCommonAbstractTest.DEFAULT_CACHE_NAME;
/** Ignite instance. */
private IgniteEx ig;
/** Shared cache context. */
private GridCacheSharedContext<Object, Object> sharedCtx;
/** Cache context. */
private GridCacheContext<Object, Object> cctx;
/** WAL record serializer. */
private RecordSerializer serializer;
/** WAL segment index assigned to records written into the test buffer. */
private int idx;
/** Cache. */
private @Nullable IgniteInternalCache<Object, Object> cache;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
idx = new Random().nextInt();
ig = startGrid(0);
ig.cluster().state(ClusterState.ACTIVE);
sharedCtx = ig.context().cache().context();
cache = sharedCtx.cache().cache(CACHE_NAME);
cctx = cache.context();
RecordSerializerFactory serializerFactory = new RecordSerializerFactoryImpl(sharedCtx);
serializer = serializerFactory.createSerializer(RecordSerializerFactory.LATEST_SERIALIZER_VERSION);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
cleanPersistenceDir();
}
/** Serializes {@code walRecord} into {@code byteBuf}, assigning it a WAL pointer within segment {@code idx}. */
private void writeRecord(ByteBuffer byteBuf, WALRecord walRecord) throws IgniteCheckedException {
log.info("Writing " + walRecord.type());
int segment = idx;
int fileOff = byteBuf.position();
int size = serializer.size(walRecord);
walRecord.size(size);
WALPointer walPointer = new WALPointer(segment, fileOff, size);
walRecord.position(walPointer);
serializer.writeRecord(walRecord, byteBuf);
}
/** Compares data entries by cache id, operation and key. */
private static boolean dataEntriesEqual(DataEntry x, DataEntry y) {
if (x == y)
return true;
if (x == null || y == null)
return false;
return x.cacheId() == y.cacheId()
&& x.op() == y.op()
&& Objects.equals(x.key(), y.key());
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME));
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
)
);
return cfg;
}
/** Tests that data records written to a buffer are read back in the same order. */
@Test
public void testDataRecordsRead() throws Exception {
ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
final int cnt = 10;
List<DataEntry> entries = generateEntries(cctx, cnt);
for (int i = 0; i < entries.size(); i++)
writeRecord(byteBuf, new DataRecord(entries.get(i)));
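// Switch the buffer from write to read mode before iterating.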
byteBuf.flip();
WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
Iterator<DataEntry> dataEntriesIter = entries.iterator();
while (walIter.hasNext()) {
assertTrue(dataEntriesIter.hasNext());
WALRecord record = walIter.next().get2();
assertTrue(record instanceof DataRecord);
DataEntry dataEntry = dataEntriesIter.next();
assertTrue(dataEntriesEqual(
((DataRecord)record).get(0),
dataEntry));
}
assertFalse(dataEntriesIter.hasNext());
}
/** Tests that WAL records of all supported types survive a write/read round trip through the buffer. */
@Test
public void testWalRecordsRead() throws Exception {
ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
List<WALRecord> records = Arrays.stream(WALRecord.RecordType.values())
.filter(t -> t != WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
.map(RecordUtils::buildWalRecord)
.filter(Objects::nonNull)
.filter(r -> !(r instanceof UnsupportedWalRecord))
.collect(Collectors.toList());
for (WALRecord record : records)
writeRecord(byteBuf, record);
byteBuf.flip();
WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
Iterator<WALRecord> recordsIter = records.iterator();
while (walIter.hasNext()) {
assertTrue(recordsIter.hasNext());
WALRecord actualRec = walIter.next().get2();
WALRecord expectedRec = recordsIter.next();
assertTrue("Records of type " + expectedRec.type() + " are different:\n" +
"\tExpected:\t" + expectedRec + "\n" +
"\tActual :\t" + actualRec,
recordsEqual(
expectedRec,
actualRec));
}
assertFalse(recordsIter.hasNext());
}
/** Compares WAL records by type, position, size and, for timestamped records, by timestamp. */
private boolean recordsEqual(WALRecord x, WALRecord y) {
if (x == y)
return true;
if (x == null || y == null)
return false;
log.info("Comparing " + x.type() + " and " + y.type());
return x.type() == y.type()
&& Objects.equals(x.position(), y.position())
&& x.size() == y.size()
&& (!(x instanceof TimeStampRecord) || ((TimeStampRecord)x).timestamp() == ((TimeStampRecord)y).timestamp());
}
/** Tests that a serializer configured with a record filter skips the filtered-out (physical) records. */
@Test
public void testReadFiltered() throws Exception {
ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
List<WALRecord> physicalRecords = Arrays.stream(WALRecord.RecordType.values())
.filter(t -> t.purpose() == WALRecord.RecordPurpose.PHYSICAL)
.map(RecordUtils::buildWalRecord)
.filter(Objects::nonNull)
.filter(r -> !(r instanceof UnsupportedWalRecord))
.collect(Collectors.toList());
final int cnt = physicalRecords.size();
List<DataEntry> entries = generateEntries(cctx, cnt);
for (int i = 0; i < entries.size(); i++) {
writeRecord(byteBuf, new DataRecord(entries.get(i)));
writeRecord(byteBuf, physicalRecords.get(i));
}
byteBuf.flip();
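// Create a serializer whose read filter accepts only logical records, so the interleaved
// physical records are skipped during iteration.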
RecordSerializer serializer = new RecordSerializerFactoryImpl(
sharedCtx,
(t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL
)
.createSerializer(RecordSerializerFactory.LATEST_SERIALIZER_VERSION);
WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
Iterator<DataEntry> dataEntriesIter = entries.iterator();
while (walIter.hasNext()) {
assertTrue(dataEntriesIter.hasNext());
WALRecord record = walIter.next().get2();
assertTrue(record instanceof DataRecord);
DataEntry dataEntry = dataEntriesIter.next();
assertTrue(dataEntriesEqual(
((DataRecord)record).get(0),
dataEntry));
}
assertFalse(dataEntriesIter.hasNext());
}
/** Generates {@code cnt} data entries, alternating UPDATE and DELETE operations. */
private List<DataEntry> generateEntries(GridCacheContext<Object, Object> cctx, int cnt) {
List<DataEntry> entries = new ArrayList<>(cnt);
for (int i = 0; i < cnt; i++) {
GridCacheOperation op = i % 2 == 0 ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
KeyCacheObject key = cctx.toCacheKeyObject(i);
CacheObject val = null;
if (op != GridCacheOperation.DELETE)
val = cctx.toCacheObject("value-" + i);
entries.add(
new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(),
0L,
cctx.affinity().partition(i), i, DataEntry.EMPTY_FLAGS));
}
return entries;
}
/** Tests iteration over a buffer whose last record is truncated. */
@Test
public void testBrokenTail() throws Exception {
ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
List<DataEntry> entries = generateEntries(cctx, 3);
for (int i = 0; i < 2; i++)
writeRecord(byteBuf, new DataRecord(entries.get(i)));
int position1 = byteBuf.position();
writeRecord(byteBuf, new DataRecord(entries.get(2)));
int position2 = byteBuf.position();
byteBuf.flip();
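// Truncate the buffer in the middle of the last record to emulate a partially written tail.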
byteBuf.limit((position1 + position2) >> 1);
WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
assertTrue(walIter.hasNext());
walIter.next();
assertTrue(walIter.hasNext());
walIter.next();
try {
walIter.hasNext();
fail("hasNext() expected to fail");
}
catch (IgniteException e) {
assertTrue(X.hasCause(e, IOException.class));
}
}
/** Tests iteration over an empty buffer. */
@Test
public void testEmptyBuffer() throws Exception {
ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder());
byteBuf.flip();
WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
assertFalse(walIter.hasNext());
try {
walIter.next();
fail("next() expected to fail");
}
catch (NoSuchElementException ignored) {
// This is expected.
}
}
/** Tests iteration over whole WAL segments loaded from the archive. */
@Test
public void testWalSegmentReadFromDisk() throws Exception {
FileDescriptor[] archiveFiles = generateWalFiles(20, 10_000);
for (int i = 0; i < archiveFiles.length; i++)
checkIteratorFromDisk(archiveFiles[i]);
}
/** Checks that the iterator reads the given archived segment completely and consistently. */
private void checkIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException {
log.info("Checking " + fd.file());
ByteBuffer byteBuf = loadFile(fd);
checkByteBuffer(byteBuf, false, true, (int)fd.idx(), 0);
}
/**
 * Iterates over {@code byteBuf} and verifies the WAL pointer of every record.
 *
 * @param adaptTest If {@code true}, the expected file offset baseline is taken from the first record read.
 * @param hasHdr If {@code true}, the buffer starts with a segment header record.
 * @param idx Expected WAL segment index.
 * @param pos File offset of the first record in the buffer.
 */
private void checkByteBuffer(ByteBuffer byteBuf, boolean adaptTest, boolean hasHdr, int idx, int pos) throws IgniteCheckedException {
log.info("Bytes count " + byteBuf.limit());
int p0 = hasHdr ? 29 : 0;
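// For adaptive checks the expected offset is unknown upfront: the shift is derived from the first record read.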
int shift = adaptTest ? -1 : 0;
WALPointer ptr = hasHdr ? null : new WALPointer(idx, pos, 0);
ByteBufferWalIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, ptr);
Map<WALRecord.RecordType, Integer> counts = new EnumMap<>(WALRecord.RecordType.class);
while (walIter.hasNext()) {
int p1 = byteBuf.position();
IgniteBiTuple<WALPointer, WALRecord> next = walIter.next();
assertNotNull(next);
if (log.isDebugEnabled())
log.debug("Got " + next.get2().type() + " at " + next.get1());
if (shift >= 0)
assertEquals("WalPointer offset check failed", p0 + shift, next.get1().fileOffset());
else
shift = next.get1().fileOffset() - p0;
assertEquals("WalPointer length check failed", p1 - p0, next.get1().length());
assertEquals("WalPointers comparison failed", next.get1(), next.get2().position());
assertEquals("WalPointers length comparison failed", next.get1().length(), next.get2().position().length());
p0 = p1;
counts.merge(next.get2().type(), 1, Integer::sum);
}
assertFalse("ByteBuffer has some unprocessed bytes", byteBuf.hasRemaining());
printStats(counts);
}
/** Logs per-type record counts sorted by frequency. */
private void printStats(Map<WALRecord.RecordType, Integer> counts) {
if (counts.isEmpty()) {
log.info("No record");
return;
}
ArrayList<WALRecord.RecordType> types = new ArrayList<>(counts.keySet());
types.sort((x, y) -> -counts.get(x).compareTo(counts.get(y)));
int len = types.stream().map(x -> x.toString().length()).max(Integer::compare).orElse(0);
char[] spaces = new char[len];
Arrays.fill(spaces, ' ');
StringBuilder sb = new StringBuilder("Statistics:");
types.forEach(x -> sb.append("\n\t")
.append(x)
.append(spaces, 0, len - x.toString().length())
.append("\t")
.append(counts.get(x)));
log.info(sb.toString());
}
/** Loads the whole WAL segment file into a heap buffer with native byte order. */
private ByteBuffer loadFile(FileDescriptor fd) throws IOException {
File file = fd.file();
int size = (int)file.length();
final byte[] bytes = new byte[size];
try (FileInputStream fileInputStream = new FileInputStream(file)) {
int off = 0;
// read() is not guaranteed to fill the array in one call, so read until the whole file is consumed.
while (off < size) {
int read = fileInputStream.read(bytes, off, size - off);
assertTrue("Unexpected end of " + file, read > 0);
off += read;
}
}
ByteBuffer byteBuf = ByteBuffer.wrap(bytes);
byteBuf.order(ByteOrder.nativeOrder());
return byteBuf;
}
/** Generates {@code files} WAL segments, each filled with {@code size} random cache operations, and waits until they are archived. */
private FileDescriptor[] generateWalFiles(int files, int size) throws IgniteCheckedException {
Random random = new Random();
IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
for (int fileNo = 0; fileNo < files; fileNo++) {
for (int i = 0; i < size; i++) {
switch (random.nextInt(2)) {
case 0:
cache.put(random.nextInt(100), "Cache value " + random.nextInt());
break;
case 1:
cache.remove(random.nextInt(100));
break;
}
}
sharedMgr.checkpointReadLock();
try {
walMgr.log(new SnapshotRecord(fileNo, false), RolloverType.NEXT_SEGMENT);
}
finally {
sharedMgr.checkpointReadUnlock();
}
}
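// Rollover and archiving are asynchronous, so wait until all requested segments appear in the archive.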
while (true) {
FileDescriptor[] archiveFiles = ((FileWriteAheadLogManager)walMgr).walArchiveFiles();
if (archiveFiles.length >= files)
return archiveFiles;
LockSupport.parkNanos(10_000_000);
}
}
/** Tests iteration over arbitrary slices of a WAL segment loaded from the archive. */
@Test
public void testPartialWalSegmentReadFromDisk() throws Exception {
FileDescriptor[] archiveFiles = generateWalFiles(1, 100);
for (int i = 0; i < archiveFiles.length; i++)
checkPartialIteratorFromDisk(archiveFiles[i]);
}
/** Splits the given archived segment into several parts and checks iteration over each part. */
private void checkPartialIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException {
log.info("Checking " + fd.file());
ByteBuffer byteBuf = loadFile(fd);
log.info("Bytes count " + byteBuf.limit());
List<Integer> positions = new ArrayList<>();
positions.add(byteBuf.position());
ByteBufferWalIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer((int)fd.idx(), 0, 0));
positions.add(byteBuf.position());
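// Collect buffer positions on record boundaries: before iteration, after iterator construction,
// and after each consumed record, so the buffer can later be sliced on record borders.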
positions.addAll(
StreamSupport.stream(walIter.spliterator(), false)
.map(x -> byteBuf.position())
.collect(Collectors.toList()));
Random random = new Random();
int size = positions.size();
assertTrue("Size shouild be at least 10 for this test", size >= 10);
int n1 = (int)((0.1 + 0.4 * random.nextDouble()) * size);
int n2 = (int)((0.5 + 0.4 * random.nextDouble()) * size);
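// Choose two random split points: n1 in the first half and n2 in the second half of the record list.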
// With header.
checkByteBufferPart(byteBuf, positions, 0, n1, true, (int)fd.idx());
// Middle part.
checkByteBufferPart(byteBuf, positions, n1, n2, false, (int)fd.idx());
// Empty buffer.
checkByteBufferPart(byteBuf, positions, n2, n2, false, (int)fd.idx());
// With tail.
checkByteBufferPart(byteBuf, positions, n2, size - 1, false, (int)fd.idx());
}
/** Copies records {@code fromRec} (inclusive) to {@code toRec} (exclusive) into a fresh buffer and checks iteration over it. */
private void checkByteBufferPart(ByteBuffer byteBuf, List<Integer> positions, int fromRec, int toRec,
boolean hasHdr, int idx)
throws IgniteCheckedException {
int fromPos = positions.get(fromRec);
int toPos = positions.get(toRec);
log.info(("Checking ByteBuffer from " + fromRec + "(" + fromPos + ") to " + toRec + "(" + toPos + ")"));
int len = toPos - fromPos;
// Copy the [fromPos, toPos) slice into a fresh buffer.
byte[] arr = byteBuf.array();
byteBuf = ByteBuffer.allocate(len).order(ByteOrder.nativeOrder());
System.arraycopy(arr, fromPos, byteBuf.array(), 0, len);
int pos = 0;
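// Assuming the record header layout (1-byte type, 8-byte segment index, 4-byte file offset),
// read the file offset of the first record at byte 9; limit > 12 guards the 13-byte minimum.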
if (byteBuf.limit() > 12) {
byteBuf.position(9);
pos = byteBuf.getInt();
byteBuf.position(0);
}
checkByteBuffer(byteBuf, true, hasHdr, idx, pos);
}
}