// blob: a9685a1eb8d42c3e83143a2100c3e0e0b08ca812 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Parent of all cache messages.
*/
public abstract class GridCacheMessage implements Message {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
/** Counter backing {@link #nextIndexId()}. */
private static final AtomicInteger msgIdx = new AtomicInteger();
/** Message ID value used until an ID is assigned via {@link #messageId(long)}. */
private static final long NULL_MSG_ID = -1;
/** ID of this message; written and read as the {@code msgId} field. */
private long msgId = NULL_MSG_ID;
/** Deployment info set by {@link #prepare(GridDeploymentInfo)}; written and read as the {@code depInfo} field. */
@GridToStringInclude
private GridDeploymentInfoBean depInfo;
/** Explicitly set last affinity change version, see {@link #lastAffinityChangedTopologyVersion()}. */
@GridToStringInclude
private @Nullable AffinityTopologyVersion lastAffChangedTopVer;
/** When {@code true}, the marshalling helpers of this class collect deployment info for the objects they process. */
@GridDirectTransient
protected boolean addDepInfo;
/** Force addition of deployment info regardless of {@code addDepInfo} flag value. */
@GridDirectTransient
protected boolean forceAddDepInfo;
/** Class loading error passed to {@link #onClassError(IgniteCheckedException)}. */
@GridDirectTransient
private IgniteCheckedException err;
/** Set once global deployment info has been applied; later {@code prepareObject} calls then do nothing. */
@GridDirectTransient
private boolean skipPrepare;
/**
* Identifies which handler must process this message.
*
* @return ID to distinguish message handlers for the same messages but for different caches/cache groups.
*/
public abstract int handlerId();
/**
* Tells whether this message is a cache group message.
*
* @return {@code True} if cache group message.
*/
public abstract boolean cacheGroupMessage();
/**
* Gets the error carried by this message. The base implementation carries none and returns {@code null}.
*
* @return Error, if any.
*/
@Nullable public Throwable error() {
return null;
}
/**
* Gets next ID for indexed message ID. Returns the current value of the shared static counter
* and then increments it.
*
* @return Message ID.
*/
public static int nextIndexId() {
return msgIdx.getAndIncrement();
}
/**
* Tells whether this is a partition exchange message. The base implementation returns {@code false}.
*
* @return {@code True} if this message is partition exchange message.
*/
public boolean partitionExchangeMessage() {
return false;
}
/**
* Tells whether class loading errors should be ignored for this message. The base implementation
* returns {@code false}.
*
* @return {@code True} if class loading errors should be ignored, {@code false} otherwise.
* @see #onClassError(IgniteCheckedException)
*/
public boolean ignoreClassErrors() {
return false;
}
/**
* Gets message lookup index. All messages that do not return {@code -1} from this method must return
* a unique number in range from 0 to {@link #MAX_CACHE_MSG_LOOKUP_INDEX}. The base implementation
* returns {@code -1}.
*
* @return Message lookup index.
*/
public int lookupIndex() {
return -1;
}
/**
* Gets the partition this message is targeted to. The base implementation returns {@code -1}.
*
* @return Partition ID this message is targeted to or {@code -1} if it cannot be determined.
*/
public int partition() {
return -1;
}
/**
* If class loading error occurred during unmarshalling and {@link #ignoreClassErrors()} is
* set to {@code true}, then the error will be passed into this method. The error is stored
* and can be read back via {@link #classError()}.
*
* @param err Error.
*/
public void onClassError(IgniteCheckedException err) {
this.err = err;
}
/**
* Gets the stored class loading error.
*
* @return Error set via {@link #onClassError(IgniteCheckedException)} method.
*/
public IgniteCheckedException classError() {
return err;
}
/**
* Gets the ID of this message.
*
* @return Message ID.
*/
public long messageId() {
return msgId;
}
/**
* Sets message ID. This method is package-private and is only called
* by {@link GridCacheIoManager}.
*
* @param msgId New message ID.
*/
void messageId(long msgId) {
this.msgId = msgId;
}
/**
* Gets topology version of this message. The base implementation returns
* {@link AffinityTopologyVersion#NONE}, i.e. no topology version is associated with the message.
*
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion() {
return AffinityTopologyVersion.NONE;
}
/**
 * Returns the earliest affinity topology version for which this message is valid.
 * <p>
 * The explicitly assigned version is used only when it is present and its topology version number
 * is positive; in every other case the result of {@link #topologyVersion()} is returned.
 *
 * @return Last affinity topology version when affinity was modified.
 */
public AffinityTopologyVersion lastAffinityChangedTopologyVersion() {
    boolean explicitlySet = lastAffChangedTopVer != null && lastAffChangedTopVer.topologyVersion() > 0;

    return explicitlySet ? lastAffChangedTopVer : topologyVersion();
}
/**
* Sets the earliest affinity topology version for which this message is valid.
* The value is stored as is; see {@link #lastAffinityChangedTopologyVersion()} for how it is read back.
*
* @param topVer Last affinity topology version when affinity was modified.
*/
public void lastAffinityChangedTopologyVersion(AffinityTopologyVersion topVer) {
lastAffChangedTopVer = topVer;
}
/**
* Deployment enabled flag indicates whether deployment info has to be added to this message.
*
* @return {@code true} if deployment info must be added to the message, {@code false} otherwise.
*/
public abstract boolean addDeploymentInfo();
/**
* Convenience overload that delegates to {@link #prepareObject(Object, GridCacheSharedContext)}
* with the shared context of the given cache context.
*
* @param o Object to prepare for marshalling.
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
protected final void prepareObject(@Nullable Object o, GridCacheContext ctx) throws IgniteCheckedException {
prepareObject(o, ctx.shared());
}
/**
 * Collects deployment info for an object that is about to be marshalled.
 * <p>
 * Nothing is done for a {@code null} object or once global deployment info has already been applied
 * to this message. If the deployment manager reports global deployment info, it is applied and all
 * further calls become no-ops. Otherwise the object's class is registered with the deployment manager
 * and, if its class loader is itself a {@link GridDeploymentInfo}, that loader is used as deployment info.
 *
 * @param o Object to prepare for marshalling.
 * @param ctx Context.
 * @throws IgniteCheckedException If failed.
 */
protected final void prepareObject(@Nullable Object o, GridCacheSharedContext ctx) throws IgniteCheckedException {
    assert addDepInfo || forceAddDepInfo;

    if (skipPrepare || o == null)
        return;

    GridDeploymentInfo globalDep = ctx.deploy().globalDeploymentInfo();

    if (globalDep == null) {
        Class<?> objCls = U.detectClass(o);

        ctx.deploy().registerClass(objCls);

        ClassLoader clsLdr = U.detectClassLoader(objCls);

        if (clsLdr instanceof GridDeploymentInfo)
            prepare((GridDeploymentInfo)clsLdr);

        return;
    }

    prepare(globalDep);

    // Global deployment has been injected, no need to inspect any further objects.
    skipPrepare = true;
}
/**
 * Sets deployment info for this message.
 * <p>
 * The call is ignored when the same instance is already set, and when deployment info is already
 * present and the new one is a local {@link GridDeployment}. Anything that is not already a
 * {@link GridDeploymentInfoBean} is wrapped into one.
 *
 * @param depInfo Deployment to set.
 * @see GridCacheDeployable#prepare(GridDeploymentInfo)
 */
public final void prepare(GridDeploymentInfo depInfo) {
    if (depInfo == this.depInfo)
        return;

    // Make sure not to replace remote deployment with local.
    boolean locOverExisting = this.depInfo != null
        && depInfo instanceof GridDeployment
        && ((GridDeployment)depInfo).local();

    if (locOverExisting)
        return;

    if (depInfo instanceof GridDeploymentInfoBean)
        this.depInfo = (GridDeploymentInfoBean)depInfo;
    else
        this.depInfo = new GridDeploymentInfoBean(depInfo);
}
/**
* Gets deployment info previously set via {@link #prepare(GridDeploymentInfo)} or read from the wire.
*
* @return Preset deployment info.
* @see GridCacheDeployable#deployInfo()
*/
public GridDeploymentInfo deployInfo() {
return depInfo;
}
/**
* This method is called before the whole message is serialized
* and is responsible for pre-marshalling state. The base implementation does nothing.
*
* @param ctx Cache context.
* @throws IgniteCheckedException If failed.
*/
public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
// No-op.
}
/**
* This method is called after the message is deserialized and is responsible for
* unmarshalling state marshalled in {@link #prepareMarshal(GridCacheSharedContext)} method.
* The base implementation does nothing.
*
* @param ctx Context.
* @param ldr Class loader.
* @throws IgniteCheckedException If failed.
*/
public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
// No-op.
}
/**
 * Marshals a single entry info and, when {@code addDepInfo} is set, collects deployment info
 * for its key and value.
 *
 * @param info Entry to marshal, {@code null} is ignored.
 * @param ctx Context.
 * @param cacheObjCtx Cache object context.
 * @throws IgniteCheckedException If failed.
 */
protected final void marshalInfo(GridCacheEntryInfo info,
    GridCacheSharedContext ctx,
    CacheObjectContext cacheObjCtx
) throws IgniteCheckedException {
    assert ctx != null;

    if (info == null)
        return;

    info.marshal(cacheObjCtx);

    if (!addDepInfo)
        return;

    if (info.key() != null)
        prepareObject(info.key().value(cacheObjCtx, false), ctx);

    CacheObject entryVal = info.value();

    if (entryVal == null)
        return;

    // The value is finish-unmarshalled with the global loader before it is read.
    entryVal.finishUnmarshal(cacheObjCtx, ctx.deploy().globalLoader());

    prepareObject(entryVal.value(cacheObjCtx, false), ctx);
}
/**
 * Unmarshals a single entry info using the cache object context of the given cache context.
 *
 * @param info Entry to unmarshal, {@code null} is ignored.
 * @param ctx Context.
 * @param ldr Loader.
 * @throws IgniteCheckedException If failed.
 */
protected final void unmarshalInfo(GridCacheEntryInfo info, GridCacheContext ctx,
    ClassLoader ldr) throws IgniteCheckedException {
    assert ldr != null;
    assert ctx != null;

    if (info == null)
        return;

    info.unmarshal(ctx.cacheObjectContext(), ldr);
}
/**
 * Marshals every entry info of the given sequence via
 * {@link #marshalInfo(GridCacheEntryInfo, GridCacheSharedContext, CacheObjectContext)}.
 *
 * @param infos Entries to marshal, {@code null} is ignored.
 * @param ctx Context.
 * @param cacheObjCtx Cache object context.
 * @throws IgniteCheckedException If failed.
 */
protected final void marshalInfos(
    Iterable<? extends GridCacheEntryInfo> infos,
    GridCacheSharedContext ctx,
    CacheObjectContext cacheObjCtx
) throws IgniteCheckedException {
    assert ctx != null;

    if (infos == null)
        return;

    for (GridCacheEntryInfo entryInfo : infos)
        marshalInfo(entryInfo, ctx, cacheObjCtx);
}
/**
 * Unmarshals every entry info of the given sequence via
 * {@link #unmarshalInfo(GridCacheEntryInfo, GridCacheContext, ClassLoader)}.
 *
 * @param infos Entries to unmarshal, {@code null} is ignored.
 * @param ctx Context.
 * @param ldr Loader.
 * @throws IgniteCheckedException If failed.
 */
protected final void unmarshalInfos(Iterable<? extends GridCacheEntryInfo> infos,
    GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
    assert ldr != null;
    assert ctx != null;

    if (infos == null)
        return;

    for (GridCacheEntryInfo entryInfo : infos)
        unmarshalInfo(entryInfo, ctx, ldr);
}
/**
 * Marshals transaction entries and, where required, collects deployment info for them.
 * <p>
 * With {@code addDepInfo} set, deployment info is collected for keys, values and entry processors.
 * Without it, deployment info is collected for entry processors only, and only if deployment is
 * enabled on the context; {@code forceAddDepInfo} is raised in that case.
 *
 * @param txEntries Entries to marshal, {@code null} is ignored.
 * @param ctx Context.
 * @throws IgniteCheckedException If failed.
 */
protected final void marshalTx(Iterable<IgniteTxEntry> txEntries, GridCacheSharedContext ctx)
    throws IgniteCheckedException {
    assert ctx != null;

    if (txEntries == null)
        return;

    boolean transferExpiry = transferExpiryPolicy();
    boolean p2pEnabled = ctx.deploymentEnabled();

    for (IgniteTxEntry txEntry : txEntries) {
        txEntry.marshal(ctx, transferExpiry);

        GridCacheContext cctx = txEntry.context();

        boolean prepareProcs;

        if (addDepInfo) {
            if (txEntry.key() != null)
                prepareObject(txEntry.key().value(cctx.cacheObjectContext(), false), ctx);

            if (txEntry.value() != null)
                prepareObject(txEntry.value().value(cctx.cacheObjectContext(), false), ctx);

            prepareProcs = txEntry.entryProcessors() != null;
        }
        else {
            prepareProcs = p2pEnabled && txEntry.entryProcessors() != null;

            // prepareObject() asserts that one of the two deployment flags is set.
            if (prepareProcs)
                forceAddDepInfo = true;
        }

        if (prepareProcs) {
            for (T2<EntryProcessor<Object, Object, Object>, Object[]> proc : txEntry.entryProcessors())
                prepareObject(proc.get1(), ctx);
        }
    }
}
/**
* Tells {@code marshalTx} whether the expiry policy of entries should be marshalled.
* The base implementation returns {@code false}.
*
* @return {@code True} if entries expire policy should be marshalled.
*/
protected boolean transferExpiryPolicy() {
return false;
}
/**
 * Unmarshals every transaction entry of the given sequence.
 *
 * @param txEntries Entries to unmarshal, {@code null} is ignored.
 * @param near Flag passed through unchanged to each entry's {@code unmarshal} call.
 * @param ctx Context.
 * @param ldr Loader.
 * @throws IgniteCheckedException If failed.
 */
protected final void unmarshalTx(Iterable<IgniteTxEntry> txEntries,
    boolean near,
    GridCacheSharedContext ctx,
    ClassLoader ldr) throws IgniteCheckedException {
    assert ldr != null;
    assert ctx != null;

    if (txEntries == null)
        return;

    for (IgniteTxEntry txEntry : txEntries)
        txEntry.unmarshal(ctx, near, ldr);
}
/**
 * Marshals invoke arguments one by one, collecting deployment info for each of them when
 * {@code addDepInfo} is set.
 *
 * @param args Arguments to marshal.
 * @param ctx Context.
 * @return Marshalled arguments in the original order with {@code null} in place of {@code null}
 *      arguments, or {@code null} if {@code args} is {@code null} or empty.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args, GridCacheContext ctx)
    throws IgniteCheckedException {
    assert ctx != null;

    if (args == null || args.length == 0)
        return null;

    // Slots of null arguments simply keep their default null value.
    byte[][] res = new byte[args.length][];

    for (int idx = 0; idx < args.length; idx++) {
        Object arg = args[idx];

        if (addDepInfo)
            prepareObject(arg, ctx.shared());

        if (arg != null)
            res[idx] = CU.marshal(ctx, arg);
    }

    return res;
}
/**
 * Unmarshals invoke arguments previously marshalled by
 * {@link #marshalInvokeArguments(Object[], GridCacheContext)}.
 *
 * @param byteCol Marshalled arguments to unmarshal.
 * @param ctx Context.
 * @param ldr Loader.
 * @return Unmarshalled arguments in the original order with {@code null} in place of {@code null}
 *      elements, or {@code null} if {@code byteCol} is {@code null}.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol,
    GridCacheSharedContext ctx,
    ClassLoader ldr) throws IgniteCheckedException {
    assert ldr != null;
    assert ctx != null;

    if (byteCol == null)
        return null;

    Object[] args = new Object[byteCol.length];

    Marshaller marsh = ctx.marshaller();

    // Both inputs are loop-invariant, so the effective class loader is resolved once, not per element.
    ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());

    for (int i = 0; i < byteCol.length; i++) {
        byte[] bytes = byteCol[i];

        // Slots of null elements keep their default null value.
        if (bytes != null)
            args[i] = U.unmarshal(marsh, bytes, clsLdr);
    }

    return args;
}
/**
 * Marshals collection elements one by one, collecting deployment info for each of them when
 * {@code addDepInfo} is set.
 *
 * @param col Collection to marshal.
 * @param ctx Context.
 * @return Marshalled elements in iteration order with {@code null} in place of {@code null} elements,
 *      or {@code null} if {@code col} is {@code null}.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected List<byte[]> marshalCollection(@Nullable Collection<?> col,
    GridCacheContext ctx) throws IgniteCheckedException {
    assert ctx != null;

    if (col == null)
        return null;

    List<byte[]> res = new ArrayList<>(col.size());

    for (Object item : col) {
        if (addDepInfo)
            prepareObject(item, ctx.shared());

        byte[] itemBytes = null;

        if (item != null)
            itemBytes = CU.marshal(ctx, item);

        res.add(itemBytes);
    }

    return res;
}
/**
 * Prepares every cache object of the list for marshalling via
 * {@link #prepareMarshalCacheObject(CacheObject, GridCacheContext)}. The list is walked by index.
 *
 * @param col Collection, {@code null} is ignored.
 * @param ctx Cache context.
 * @throws IgniteCheckedException If failed.
 */
@SuppressWarnings("ForLoopReplaceableByForEach")
public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
    GridCacheContext ctx) throws IgniteCheckedException {
    if (col != null) {
        for (int idx = 0, cnt = col.size(); idx < cnt; idx++)
            prepareMarshalCacheObject(col.get(idx), ctx);
    }
}
/**
 * Prepares a single cache object for marshalling and, when {@code addDepInfo} is set, collects
 * deployment info for its value.
 *
 * @param obj Object, {@code null} is ignored.
 * @param ctx Context.
 * @throws IgniteCheckedException If failed.
 */
protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
    if (obj == null)
        return;

    obj.prepareMarshal(ctx.cacheObjectContext());

    if (!addDepInfo)
        return;

    prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared());
}
/**
 * Prepares every cache object of the collection for marshalling. Each element is handled by
 * {@link #prepareMarshalCacheObject(CacheObject, GridCacheContext)}, which skips {@code null}
 * elements, so this overload stays consistent with the {@link List} one.
 *
 * @param col Collection, {@code null} is ignored.
 * @param ctx Cache context.
 * @throws IgniteCheckedException If failed.
 */
protected final void prepareMarshalCacheObjects(@Nullable Collection<? extends CacheObject> col,
    GridCacheContext ctx) throws IgniteCheckedException {
    if (col == null)
        return;

    for (CacheObject obj : col)
        prepareMarshalCacheObject(obj, ctx);
}
/**
 * Finishes unmarshalling of every non-{@code null} cache object of the list. The list is walked by index.
 *
 * @param col Collection, {@code null} is ignored.
 * @param ctx Context.
 * @param ldr Class loader.
 * @throws IgniteCheckedException If failed.
 */
@SuppressWarnings("ForLoopReplaceableByForEach")
public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
    GridCacheContext ctx,
    ClassLoader ldr
) throws IgniteCheckedException {
    if (col != null) {
        for (int idx = 0, cnt = col.size(); idx < cnt; idx++) {
            CacheObject cacheObj = col.get(idx);

            if (cacheObj == null)
                continue;

            cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
        }
    }
}
/**
 * Finishes unmarshalling of every non-{@code null} cache object of the collection.
 *
 * @param col Collection, {@code null} is ignored.
 * @param ctx Context.
 * @param ldr Class loader.
 * @throws IgniteCheckedException If failed.
 */
protected final void finishUnmarshalCacheObjects(@Nullable Collection<? extends CacheObject> col,
    GridCacheContext ctx,
    ClassLoader ldr
) throws IgniteCheckedException {
    if (col != null) {
        for (CacheObject cacheObj : col) {
            if (cacheObj == null)
                continue;

            cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
        }
    }
}
/** {@inheritDoc} The base implementation does nothing. */
@Override public void onAckReceived() {
// No-op.
}
/**
 * Unmarshals a collection of marshalled elements.
 *
 * @param byteCol Collection to unmarshal.
 * @param ctx Context.
 * @param ldr Loader.
 * @param <T> Element type.
 * @return Unmarshalled elements in iteration order with {@code null} in place of {@code null} elements,
 *      or {@code null} if {@code byteCol} is {@code null}.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected <T> List<T> unmarshalCollection(@Nullable Collection<byte[]> byteCol,
    GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
    assert ldr != null;
    assert ctx != null;

    if (byteCol == null)
        return null;

    List<T> col = new ArrayList<>(byteCol.size());

    Marshaller marsh = ctx.marshaller();

    // Both inputs are loop-invariant, so the effective class loader is resolved once, not per element.
    ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());

    for (byte[] bytes : byteCol)
        col.add(bytes == null ? null : U.<T>unmarshal(marsh, bytes, clsLdr));

    return col;
}
/**
* Gets the logger to use for this message. The base implementation returns the message logger
* of the given context.
*
* @param ctx Context.
* @return Logger.
*/
public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
return ctx.messageLogger();
}
/**
* {@inheritDoc}
* <p>
* This class writes three fields: {@code depInfo}, {@code lastAffChangedTopVer} and {@code msgId}.
*/
@Override public byte fieldsCount() {
return 3;
}
/**
* {@inheritDoc}
* <p>
* Writes the header (once) and then this class's own fields: {@code depInfo},
* {@code lastAffChangedTopVer} and {@code msgId}. The write is resumable: {@code writer.state()}
* selects the first field still to be written and every {@code case} deliberately falls through
* to the next one.
*
* @param buf Buffer handed to the writer.
* @param writer Writer.
* @return {@code false} as soon as the writer reports that the header or a field was not written,
* {@code true} otherwise.
*/
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
// The header is written only once per message, even if this method is re-entered.
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
writer.onHeaderWritten();
}
// No 'break' statements below: after a field is written the state is advanced and
// execution falls through to the next field.
switch (writer.state()) {
case 0:
if (!writer.writeMessage("depInfo", depInfo))
return false;
writer.incrementState();
case 1:
if (!writer.writeAffinityTopologyVersion("lastAffChangedTopVer", lastAffChangedTopVer))
return false;
writer.incrementState();
case 2:
if (!writer.writeLong("msgId", msgId))
return false;
writer.incrementState();
}
return true;
}
/**
* {@inheritDoc}
* <p>
* Reads this class's own fields in the order they are written by {@code writeTo}:
* {@code depInfo}, {@code lastAffChangedTopVer} and {@code msgId}. The read is resumable:
* {@code reader.state()} selects the first field still to be read and every {@code case}
* deliberately falls through to the next one.
*
* @param buf Buffer handed to the reader.
* @param reader Reader.
* @return {@code false} if {@code reader.beforeMessageRead()} fails or a field was not completely read,
* otherwise the result of {@code reader.afterMessageRead(GridCacheMessage.class)}.
*/
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
if (!reader.beforeMessageRead())
return false;
// No 'break' statements below: after a field is read the state is advanced and
// execution falls through to the next field.
switch (reader.state()) {
case 0:
depInfo = reader.readMessage("depInfo");
// The value assigned above is only relied upon when the reader confirms the read completed.
if (!reader.isLastRead())
return false;
reader.incrementState();
case 1:
lastAffChangedTopVer = reader.readAffinityTopologyVersion("lastAffChangedTopVer");
if (!reader.isLastRead())
return false;
reader.incrementState();
case 2:
msgId = reader.readLong("msgId");
if (!reader.isLastRead())
return false;
reader.incrementState();
}
return reader.afterMessageRead(GridCacheMessage.class);
}
/**
 * Appends a flag name to the builder, separating it from previously appended content with {@code '|'}.
 *
 * @param str Builder.
 * @param name Flag name.
 */
protected final void appendFlag(StringBuilder str, String name) {
    boolean first = str.length() == 0;

    if (!first)
        str.append('|');

    str.append(name);
}
/** {@inheritDoc} The string is built by {@code S.toString} for {@code GridCacheMessage}. */
@Override public String toString() {
return S.toString(GridCacheMessage.class, this);
}
}