blob: 74c7a6924afc9a7009fd38a81ea8ac7ca9f28f85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
import org.apache.ignite.internal.pagemem.wal.record.MarshalledDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapMvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder.DFLT_HIGH_BOUND;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
/**
* WAL reader iterator, for creation in standalone WAL reader tool Operates over one directory, does not provide start
* and end boundaries
*/
class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
/** Record buffer size */
public static final int DFLT_BUF_SIZE = 2 * 1024 * 1024;
/** */
private static final long serialVersionUID = 0L;
/** Factory to provide I/O interfaces for read primitives with files. */
private static final SegmentFileInputFactory FILE_INPUT_FACTORY = new SimpleSegmentFileInputFactory();
/**
* File descriptors remained to scan.
* <code>null</code> value means directory scan mode
*/
@Nullable
private final List<FileDescriptor> walFileDescriptors;
/** */
private int curIdx = -1;
/** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */
private boolean keepBinary;
/** Replay from bound include. */
private final WALPointer lowBound;
/** Replay to bound include */
private final WALPointer highBound;
/**
* Creates iterator in file-by-file iteration mode. Directory
*
* @param log Logger.
* @param sharedCtx Shared context. Cache processor is to be configured if Cache Object Key & Data Entry is
* required.
* @param ioFactory File I/O factory.
* @param keepBinary Keep binary. This flag disables converting of non primitive types (BinaryObjects will be used
* instead)
* @param walFiles Wal files.
*/
StandaloneWalRecordsIterator(
@NotNull IgniteLogger log,
@NotNull GridCacheSharedContext sharedCtx,
@NotNull FileIOFactory ioFactory,
@NotNull List<FileDescriptor> walFiles,
IgniteBiPredicate<RecordType, WALPointer> readTypeFilter,
WALPointer lowBound,
WALPointer highBound,
boolean keepBinary,
int initialReadBufferSize,
boolean strictBoundsCheck
) throws IgniteCheckedException {
super(
log,
sharedCtx,
new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter),
ioFactory,
initialReadBufferSize,
FILE_INPUT_FACTORY
);
if (strictBoundsCheck)
strictCheck(walFiles, lowBound, highBound);
this.lowBound = lowBound;
this.highBound = highBound;
this.keepBinary = keepBinary;
walFileDescriptors = walFiles;
init(walFiles);
advance();
}
/**
* @param walFiles Wal files.
* @return printable indexes of segment files.
*/
private static String printIndexes(List<FileDescriptor> walFiles) {
return "[" + String.join(",", walFiles.stream().map(f -> Long.toString(f.idx())).collect(Collectors.toList())) + "]";
}
/**
* @param walFiles Wal files.
* @param lowBound Low bound.
* @param highBound High bound.
*
* @throws IgniteCheckedException if failed
*/
private static void strictCheck(
List<FileDescriptor> walFiles,
WALPointer lowBound,
WALPointer highBound
) throws IgniteCheckedException {
int idx = 0;
if (lowBound.index() > Long.MIN_VALUE) {
for (; idx < walFiles.size(); idx++) {
FileDescriptor desc = walFiles.get(idx);
assert desc != null;
if (desc.idx() == lowBound.index())
break;
}
}
if (idx == walFiles.size())
throw new StrictBoundsCheckException("Wal segments not in bounds. loBoundIndex=" + lowBound.index() +
", indexes=" + printIndexes(walFiles));
long curWalSegmIdx = walFiles.get(idx).idx();
for (; idx < walFiles.size() && curWalSegmIdx <= highBound.index(); idx++, curWalSegmIdx++) {
FileDescriptor desc = walFiles.get(idx);
assert desc != null;
if (curWalSegmIdx != desc.idx())
throw new StrictBoundsCheckException("Wal segment " + curWalSegmIdx + " not found in files " + printIndexes(walFiles));
}
if (highBound.index() < Long.MAX_VALUE && curWalSegmIdx <= highBound.index())
throw new StrictBoundsCheckException("Wal segments not in bounds. hiBoundIndex=" + highBound.index() +
", indexes=" + printIndexes(walFiles));
}
/**
* For directory mode sets oldest file as initial segment, for file by file mode, converts all files to descriptors
* and gets oldest as initial.
*
* @param walFiles files for file-by-file iteration mode
*/
private void init(List<FileDescriptor> walFiles) {
if (walFiles == null || walFiles.isEmpty())
return;
curWalSegmIdx = walFiles.get(curIdx + 1).idx() - 1;
if (log.isDebugEnabled())
log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx + ']');
}
/** {@inheritDoc} */
@Override protected IgniteCheckedException validateTailReachedException(
WalSegmentTailReachedException tailReachedException,
AbstractReadFileHandle currWalSegment
) {
FileDescriptor lastWALSegmentDesc = walFileDescriptors.get(walFileDescriptors.size() - 1);
// Iterator can not be empty.
assert lastWALSegmentDesc != null;
return lastWALSegmentDesc.idx() != currWalSegment.idx() ?
new IgniteCheckedException(
"WAL tail reached not in the last available segment, " +
"potentially corrupted segment, last available segment idx=" + lastWALSegmentDesc.idx() +
", path=" + lastWALSegmentDesc.file().getPath() +
", last read segment idx=" + currWalSegment.idx(), tailReachedException
) : null;
}
/** {@inheritDoc} */
@Override protected AbstractReadFileHandle advanceSegment(
@Nullable final AbstractReadFileHandle curWalSegment
) throws IgniteCheckedException {
if (curWalSegment != null)
curWalSegment.close();
FileDescriptor fd;
do {
curWalSegmIdx++;
curIdx++;
if (curIdx >= walFileDescriptors.size())
return null;
fd = walFileDescriptors.get(curIdx);
}
while (!checkBounds(fd.idx()));
if (log.isDebugEnabled())
log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file().getAbsolutePath() + ']');
assert fd != null;
curRec = null;
try {
WALPointer initPtr = null;
if (lowBound.index() == fd.idx())
initPtr = lowBound;
return initReadHandle(fd, initPtr);
}
catch (FileNotFoundException e) {
if (log.isInfoEnabled())
log.info("Missing WAL segment in the archive: " + e.getMessage());
return null;
}
}
/** {@inheritDoc} */
@Override protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
@Nullable AbstractReadFileHandle hnd
) throws IgniteCheckedException {
IgniteBiTuple<WALPointer, WALRecord> tup = super.advanceRecord(hnd);
if (tup == null)
return tup;
if (!checkBounds(tup.get1())) {
if (curRec != null) {
WALPointer prevRecPtr = curRec.get1();
// Fast stop condition, after high bound reached.
if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
return null;
}
return new T2<>(tup.get1(), FilteredRecord.INSTANCE); // FilteredRecord for mark as filtered.
}
return tup;
}
/**
* @param ptr WAL pointer.
* @return {@code True} If pointer between low and high bounds. {@code False} if not.
*/
private boolean checkBounds(WALPointer ptr) {
return ptr.compareTo(lowBound) >= 0 && ptr.compareTo(highBound) <= 0;
}
/**
* @param idx WAL segment index.
* @return {@code True} If pointer between low and high bounds. {@code False} if not.
*/
private boolean checkBounds(long idx) {
return idx >= lowBound.index() && idx <= highBound.index();
}
/** {@inheritDoc} */
@Override protected AbstractReadFileHandle initReadHandle(
@NotNull AbstractFileDescriptor desc,
@Nullable WALPointer start
) throws IgniteCheckedException, FileNotFoundException {
AbstractFileDescriptor fd = desc;
SegmentIO fileIO = null;
SegmentHeader segmentHeader;
while (true) {
try {
fileIO = fd.toReadOnlyIO(ioFactory);
segmentHeader = readSegmentHeader(fileIO, FILE_INPUT_FACTORY);
break;
}
catch (IOException | IgniteCheckedException e) {
log.error("Failed to init segment curWalSegmIdx=" + curWalSegmIdx + ", curIdx=" + curIdx, e);
U.closeQuiet(fileIO);
curIdx++;
if (curIdx >= walFileDescriptors.size())
return null;
fd = walFileDescriptors.get(curIdx);
}
}
return initReadHandle(fd, start, fileIO, segmentHeader);
}
/** {@inheritDoc} */
@Override protected @NotNull WALRecord postProcessRecord(@NotNull final WALRecord rec) {
GridKernalContext kernalCtx = sharedCtx.kernalContext();
IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
if (processor != null && (rec.type() == RecordType.DATA_RECORD
|| rec.type() == RecordType.DATA_RECORD_V2
|| rec.type() == RecordType.MVCC_DATA_RECORD)) {
try {
return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
}
catch (Exception e) {
log.error("Failed to perform post processing for data record ", e);
}
}
return super.postProcessRecord(rec);
}
/** {@inheritDoc} */
@Override protected IgniteCheckedException handleRecordException(
@NotNull Exception e,
@Nullable WALPointer ptr
) {
if (e instanceof IgniteCheckedException)
if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
// "curIdx" is an index in walFileDescriptors list.
if (curIdx == walFileDescriptors.size() - 1)
// This means that there is no explicit last sengment, so we stop as if we reached the end
// of the WAL.
if (highBound.equals(DFLT_HIGH_BOUND))
return null;
return super.handleRecordException(e, ptr);
}
/**
* Performs post processing of lazy data record, converts it to unwrap record.
*
* @param dataRec data record to post process records.
* @param kernalCtx kernal context.
* @param processor processor to convert binary form from WAL into CacheObject/BinaryObject.
* @return post-processed record.
* @throws IgniteCheckedException if failed.
*/
@NotNull private WALRecord postProcessDataRecord(
@NotNull DataRecord dataRec,
GridKernalContext kernalCtx,
IgniteCacheObjectProcessor processor
) throws IgniteCheckedException {
final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext(
kernalCtx, null, null, false, false, false, false, false);
final List<DataEntry> entries = dataRec.writeEntries();
final List<DataEntry> postProcessedEntries = new ArrayList<>(entries.size());
for (DataEntry dataEntry : entries) {
final DataEntry postProcessedEntry = postProcessDataEntry(processor, fakeCacheObjCtx, dataEntry);
postProcessedEntries.add(postProcessedEntry);
}
DataRecord res = dataRec instanceof MvccDataRecord ?
new MvccDataRecord(postProcessedEntries, dataRec.timestamp()) :
new DataRecord(postProcessedEntries, dataRec.timestamp());
res.size(dataRec.size());
res.position(dataRec.position());
return res;
}
/**
* Converts entry or lazy data entry into unwrapped entry
*
* @param processor cache object processor for de-serializing objects.
* @param fakeCacheObjCtx cache object context for de-serializing binary and unwrapping objects.
* @param dataEntry entry to process
* @return post precessed entry
* @throws IgniteCheckedException if failed
*/
private @NotNull DataEntry postProcessDataEntry(
final IgniteCacheObjectProcessor processor,
final CacheObjectContext fakeCacheObjCtx,
final DataEntry dataEntry) throws IgniteCheckedException {
if (dataEntry instanceof EncryptedDataEntry)
return dataEntry;
final KeyCacheObject key;
final CacheObject val;
boolean keepBinary = this.keepBinary || !fakeCacheObjCtx.kernalContext().marshallerContext().initialized();
if (dataEntry instanceof MarshalledDataEntry) {
final MarshalledDataEntry lazyDataEntry = (MarshalledDataEntry)dataEntry;
key = processor.toKeyCacheObject(fakeCacheObjCtx,
lazyDataEntry.getKeyType(),
lazyDataEntry.getKeyBytes());
final byte type = lazyDataEntry.getValType();
val = type == 0 ? null :
processor.toCacheObject(fakeCacheObjCtx,
type,
lazyDataEntry.getValBytes());
}
else {
key = dataEntry.key();
val = dataEntry.value();
}
return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, keepBinary);
}
/**
* Unwrap data entry.
* @param coCtx CacheObject context.
* @param dataEntry Data entry.
* @param key Entry key.
* @param val Entry value.
* @param keepBinary Don't convert non primitive types.
* @return Unwrapped entry.
*/
private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
KeyCacheObject key, CacheObject val, boolean keepBinary) {
if (dataEntry instanceof MvccDataEntry)
return new UnwrapMvccDataEntry(
dataEntry.cacheId(),
key,
val,
dataEntry.op(),
dataEntry.nearXidVersion(),
dataEntry.writeVersion(),
dataEntry.expireTime(),
dataEntry.partitionId(),
dataEntry.partitionCounter(),
((MvccDataEntry)dataEntry).mvccVer(),
coCtx,
keepBinary);
else
return new UnwrapDataEntry(
dataEntry.cacheId(),
key,
val,
dataEntry.op(),
dataEntry.nearXidVersion(),
dataEntry.writeVersion(),
dataEntry.expireTime(),
dataEntry.partitionId(),
dataEntry.partitionCounter(),
coCtx,
keepBinary,
dataEntry.flags());
}
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
curRec = null;
closeCurrentWalSegment();
curWalSegmIdx = Integer.MAX_VALUE;
for (GridComponent comp : sharedCtx.kernalContext())
comp.stop(true);
}
/** {@inheritDoc} */
@Override protected AbstractReadFileHandle createReadFileHandle(
SegmentIO fileIO, RecordSerializer ser, FileInput in
) {
return new ReadFileHandle(fileIO, ser, in, null);
}
}