blob: dd25b8cca40496a28de050f1470949f91d504a1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
import org.apache.ignite.internal.pagemem.wal.record.LazyMvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Record data V2 serializer.
*/
/**
 * Record data V2 serializer.
 * <p>
 * Extends the V1 data format with: a timestamp on data records, MVCC data and
 * transaction records, snapshot and exchange records, and a checkpoint encoding
 * that embeds the checkpoint mark pointer. Writing {@link HeaderRecord}s is
 * forbidden since this version of the serializer.
 */
public class RecordDataV2Serializer extends RecordDataV1Serializer implements RecordDataSerializer {
    /** Length of HEADER record data: 8-byte magic + 4-byte serializer version. */
    private static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;

    /** Serializer of {@link TxRecord} and {@link MvccTxRecord} records. */
    private final TxRecordSerializer txRecordSerializer;

    /**
     * Create an instance of V2 data serializer.
     *
     * @param cctx Cache shared context.
     */
    public RecordDataV2Serializer(GridCacheSharedContext cctx) {
        super(cctx);

        this.txRecordSerializer = new TxRecordSerializer();
    }

    /** {@inheritDoc} */
    @Override protected int plainSize(WALRecord rec) throws IgniteCheckedException {
        switch (rec.type()) {
            case HEADER_RECORD:
                return HEADER_RECORD_DATA_SIZE;

            case CHECKPOINT_RECORD:
                CheckpointRecord cpRec = (CheckpointRecord)rec;

                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
                    "Invalid WAL record: " + cpRec;

                int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates());

                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();

                // 16 bytes checkpoint UUID + 1 byte pointer-present flag + 1 byte end flag.
                // Pointer, when present, is 8 (segment index) + 4 (file offset) + 4 (length).
                return 18 + cacheStatesSize + (walPtr == null ? 0 : 16);

            case MVCC_DATA_RECORD:
                return 4/*entry count*/ + 8/*timestamp*/ + dataSize((DataRecord)rec);

            case DATA_RECORD:
                // V2 adds a timestamp to the V1 data record layout.
                return super.plainSize(rec) + 8/*timestamp*/;

            case SNAPSHOT:
                return 8/*snapshot id*/ + 1/*full flag*/;

            case EXCHANGE:
                return 4 /*type*/ + 8 /*timestamp*/ + 2 /*constId*/;

            case TX_RECORD:
                return txRecordSerializer.size((TxRecord)rec);

            case MVCC_TX_RECORD:
                return txRecordSerializer.size((MvccTxRecord)rec);

            default:
                return super.plainSize(rec);
        }
    }

    /** {@inheritDoc} */
    @Override WALRecord readPlainRecord(
        RecordType type,
        ByteBufferBackedDataInput in,
        boolean encrypted
    ) throws IOException, IgniteCheckedException {
        switch (type) {
            case CHECKPOINT_RECORD:
                long msb = in.readLong();
                long lsb = in.readLong();
                boolean hasPtr = in.readByte() != 0;
                // The segment index is serialized as a long (see writePlainRecord), so it
                // must be read back as a long; reading an int here would desynchronize
                // the stream for all subsequent fields.
                long idx0 = hasPtr ? in.readLong() : 0;
                int off = hasPtr ? in.readInt() : 0;
                int len = hasPtr ? in.readInt() : 0;

                Map<Integer, CacheState> states = readPartitionStates(in);

                boolean end = in.readByte() != 0;

                FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx0, off, len) : null;

                CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);

                cpRec.cacheGroupStates(states);

                return cpRec;

            case DATA_RECORD:
                int entryCnt = in.readInt();
                long timeStamp = in.readLong();

                List<DataEntry> entries = new ArrayList<>(entryCnt);

                for (int i = 0; i < entryCnt; i++)
                    entries.add(readPlainDataEntry(in));

                return new DataRecord(entries, timeStamp);

            case MVCC_DATA_RECORD:
                entryCnt = in.readInt();
                timeStamp = in.readLong();

                entries = new ArrayList<>(entryCnt);

                for (int i = 0; i < entryCnt; i++)
                    entries.add(readMvccDataEntry(in));

                return new MvccDataRecord(entries, timeStamp);

            case ENCRYPTED_DATA_RECORD:
                entryCnt = in.readInt();
                timeStamp = in.readLong();

                entries = new ArrayList<>(entryCnt);

                for (int i = 0; i < entryCnt; i++)
                    entries.add(readEncryptedDataEntry(in));

                return new DataRecord(entries, timeStamp);

            case SNAPSHOT:
                long snpId = in.readLong();
                byte full = in.readByte();

                return new SnapshotRecord(snpId, full == 1);

            case EXCHANGE:
                int idx = in.readInt();
                short constId = in.readShort();
                long ts = in.readLong();

                return new ExchangeRecord(constId, ExchangeRecord.Type.values()[idx], ts);

            case TX_RECORD:
                return txRecordSerializer.readTx(in);

            case MVCC_TX_RECORD:
                return txRecordSerializer.readMvccTx(in);

            default:
                return super.readPlainRecord(type, in, encrypted);
        }
    }

    /** {@inheritDoc} */
    @Override protected void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
        if (rec instanceof HeaderRecord)
            throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer");

        switch (rec.type()) {
            case CHECKPOINT_RECORD:
                CheckpointRecord cpRec = (CheckpointRecord)rec;

                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
                    "Invalid WAL record: " + cpRec;

                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
                UUID cpId = cpRec.checkpointId();

                buf.putLong(cpId.getMostSignificantBits());
                buf.putLong(cpId.getLeastSignificantBits());

                buf.put(walPtr == null ? (byte)0 : 1);

                if (walPtr != null) {
                    buf.putLong(walPtr.index());
                    buf.putInt(walPtr.fileOffset());
                    buf.putInt(walPtr.length());
                }

                putCacheStates(buf, cpRec.cacheGroupStates());

                buf.put(cpRec.end() ? (byte)1 : 0);

                break;

            case MVCC_DATA_RECORD:
            case DATA_RECORD:
                DataRecord dataRec = (DataRecord)rec;

                buf.putInt(dataRec.writeEntries().size());
                buf.putLong(dataRec.timestamp());

                boolean encrypted = isDataRecordEncrypted(dataRec);

                for (DataEntry dataEntry : dataRec.writeEntries()) {
                    if (encrypted)
                        putEncryptedDataEntry(buf, dataEntry);
                    else
                        putPlainDataEntry(buf, dataEntry);
                }

                break;

            case SNAPSHOT:
                SnapshotRecord snpRec = (SnapshotRecord)rec;

                buf.putLong(snpRec.getSnapshotId());
                buf.put(snpRec.isFull() ? (byte)1 : 0);

                break;

            case EXCHANGE:
                ExchangeRecord r = (ExchangeRecord)rec;

                buf.putInt(r.getType().ordinal());
                buf.putShort(r.getConstId());
                buf.putLong(r.timestamp());

                break;

            case TX_RECORD:
                txRecordSerializer.write((TxRecord)rec, buf);

                break;

            case MVCC_TX_RECORD:
                txRecordSerializer.write((MvccTxRecord)rec, buf);

                break;

            default:
                super.writePlainRecord(rec, buf);
        }
    }

    /** {@inheritDoc} */
    @Override void putPlainDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
        if (entry instanceof MvccDataEntry)
            putMvccDataEntry(buf, (MvccDataEntry)entry);
        else
            super.putPlainDataEntry(buf, entry);
    }

    /**
     * Writes an MVCC data entry: the plain V1 entry layout followed by the MVCC version.
     *
     * @param buf Buffer to write to.
     * @param entry Data entry.
     * @throws IgniteCheckedException If failed to write.
     */
    private void putMvccDataEntry(ByteBuffer buf, MvccDataEntry entry) throws IgniteCheckedException {
        super.putPlainDataEntry(buf, entry);

        txRecordSerializer.putMvccVersion(buf, entry.mvccVer());
    }

    /**
     * Reads an MVCC data entry (mirrors {@link #putMvccDataEntry}). Returns a lazily
     * deserialized entry when the cache context is not available on this node.
     *
     * @param in Input to read from.
     * @return Read entry.
     * @throws IOException If failed to read.
     * @throws IgniteCheckedException If failed to deserialize.
     */
    private MvccDataEntry readMvccDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
        int cacheId = in.readInt();

        int keySize = in.readInt();
        byte keyType = in.readByte();
        byte[] keyBytes = new byte[keySize];
        in.readFully(keyBytes);

        // Negative value size means a null value (e.g. remove operation).
        int valSize = in.readInt();

        byte valType = 0;
        byte[] valBytes = null;

        if (valSize >= 0) {
            valType = in.readByte();
            valBytes = new byte[valSize];

            in.readFully(valBytes);
        }

        byte ord = in.readByte();

        GridCacheOperation op = GridCacheOperation.fromOrdinal(ord & 0xFF);

        GridCacheVersion nearXidVer = readVersion(in, true);
        GridCacheVersion writeVer = readVersion(in, false);

        int partId = in.readInt();
        long partCntr = in.readLong();
        long expireTime = in.readLong();

        MvccVersion mvccVer = txRecordSerializer.readMvccVersion(in);

        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

        if (cacheCtx != null) {
            CacheObjectContext coCtx = cacheCtx.cacheObjectContext();

            KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);

            // Restore the partition on the key if it was not embedded in the serialized form.
            if (key.partition() == -1)
                key.partition(partId);

            CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null;

            return new MvccDataEntry(
                cacheId,
                key,
                val,
                op,
                nearXidVer,
                writeVer,
                expireTime,
                partId,
                partCntr,
                mvccVer
            );
        }
        else
            return new LazyMvccDataEntry(
                cctx,
                cacheId,
                keyType,
                keyBytes,
                valType,
                valBytes,
                op,
                nearXidVer,
                writeVer,
                expireTime,
                partId,
                partCntr,
                mvccVer);
    }

    /** {@inheritDoc} */
    @Override protected int entrySize(DataEntry entry) throws IgniteCheckedException {
        return super.entrySize(entry) +
            /*mvcc version: crdVer + cntr + opCntr*/ ((entry instanceof MvccDataEntry) ? (8 + 8 + 4) : 0);
    }

    /**
     * Reads the per-cache-group partition states map (mirrors {@link #putCacheStates}).
     *
     * @param buf Buffer to read from.
     * @return Read map.
     * @throws IOException If failed to read.
     */
    private Map<Integer, CacheState> readPartitionStates(DataInput buf) throws IOException {
        // Counts are serialized as unsigned shorts.
        int caches = buf.readShort() & 0xFFFF;

        if (caches == 0)
            return Collections.emptyMap();

        Map<Integer, CacheState> states = new HashMap<>(caches, 1.0f);

        for (int i = 0; i < caches; i++) {
            int cacheId = buf.readInt();

            int parts = buf.readShort() & 0xFFFF;

            CacheState state = new CacheState(parts);

            for (int p = 0; p < parts; p++) {
                int partId = buf.readShort() & 0xFFFF;
                long size = buf.readLong();
                long partCntr = buf.readLong();
                byte partState = buf.readByte();

                state.addPartitionState(partId, size, partCntr, partState);
            }

            states.put(cacheId, state);
        }

        return states;
    }

    /**
     * Writes the per-cache-group partition states map (mirrors {@link #readPartitionStates}).
     *
     * @param buf Buffer to write to.
     * @param states Cache states.
     */
    private static void putCacheStates(ByteBuffer buf, Map<Integer, CacheState> states) {
        // Need 2 bytes for the number of caches.
        buf.putShort((short)states.size());

        for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
            buf.putInt(entry.getKey());

            CacheState state = entry.getValue();

            // Need 2 bytes for the number of partitions.
            buf.putShort((short)state.size());

            for (int i = 0; i < state.size(); i++) {
                short partIdx = (short)state.partitionByIndex(i);

                buf.putShort(partIdx);
                buf.putLong(state.partitionSizeByIndex(i));
                buf.putLong(state.partitionCounterByIndex(i));
                buf.put(state.stateByIndex(i));
            }
        }
    }

    /**
     * @param states Partition states.
     * @return Size required to write partition states.
     */
    private int cacheStatesSize(Map<Integer, CacheState> states) {
        // Need 2 bytes for the number of caches.
        int size = 2;

        for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
            // Cache ID.
            size += 4;

            // Need 2 bytes for the number of partitions.
            size += 2;

            CacheState state = entry.getValue();

            // 2 bytes partition ID, size and counter per partition, part state.
            size += 19 * state.size();
        }

        return size;
    }
}