// blob: a5aa0d838935a83466d5b08ff6777381c8cb7c36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
/**
 * Transaction prepare request for optimistic and eventually consistent
 * transactions.
 * <p>
 * Boolean attributes of the request are packed into a single {@code flags} byte using the
 * {@code *_FLAG_MASK} constants below. Every accessor reads and writes that byte through
 * {@link #isFlag(int)} and {@link #setFlag(boolean, int)}.
 */
public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage implements IgniteTxStateAware {
    /** */
    private static final long serialVersionUID = 0L;

    /** Bit of {@code flags} set when the transaction needs a return value. */
    private static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;

    /** Bit of {@code flags} carrying the invalidate flag of the transaction. */
    private static final int INVALIDATE_FLAG_MASK = 0x02;

    /** Bit of {@code flags} carrying the one phase commit flag. */
    private static final int ONE_PHASE_COMMIT_FLAG_MASK = 0x04;

    /** Bit of {@code flags} set on the last prepare request for a node. */
    private static final int LAST_REQ_FLAG_MASK = 0x08;

    /** Bit of {@code flags} set for system transactions. */
    private static final int SYSTEM_TX_FLAG_MASK = 0x10;

    /** Bit of {@code flags} set when the transaction uses cache store write-through. */
    public static final int STORE_WRITE_THROUGH_FLAG_MASK = 0x20;

    /** Bit of {@code flags} set when the request is a query update. */
    public static final int QUERY_UPDATE_FLAG_MASK = 0x40;

    /** Collection to message converter. */
    private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
        @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
            return new UUIDCollectionMessage(uuids);
        }
    };

    /** Message to collection converter. */
    private static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() {
        @Override public Collection<UUID> apply(UUIDCollectionMessage msg) {
            return msg.uuids();
        }
    };

    /** Thread ID. */
    @GridToStringInclude
    private long threadId;

    /** Transaction concurrency. */
    @GridToStringInclude
    private TransactionConcurrency concurrency;

    /** Transaction isolation. */
    @GridToStringInclude
    private TransactionIsolation isolation;

    /** Commit version for EC transactions. */
    @GridToStringInclude
    private GridCacheVersion writeVer;

    /** Transaction timeout. */
    @GridToStringInclude
    private long timeout;

    /** Transaction read set. */
    @GridToStringInclude
    @GridDirectCollection(IgniteTxEntry.class)
    private Collection<IgniteTxEntry> reads;

    /** Transaction write entries. */
    @GridToStringInclude
    @GridDirectCollection(IgniteTxEntry.class)
    private Collection<IgniteTxEntry> writes;

    /** DHT versions to verify. Not written directly; sent as {@link #dhtVerKeys} and {@link #dhtVerVals}. */
    @GridToStringInclude
    @GridDirectTransient
    private Map<IgniteTxKey, GridCacheVersion> dhtVers;

    /** Keys of {@link #dhtVers} in wire form, filled in by {@link #prepareMarshal(GridCacheSharedContext)}. */
    @GridDirectCollection(IgniteTxKey.class)
    private Collection<IgniteTxKey> dhtVerKeys;

    /** Values of {@link #dhtVers} in wire form, in the same iteration order as {@link #dhtVerKeys}. */
    @GridDirectCollection(GridCacheVersion.class)
    private Collection<GridCacheVersion> dhtVerVals;

    /** Expected transaction size. */
    private int txSize;

    /** Transaction nodes mapping (primary node -> related backup nodes). */
    @GridDirectTransient
    private Map<UUID, Collection<UUID>> txNodes;

    /** Tx nodes direct marshallable message. */
    @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class)
    private Map<UUID, UUIDCollectionMessage> txNodesMsg;

    /** IO policy. */
    private byte plc;

    /** Transient TX state. */
    @GridDirectTransient
    private IgniteTxState txState;

    /** Packed boolean flags, see the {@code *_FLAG_MASK} constants. Rendered by name in {@link #toString()}. */
    @GridToStringExclude
    private byte flags;

    /**
     * Required by {@link Externalizable}.
     */
    public GridDistributedTxPrepareRequest() {
        /* No-op. */
    }

    /**
     * Creates a prepare request, copying version, thread ID, concurrency, isolation, size, IO policy,
     * system and invalidate flags from the given transaction.
     *
     * @param tx Cache transaction.
     * @param timeout Transactions timeout.
     * @param reads Read entries.
     * @param writes Write entries.
     * @param txNodes Transaction nodes mapping.
     * @param retVal Return value flag.
     * @param last Last request flag.
     * @param onePhaseCommit One phase commit flag.
     * @param addDepInfo Deployment info flag.
     */
    public GridDistributedTxPrepareRequest(
        IgniteInternalTx tx,
        long timeout,
        @Nullable Collection<IgniteTxEntry> reads,
        Collection<IgniteTxEntry> writes,
        Map<UUID, Collection<UUID>> txNodes,
        boolean retVal,
        boolean last,
        boolean onePhaseCommit,
        boolean addDepInfo
    ) {
        super(tx.xidVersion(), 0, addDepInfo);

        writeVer = tx.writeVersion();
        threadId = tx.threadId();
        concurrency = tx.concurrency();
        isolation = tx.isolation();
        txSize = tx.size();
        plc = tx.ioPolicy();

        this.timeout = timeout;
        this.reads = reads;
        this.writes = writes;
        this.txNodes = txNodes;

        setFlag(tx.system(), SYSTEM_TX_FLAG_MASK);
        setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
        setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK);
        setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK);
        setFlag(last, LAST_REQ_FLAG_MASK);
    }

    /**
     * @return Flag indicating whether transaction needs return value.
     */
    public final boolean needReturnValue() {
        return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
    }

    /**
     * @param retVal Need return value.
     */
    public final void needReturnValue(boolean retVal) {
        setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
    }

    /**
     * @return Transaction nodes mapping (primary node -> related backup nodes).
     */
    public Map<UUID, Collection<UUID>> transactionNodes() {
        return txNodes;
    }

    /**
     * @return System flag.
     */
    public final boolean system() {
        return isFlag(SYSTEM_TX_FLAG_MASK);
    }

    /**
     * @return Flag indicating whether transaction use cache store.
     */
    public boolean storeWriteThrough() {
        return isFlag(STORE_WRITE_THROUGH_FLAG_MASK);
    }

    /**
     * @param storeWriteThrough Store write through value.
     */
    public void storeWriteThrough(boolean storeWriteThrough) {
        setFlag(storeWriteThrough, STORE_WRITE_THROUGH_FLAG_MASK);
    }

    /**
     * @return Flag indicating whether it is a query update.
     */
    public boolean queryUpdate() {
        return isFlag(QUERY_UPDATE_FLAG_MASK);
    }

    /**
     * @param queryUpdate Query update value.
     */
    public void queryUpdate(boolean queryUpdate) {
        setFlag(queryUpdate, QUERY_UPDATE_FLAG_MASK);
    }

    /**
     * @return IO policy.
     */
    public byte policy() {
        return plc;
    }

    /**
     * Adds version to be verified on remote node. The backing map is created on first use.
     *
     * @param key Key for which version is verified.
     * @param dhtVer DHT version to check.
     */
    public void addDhtVersion(IgniteTxKey key, @Nullable GridCacheVersion dhtVer) {
        if (dhtVers == null)
            dhtVers = new HashMap<>();

        dhtVers.put(key, dhtVer);
    }

    /**
     * @return Map of versions to be verified, or an empty map if there are none.
     */
    public Map<IgniteTxKey, GridCacheVersion> dhtVersions() {
        return dhtVers == null ? Collections.<IgniteTxKey, GridCacheVersion>emptyMap() : dhtVers;
    }

    /**
     * @return Thread ID.
     */
    public long threadId() {
        return threadId;
    }

    /**
     * @return Commit version.
     */
    public GridCacheVersion writeVersion() {
        return writeVer;
    }

    /**
     * @return Invalidate flag.
     */
    public boolean isInvalidate() {
        return isFlag(INVALIDATE_FLAG_MASK);
    }

    /**
     * @return Transaction timeout.
     */
    public long timeout() {
        return timeout;
    }

    /**
     * @return Concurrency.
     */
    public TransactionConcurrency concurrency() {
        return concurrency;
    }

    /**
     * @return Isolation level.
     */
    public TransactionIsolation isolation() {
        return isolation;
    }

    /**
     * @return Read set.
     */
    public Collection<IgniteTxEntry> reads() {
        return reads;
    }

    /**
     * @return Write entries.
     */
    public Collection<IgniteTxEntry> writes() {
        return writes;
    }

    /**
     * @param reads Reads.
     */
    protected void reads(Collection<IgniteTxEntry> reads) {
        this.reads = reads;
    }

    /**
     * @param writes Writes.
     */
    protected void writes(Collection<IgniteTxEntry> writes) {
        this.writes = writes;
    }

    /**
     * @return Expected transaction size.
     */
    public int txSize() {
        return txSize;
    }

    /**
     * @return One phase commit flag.
     */
    public boolean onePhaseCommit() {
        return isFlag(ONE_PHASE_COMMIT_FLAG_MASK);
    }

    /**
     * @return {@code True} if this is last prepare request for node.
     */
    public boolean last() {
        return isFlag(LAST_REQ_FLAG_MASK);
    }

    /** {@inheritDoc} */
    @Override public IgniteTxState txState() {
        return txState;
    }

    /** {@inheritDoc} */
    @Override public void txState(IgniteTxState txState) {
        this.txState = txState;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        if (writes != null)
            marshalTx(writes, ctx);

        if (reads != null)
            marshalTx(reads, ctx);

        // Split the version map into key/value collections once; a non-null dhtVerKeys means it was already done.
        if (dhtVers != null && dhtVerKeys == null) {
            for (IgniteTxKey key : dhtVers.keySet()) {
                GridCacheContext cctx = ctx.cacheContext(key.cacheId());

                key.prepareMarshal(cctx);
            }

            dhtVerKeys = dhtVers.keySet();
            dhtVerVals = dhtVers.values();
        }

        // Convert the node mapping to its message form once.
        if (txNodesMsg == null)
            txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (writes != null)
            unmarshalTx(writes, false, ctx, ldr);

        if (reads != null)
            unmarshalTx(reads, false, ctx, ldr);

        // Rebuild the version map by pairing received keys and values in iteration order.
        if (dhtVerKeys != null && dhtVers == null) {
            assert dhtVerVals != null;
            assert dhtVerKeys.size() == dhtVerVals.size();

            Iterator<IgniteTxKey> keyIt = dhtVerKeys.iterator();
            Iterator<GridCacheVersion> verIt = dhtVerVals.iterator();

            dhtVers = U.newHashMap(dhtVerKeys.size());

            while (keyIt.hasNext()) {
                IgniteTxKey key = keyIt.next();

                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);

                dhtVers.put(key, verIt.next());
            }
        }

        if (txNodesMsg != null)
            txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL);
    }

    /** {@inheritDoc} */
    @Override public boolean addDeploymentInfo() {
        return addDepInfo || forceAddDepInfo;
    }

    /** {@inheritDoc} */
    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
        return ctx.txPrepareMessageLogger();
    }

    /**
     * Sets flag mask.
     *
     * @param flag Set or clear.
     * @param mask Mask.
     */
    private void setFlag(boolean flag, int mask) {
        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
    }

    /**
     * Reads flag mask.
     *
     * @param mask Mask to read.
     * @return Flag value.
     */
    private boolean isFlag(int mask) {
        return (flags & mask) != 0;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the fields of this class as states 7 to 19, after the superclass part and the header.
     * Each case falls through to the next on purpose. The method returns {@code false} as soon as a
     * write reports failure, and the writer state is advanced only after a successful write.
     */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        writer.setBuffer(buf);

        if (!super.writeTo(buf, writer))
            return false;

        if (!writer.isHeaderWritten()) {
            if (!writer.writeHeader(directType(), fieldsCount()))
                return false;

            writer.onHeaderWritten();
        }

        switch (writer.state()) {
            case 7:
                // A null enum is encoded as -1.
                if (!writer.writeByte("concurrency", concurrency != null ? (byte)concurrency.ordinal() : -1))
                    return false;

                writer.incrementState();

            case 8:
                if (!writer.writeCollection("dhtVerKeys", dhtVerKeys, MessageCollectionItemType.MSG))
                    return false;

                writer.incrementState();

            case 9:
                if (!writer.writeCollection("dhtVerVals", dhtVerVals, MessageCollectionItemType.MSG))
                    return false;

                writer.incrementState();

            case 10:
                if (!writer.writeByte("flags", flags))
                    return false;

                writer.incrementState();

            case 11:
                // A null enum is encoded as -1.
                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
                    return false;

                writer.incrementState();

            case 12:
                if (!writer.writeByte("plc", plc))
                    return false;

                writer.incrementState();

            case 13:
                if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG))
                    return false;

                writer.incrementState();

            case 14:
                if (!writer.writeLong("threadId", threadId))
                    return false;

                writer.incrementState();

            case 15:
                if (!writer.writeLong("timeout", timeout))
                    return false;

                writer.incrementState();

            case 16:
                if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG))
                    return false;

                writer.incrementState();

            case 17:
                if (!writer.writeInt("txSize", txSize))
                    return false;

                writer.incrementState();

            case 18:
                if (!writer.writeMessage("writeVer", writeVer))
                    return false;

                writer.incrementState();

            case 19:
                if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG))
                    return false;

                writer.incrementState();

        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the fields of this class as states 7 to 19, in the same order as
     * {@link #writeTo(ByteBuffer, MessageWriter)} writes them. Each case falls through to the next
     * on purpose. The method returns {@code false} as soon as the reader reports that the last read
     * did not complete.
     */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        reader.setBuffer(buf);

        if (!reader.beforeMessageRead())
            return false;

        if (!super.readFrom(buf, reader))
            return false;

        switch (reader.state()) {
            case 7:
                byte concurrencyOrd;

                concurrencyOrd = reader.readByte("concurrency");

                if (!reader.isLastRead())
                    return false;

                concurrency = TransactionConcurrency.fromOrdinal(concurrencyOrd);

                reader.incrementState();

            case 8:
                dhtVerKeys = reader.readCollection("dhtVerKeys", MessageCollectionItemType.MSG);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 9:
                dhtVerVals = reader.readCollection("dhtVerVals", MessageCollectionItemType.MSG);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 10:
                flags = reader.readByte("flags");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 11:
                byte isolationOrd;

                isolationOrd = reader.readByte("isolation");

                if (!reader.isLastRead())
                    return false;

                isolation = TransactionIsolation.fromOrdinal(isolationOrd);

                reader.incrementState();

            case 12:
                plc = reader.readByte("plc");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 13:
                reads = reader.readCollection("reads", MessageCollectionItemType.MSG);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 14:
                threadId = reader.readLong("threadId");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 15:
                timeout = reader.readLong("timeout");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 16:
                txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 17:
                txSize = reader.readInt("txSize");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 18:
                writeVer = reader.readMessage("writeVer");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 19:
                writes = reader.readCollection("writes", MessageCollectionItemType.MSG);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

        }

        return reader.afterMessageRead(GridDistributedTxPrepareRequest.class);
    }

    /** {@inheritDoc} */
    @Override public short directType() {
        return 25;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        return 20;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The packed {@code flags} byte is rendered as the names of the flags that are set, covering
     * every flag this class defines.
     */
    @Override public String toString() {
        StringBuilder flagNames = new StringBuilder();

        if (needReturnValue())
            appendFlag(flagNames, "retVal");

        if (isInvalidate())
            appendFlag(flagNames, "invalidate");

        if (onePhaseCommit())
            appendFlag(flagNames, "onePhase");

        if (last())
            appendFlag(flagNames, "last");

        if (system())
            appendFlag(flagNames, "sys");

        if (storeWriteThrough())
            appendFlag(flagNames, "storeWriteThrough");

        if (queryUpdate())
            appendFlag(flagNames, "queryUpdate");

        return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
            "flags", flagNames.toString(),
            "super", super.toString());
    }
}