blob: d0a4261b37db728a9bb82469c1459a90104a7e25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Return value for cases where both, value and success flag need to be returned.
*/
public class GridCacheReturn implements Externalizable, Message {
/** */
private static final long serialVersionUID = 0L;
/** Value. */
@GridToStringInclude(sensitive = true)
@GridDirectTransient
private volatile Object v;
/** */
private CacheObject cacheObj;
/** */
@GridDirectCollection(CacheInvokeDirectResult.class)
private Collection<CacheInvokeDirectResult> invokeResCol;
/** Success flag. */
private volatile boolean success;
/** */
private volatile boolean invokeRes;
/** Local result flag, if non local then do not need unwrap cache objects. */
@GridDirectTransient
private transient boolean loc;
/** */
private int cacheId;
/**
* Empty constructor.
*/
public GridCacheReturn() {
loc = true;
}
/**
* @param loc {@code True} if created on the node initiated cache operation.
*/
public GridCacheReturn(boolean loc) {
this.loc = loc;
}
/**
* @param loc {@code True} if created on the node initiated cache operation.
* @param success Success flag.
*/
public GridCacheReturn(boolean loc, boolean success) {
this.loc = loc;
this.success = success;
}
/**
* @param cctx Cache context.
* @param loc {@code True} if created on the node initiated cache operation.
* @param keepBinary True is deserialize value from a binary representation, false otherwise.
* @param ldr Class loader, used for deserialization from binary representation.
* @param v Value.
* @param success Success flag.
*/
public GridCacheReturn(
GridCacheContext cctx,
boolean loc,
boolean keepBinary,
@Nullable ClassLoader ldr,
Object v,
boolean success
) {
this.loc = loc;
this.success = success;
if (v != null) {
if (v instanceof CacheObject)
initValue(cctx, (CacheObject)v, keepBinary, ldr);
else {
assert loc;
this.v = v;
}
}
}
/**
* @return Value.
*/
@Nullable public <V> V value() {
return (V)v;
}
/**
*
*/
public boolean emptyResult() {
return !invokeRes && v == null && cacheObj == null && success;
}
/**
* @return If return is invoke result.
*/
public boolean invokeResult() {
return invokeRes;
}
/**
* @param invokeRes Invoke result flag.
*/
public void invokeResult(boolean invokeRes) {
this.invokeRes = invokeRes;
}
/**
* @param cctx Cache context.
* @param v Value.
* @param keepBinary Keep binary flag.
* @param ldr Class loader, used for deserialization from binary representation.
* @return This instance for chaining.
*/
public GridCacheReturn value(GridCacheContext cctx, CacheObject v, boolean keepBinary, @Nullable ClassLoader ldr) {
initValue(cctx, v, keepBinary, ldr);
return this;
}
/**
* @return Success flag.
*/
public boolean success() {
return success;
}
/**
* @param cctx Cache context.
* @param cacheObj Value to set.
* @param success Success flag to set.
* @param keepBinary Keep binary flag.
* @param ldr Class loader, used for deserialization from binary representation.
* @return This instance for chaining.
*/
public GridCacheReturn set(
GridCacheContext cctx,
@Nullable CacheObject cacheObj,
boolean success,
boolean keepBinary,
@Nullable ClassLoader ldr
) {
this.success = success;
initValue(cctx, cacheObj, keepBinary, ldr);
return this;
}
/**
* @param cctx Cache context.
* @param cacheObj Cache object.
* @param keepBinary Keep binary flag.
* @param ldr Class loader, used for deserialization from binary representation.
*/
private void initValue(
GridCacheContext cctx,
@Nullable CacheObject cacheObj,
boolean keepBinary,
@Nullable ClassLoader ldr
) {
if (loc)
v = cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheObj, keepBinary, true, ldr);
else {
assert cacheId == 0 || cacheId == cctx.cacheId();
cacheId = cctx.cacheId();
this.cacheObj = cacheObj;
}
}
/**
* @param success Success flag.
* @return This instance for chaining.
*/
public GridCacheReturn success(boolean success) {
this.success = success;
return this;
}
/**
* @param cctx Context.
* @param key Key.
* @param key0 Key value.
* @param res Result.
* @param err Error.
* @param keepBinary Keep binary.
*/
public synchronized void addEntryProcessResult(
GridCacheContext cctx,
KeyCacheObject key,
@Nullable Object key0,
@Nullable Object res,
@Nullable Exception err,
boolean keepBinary) {
assert v == null || v instanceof Map : v;
assert key != null;
assert res != null || err != null;
invokeRes = true;
if (loc) {
HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;
if (resMap == null) {
resMap = new HashMap<>();
v = resMap;
}
// These exceptions mean that we should register class and call EntryProcessor again.
if (err != null) {
if (err instanceof UnregisteredClassException)
throw (UnregisteredClassException)err;
else if (err instanceof UnregisteredBinaryTypeException)
throw (UnregisteredBinaryTypeException)err;
}
CacheInvokeResult res0 = err == null ? CacheInvokeResult.fromResult(res) : CacheInvokeResult.fromError(err);
Object resKey = key0 != null ? key0 :
((keepBinary && key instanceof BinaryObject) ? key : CU.value(key, cctx, true));
resMap.put(resKey, res0);
}
else {
assert v == null;
assert cacheId == 0 || cacheId == cctx.cacheId();
cacheId = cctx.cacheId();
if (invokeResCol == null)
invokeResCol = new ArrayList<>();
CacheInvokeDirectResult res0 = err == null ?
cctx.transactional() ?
new CacheInvokeDirectResult(key, cctx.toCacheObject(res)) :
CacheInvokeDirectResult.lazyResult(key, res) :
new CacheInvokeDirectResult(key, err);
invokeResCol.add(res0);
}
}
/**
* @return Cache ID.
*/
public int cacheId() {
return cacheId;
}
/**
* @param other Other result to merge with.
*/
public synchronized void mergeEntryProcessResults(GridCacheReturn other) {
assert invokeRes || v == null : "Invalid state to merge: " + this;
assert other.invokeRes;
assert loc == other.loc : loc;
if (other.v == null)
return;
invokeRes = true;
HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;
if (resMap == null) {
resMap = new HashMap<>();
v = resMap;
}
resMap.putAll((Map<Object, EntryProcessorResult>)other.v);
}
/**
* Converts entry processor invokation results to cache object instances.
*
* @param ctx Cache context.
*/
public void marshalResult(GridCacheContext ctx) {
if (invokeRes && invokeResCol != null) {
for (CacheInvokeDirectResult directRes : invokeResCol)
directRes.marshalResult(ctx);
}
}
/**
* @param ctx Cache context.
* @throws IgniteCheckedException If failed.
*/
public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
assert !loc;
if (cacheObj != null)
cacheObj.prepareMarshal(ctx.cacheObjectContext());
if (invokeRes && invokeResCol != null) {
for (CacheInvokeDirectResult res : invokeResCol)
res.prepareMarshal(ctx);
}
}
/**
* @param ctx Cache context.
* @param ldr Class loader.
* @throws IgniteCheckedException If failed.
*/
public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
loc = true;
if (cacheObj != null) {
cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
v = ctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheObj, true, false, ldr);
}
if (invokeRes && invokeResCol != null) {
for (CacheInvokeDirectResult res : invokeResCol)
res.finishUnmarshal(ctx, ldr);
Map<Object, CacheInvokeResult> map0 = U.newHashMap(invokeResCol.size());
for (CacheInvokeDirectResult res : invokeResCol) {
CacheInvokeResult<?> res0 = res.error() == null ?
CacheInvokeResult.fromResult(ctx.cacheObjectContext().unwrapBinaryIfNeeded(res.result(), true, false, null)) :
CacheInvokeResult.fromError(res.error());
map0.put(ctx.cacheObjectContext().unwrapBinaryIfNeeded(res.key(), true, false, null), res0);
}
v = map0;
}
}
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
/** {@inheritDoc} */
@Override public short directType() {
return 88;
}
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
writer.onHeaderWritten();
}
switch (writer.state()) {
case 0:
if (!writer.writeInt("cacheId", cacheId))
return false;
writer.incrementState();
case 1:
if (!writer.writeMessage("cacheObj", cacheObj))
return false;
writer.incrementState();
case 2:
if (!writer.writeBoolean("invokeRes", invokeRes))
return false;
writer.incrementState();
case 3:
if (!writer.writeCollection("invokeResCol", invokeResCol, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 4:
if (!writer.writeBoolean("success", success))
return false;
writer.incrementState();
}
return true;
}
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
if (!reader.beforeMessageRead())
return false;
switch (reader.state()) {
case 0:
cacheId = reader.readInt("cacheId");
if (!reader.isLastRead())
return false;
reader.incrementState();
case 1:
cacheObj = reader.readMessage("cacheObj");
if (!reader.isLastRead())
return false;
reader.incrementState();
case 2:
invokeRes = reader.readBoolean("invokeRes");
if (!reader.isLastRead())
return false;
reader.incrementState();
case 3:
invokeResCol = reader.readCollection("invokeResCol", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
reader.incrementState();
case 4:
success = reader.readBoolean("success");
if (!reader.isLastRead())
return false;
reader.incrementState();
}
return reader.afterMessageRead(GridCacheReturn.class);
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 5;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
assert false;
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
assert false;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheReturn.class, this);
}
}