/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
/**
* Record V1 serializer.
* Stores records in the following format:
* <ul>
* <li>Record type from {@link RecordType#ordinal()} incremented by 1</li>
* <li>WAL pointer to double check consistency</li>
* <li>Data</li>
* <li>CRC or zero padding</li>
* </ul>
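* <p>
* On-disk layout (derived from {@link #REC_TYPE_SIZE}, {@link #FILE_WAL_POINTER_SIZE} and {@link #CRC_SIZE};
* {@code SWITCH_SEGMENT_RECORD} stores the type byte only):
* <pre>
* | type: 1 byte | segment index: 8 bytes | file offset: 4 bytes | record data | CRC: 4 bytes |
* </pre>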
*/
public class RecordV1Serializer implements RecordSerializer {
/** Length of Type */
public static final int REC_TYPE_SIZE = 1;
/** Length of WAL Pointer: Index (8) + File offset (4). */
public static final int FILE_WAL_POINTER_SIZE = 8 + 4;
/** Length of CRC value */
public static final int CRC_SIZE = 4;
/** Total length of HEADER record. */
public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE;
/** Skip CRC calculation/check flag */
public static boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/** V1 data serializer. */
private final RecordDataV1Serializer dataSerializer;
/** Write pointer. */
private final boolean writePointer;
/**
* Record type filter.
* {@link FilteredRecord} is deserialized instead of the original record if its type doesn't match the filter.
*/
private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;
/** Skip position check flag. Should be set for reading a compacted WAL file with skipped physical records. */
private final boolean skipPositionCheck;
/**
* Marshalled mode.
* Records are not deserialized in this mode; a {@link MarshalledRecord} with the binary representation is read instead.
*/
private final boolean marshalledMode;
/** Thread-local heap byte buffer. */
private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
@Override protected ByteBuffer initialValue() {
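// Start with a 4 KB buffer in native byte order; readWithHeaders() replaces it with a larger one
// when a record serialized in marshalled mode does not fit.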
ByteBuffer buf = ByteBuffer.allocate(4096);
buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
return buf;
}
};
/** Record read/write functional interface. */
private final RecordIO recordIO = new RecordIO() {
/** {@inheritDoc} */
@Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException {
int recordSize = dataSerializer.size(record);
int recordSizeWithType = recordSize + REC_TYPE_SIZE;
// For the reason behind this condition, see the SWITCH_SEGMENT_RECORD documentation.
return record.type() != SWITCH_SEGMENT_RECORD ?
recordSizeWithType + FILE_WAL_POINTER_SIZE + CRC_SIZE : recordSizeWithType;
}
/** {@inheritDoc} */
@Override public WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
RecordType recType = readRecordType(in);
if (recType == RecordType.SWITCH_SEGMENT_RECORD)
throw new SegmentEofException("Reached end of segment", null);
FileWALPointer ptr = readPosition(in);
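// The pointer stored with the record is compared against the expected one to detect segment rollover
// (see "WAL pointer to double check consistency" in the class javadoc).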
if (!skipPositionCheck && !F.eq(ptr, expPtr))
throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
", readPtr=" + ptr + ']', null);
if (recType == null)
throw new IOException("Unknown record type: " + recType);
final WALRecord rec = dataSerializer.readRecord(recType, in);
rec.position(ptr);
if (recordFilter != null && !recordFilter.apply(rec.type(), ptr))
return FilteredRecord.INSTANCE;
else if (marshalledMode) {
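// Keep the record in binary form: re-serialize it into the thread-local heap buffer
// and return a MarshalledRecord wrapper instead of the deserialized record.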
ByteBuffer buf = heapTlb.get();
int recordSize = size(rec);
if (buf.capacity() < recordSize)
heapTlb.set(buf = ByteBuffer.allocate(recordSize * 3 / 2).order(ByteOrder.nativeOrder()));
else
buf.clear();
writeRecord(rec, buf);
buf.flip();
assert buf.remaining() == recordSize;
return new MarshalledRecord(rec.type(), rec.position(), buf);
}
else
return rec;
}
/** {@inheritDoc} */
@Override public void writeWithHeaders(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
// Write record type.
putRecordType(buf, dataSerializer.recordType(rec));
// SWITCH_SEGMENT_RECORD should have only type, no need to write pointer.
if (rec.type() == SWITCH_SEGMENT_RECORD)
return;
// Write record file position.
putPositionOfRecord(buf, rec);
// Write record data.
dataSerializer.writeRecord(rec, buf);
}
};
/**
* Create an instance of V1 serializer.
* @param dataSerializer V1 data serializer.
* @param writePointer Write pointer.
* @param marshalledMode Marshalled mode.
* @param skipPositionCheck Skip position check mode.
* @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of the original record.
*/
public RecordV1Serializer(
RecordDataV1Serializer dataSerializer,
boolean writePointer,
boolean marshalledMode,
boolean skipPositionCheck,
IgniteBiPredicate<RecordType, WALPointer> recordFilter
) {
this.dataSerializer = dataSerializer;
this.writePointer = writePointer;
this.recordFilter = recordFilter;
this.skipPositionCheck = skipPositionCheck;
this.marshalledMode = marshalledMode;
}
/** {@inheritDoc} */
@Override public int version() {
return 1;
}
/** {@inheritDoc} */
@Override public boolean writePointer() {
return writePointer;
}
/** {@inheritDoc} */
@SuppressWarnings("CastConflictsWithInstanceof")
@Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
writeWithCrc(rec, buf, recordIO);
}
/** {@inheritDoc} */
@Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException {
return readWithCrc(in0, expPtr, recordIO);
}
/** {@inheritDoc} */
@SuppressWarnings("CastConflictsWithInstanceof")
@Override public int size(WALRecord record) throws IgniteCheckedException {
return recordIO.sizeWithHeaders(record);
}
/**
* Writes a WAL file pointer to the buffer (requires {@link #FILE_WAL_POINTER_SIZE} bytes).
*
* @param buf Byte buffer to write the pointer to.
* @param ptr File WAL pointer to write.
*/
public static void putPosition(ByteBuffer buf, FileWALPointer ptr) {
buf.putLong(ptr.index());
buf.putInt(ptr.fileOffset());
}
/**
* Reads the segment header record from the provided {@code io}.
* NOTE: Method mutates position of {@code io}.
*
* @param io I/O interface for file.
* @param segmentFileInputFactory File input factory.
* @return Instance of {@link SegmentHeader} extracted from the file.
* @throws IgniteCheckedException If failed to read the segment header.
* @throws IOException In case of I/O problems.
*/
public static SegmentHeader readSegmentHeader(SegmentIO io, SegmentFileInputFactory segmentFileInputFactory)
throws IgniteCheckedException, IOException {
try (ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
ByteBufferBackedDataInput in = segmentFileInputFactory.createFileInput(io, buf);
in.ensure(HEADER_RECORD_SIZE);
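// The record type byte is stored as RecordType.ordinal() + 1 (see class javadoc).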
int recordType = in.readUnsignedByte();
if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
throw new SegmentEofException("Reached logical end of the segment", null);
WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1);
if (type != WALRecord.RecordType.HEADER_RECORD)
throw new IOException("Can't read serializer version", null);
// Read file pointer.
FileWALPointer ptr = readPosition(in);
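// The pointer written into the header must reference this segment's index;
// a mismatch is treated as the logical end of the segment.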
if (io.getSegmentId() != ptr.index())
throw new SegmentEofException("Reached logical end of the segment by pointer", null);
assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
long hdrMagicNum = in.readLong();
boolean compacted;
if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC)
compacted = false;
else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC)
compacted = true;
else {
throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) +
", actual=" + U.hexLong(hdrMagicNum) + ']');
}
// Read serializer version.
int ver = in.readInt();
// Read and skip CRC.
in.readInt();
return new SegmentHeader(ver, compacted);
}
}
/**
* @param in Data input to read pointer from.
* @return Read file WAL pointer.
* @throws IOException If failed to read.
*/
public static FileWALPointer readPosition(DataInput in) throws IOException {
long idx = in.readLong();
int fileOff = in.readInt();
return new FileWALPointer(idx, fileOff, 0);
}
/**
* Writes record file position to given {@code buf}.
*
* @param buf Buffer to write record file position.
* @param rec WAL record.
*/
private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
putPosition(buf, (FileWALPointer)rec.position());
}
/**
* Writes record type to given {@code buf}.
*
* @param buf Buffer to write record type.
* @param type WAL record type.
*/
static void putRecordType(ByteBuffer buf, RecordType type) {
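// The type byte is stored as ordinal + 1; readRecordType() reverses the shift and treats
// STOP_ITERATION_RECORD_TYPE as the logical end of the segment.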
buf.put((byte)(type.ordinal() + 1));
}
/**
* Reads record type from given {@code in}.
*
* @param in Buffer to read record type.
* @return Record type.
* @throws IgniteCheckedException If logical end of segment is reached.
* @throws IOException In case of I/O problems.
*/
static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
int type = in.readUnsignedByte();
if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
throw new SegmentEofException("Reached logical end of the segment", null);
return RecordType.fromOrdinal(type - 1);
}
/**
* Reads record from file {@code in0} and validates CRC of record.
*
* @param in0 File input.
* @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file.
* @param reader Record reader I/O interface.
* @return WAL record.
* @throws EOFException In case of end of file.
* @throws IgniteCheckedException If it's unable to read record.
*/
static WALRecord readWithCrc(
FileInput in0,
WALPointer expPtr,
RecordIO reader
) throws EOFException, IgniteCheckedException {
long startPos = -1;
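// The CRC-checking input accumulates a checksum while the record is read and validates it
// against the CRC stored after the record when the input is closed (unless skipCrc is set).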
try (SimpleFileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
startPos = in0.position();
WALRecord res = reader.readWithHeaders(in, expPtr);
assert res != null;
res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards.
return res;
}
catch (EOFException | SegmentEofException | WalSegmentTailReachedException e) {
throw e;
}
catch (Exception e) {
long size = -1;
try {
size = in0.io().size();
}
catch (IOException ignore) {
// No-op. The size is only informational for the error message; failed to calculate the file size.
}
throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos + " size: " + size, e);
}
}
/**
* Writes record with calculated CRC to buffer {@code buf}.
*
* @param rec WAL record.
* @param buf Buffer to write.
* @param writer Record write I/O interface.
* @throws IgniteCheckedException If it's unable to write record.
*/
static void writeWithCrc(WALRecord rec, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
assert rec.size() >= 0 && buf.remaining() >= rec.size() : rec.size();
boolean switchSegmentRec = rec.type() == RecordType.SWITCH_SEGMENT_RECORD;
int startPos = buf.position();
writer.writeWithHeaders(rec, buf);
// No need to calculate and write CRC for SWITCH_SEGMENT_RECORD.
if (switchSegmentRec)
return;
if (!skipCrc) {
int curPos = buf.position();
buf.position(startPos);
// This call will move buffer position to the end of the record again.
int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos);
buf.putInt(crcVal);
}
else
buf.putInt(0);
}
/**
* @param buf Buffer.
* @param ver Version to write.
* @param allowNull Whether a {@code null} version is allowed.
*/
static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
CacheVersionIO.write(buf, ver, allowNull);
}
/**
* Reads a cache version, advancing the buffer position by the number of bytes read.
*
* @param in Data input to read from.
* @param allowNull Whether a {@code null} version is allowed.
* @return Read cache version.
*/
static GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
// To be able to read serialization protocol version.
in.ensure(1);
try {
int size = CacheVersionIO.readSize(in.buffer(), allowNull);
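// Make sure the whole serialized version is available in the buffer before reading it.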
in.ensure(size);
return CacheVersionIO.read(in.buffer(), allowNull);
}
catch (IgniteCheckedException e) {
throw new IOException(e);
}
}
}