blob: 4c101b290fd54f613b7b617ad1368d85565ed0df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.binary;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryFieldMetadata;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.binary.BinaryObjectEx;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
/**
* Binary processor implementation.
*/
public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
CacheObjectBinaryProcessor {
/** */
private volatile boolean discoveryStarted;
/** */
private BinaryContext binaryCtx;
/** */
private Marshaller marsh;
/** */
private GridBinaryMarshaller binaryMarsh;
/** */
private BinaryMetadataFileStore metadataFileStore;
/**
* Custom folder specifying local folder for {@link #metadataFileStore}.<br>
* {@code null} means no specific folder is configured. <br>
* In this case folder for metadata is composed from work directory and consistentId <br>
*/
@Nullable private File binaryMetadataFileStoreDir;
/** */
@GridToStringExclude
private IgniteBinary binaries;
/** Listener removes all registered binary schemas and user type descriptors after the local client reconnected. */
private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
binaryContext().unregisterUserTypeDescriptors();
binaryContext().unregisterBinarySchemas();
metadataLocCache.clear();
}
};
/** Locally cached metadata. This local cache is managed by exchanging discovery custom events. */
private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache = new ConcurrentHashMap<>();
/** */
private BinaryMetadataTransport transport;
/** Cached affinity key field names. */
private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>();
/**
* @param ctx Kernal context.
*/
public CacheObjectBinaryProcessorImpl(GridKernalContext ctx) {
super(ctx);
marsh = ctx.grid().configuration().getMarshaller();
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (marsh instanceof BinaryMarshaller) {
if (ctx.clientNode())
ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED);
if (!ctx.clientNode())
metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log, binaryMetadataFileStoreDir);
transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, ctx, log);
BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
@Override public void addMeta(int typeId, BinaryType newMeta, boolean failIfUnregistered) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
if (!discoveryStarted) {
BinaryMetadataHolder holder = metadataLocCache.get(typeId);
BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());
if (oldMeta != mergedMeta)
metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));
return;
}
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx), failIfUnregistered);
}
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
return CacheObjectBinaryProcessorImpl.this.metadata(typeId);
}
@Override public BinaryMetadata metadata0(int typeId) throws BinaryObjectException {
return CacheObjectBinaryProcessorImpl.this.metadata0(typeId);
}
@Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
return CacheObjectBinaryProcessorImpl.this.metadata(typeId, schemaId);
}
@Override public Collection<BinaryType> metadata() throws BinaryObjectException {
return CacheObjectBinaryProcessorImpl.this.metadata();
}
};
BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;
binaryCtx = new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));
IgniteUtils.invoke(BinaryMarshaller.class, bMarsh0, "setBinaryContext", binaryCtx, ctx.config());
binaryMarsh = new GridBinaryMarshaller(binaryCtx);
binaries = new IgniteBinaryImpl(ctx, this);
if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
BinaryConfiguration bCfg = ctx.config().getBinaryConfiguration();
if (bCfg != null) {
Map<String, Object> map = new HashMap<>();
map.put("globIdMapper", bCfg.getIdMapper() != null ? bCfg.getIdMapper().getClass().getName() : null);
map.put("globSerializer", bCfg.getSerializer() != null ? bCfg.getSerializer().getClass() : null);
map.put("compactFooter", bCfg.isCompactFooter());
if (bCfg.getTypeConfigurations() != null) {
Map<Object, Object> typeCfgsMap = new HashMap<>();
for (BinaryTypeConfiguration c : bCfg.getTypeConfigurations()) {
typeCfgsMap.put(
c.getTypeName() != null,
Arrays.asList(
c.getIdMapper() != null ? c.getIdMapper().getClass() : null,
c.getSerializer() != null ? c.getSerializer().getClass() : null,
c.isEnum()
)
);
if (c.isEnum())
BinaryUtils.validateEnumValues(c.getTypeName(), c.getEnumValues());
}
map.put("typeCfgs", typeCfgsMap);
}
ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION, map);
}
}
if (!ctx.clientNode())
metadataFileStore.restoreMetadata();
}
}
/**
* @param lsnr Listener.
*/
public void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
if (transport != null)
transport.addBinaryMetadataUpdateListener(lsnr);
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
if (ctx.clientNode())
ctx.event().removeLocalEventListener(clientDisconLsnr);
if (transport != null)
transport.stop();
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
if (transport != null)
transport.onDisconnected();
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
super.onKernalStart(active);
discoveryStarted = true;
}
/** {@inheritDoc} */
@Override public int typeId(String typeName) {
if (binaryCtx == null)
return super.typeId(typeName);
return binaryCtx.typeId(typeName);
}
/**
* @param obj Object.
* @return Bytes.
* @throws BinaryObjectException If failed.
*/
public byte[] marshal(@Nullable Object obj) throws BinaryObjectException {
byte[] arr = binaryMarsh.marshal(obj, false);
assert arr.length > 0;
return arr;
}
/**
* @param ptr Off-heap pointer.
* @param forceHeap If {@code true} creates heap-based object.
* @return Object.
* @throws BinaryObjectException If failed.
*/
public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
assert ptr > 0 : ptr;
int size = GridUnsafe.getInt(ptr);
ptr += 4;
byte type = GridUnsafe.getByte(ptr++);
if (type != CacheObject.TYPE_BYTE_ARR) {
assert size > 0 : size;
BinaryInputStream in = new BinaryOffheapInputStream(ptr, size, forceHeap);
return binaryMarsh.unmarshal(in);
}
else
return U.copyMemory(ptr, size);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Object marshalToBinary(
@Nullable Object obj,
boolean failIfUnregistered
) throws BinaryObjectException {
if (obj == null)
return null;
if (BinaryUtils.isBinaryType(obj.getClass()))
return obj;
if (obj instanceof Object[]) {
Object[] arr = (Object[])obj;
Object[] pArr = new Object[arr.length];
for (int i = 0; i < arr.length; i++)
pArr[i] = marshalToBinary(arr[i], failIfUnregistered);
return pArr;
}
if (obj instanceof IgniteBiTuple) {
IgniteBiTuple tup = (IgniteBiTuple)obj;
if (obj instanceof T2)
return new T2<>(marshalToBinary(tup.get1(), failIfUnregistered),
marshalToBinary(tup.get2(), failIfUnregistered));
return new IgniteBiTuple<>(marshalToBinary(tup.get1(), failIfUnregistered),
marshalToBinary(tup.get2(), failIfUnregistered));
}
{
Collection<Object> pCol = BinaryUtils.newKnownCollection(obj);
if (pCol != null) {
Collection<?> col = (Collection<?>)obj;
for (Object item : col)
pCol.add(marshalToBinary(item, failIfUnregistered));
return (pCol instanceof MutableSingletonList) ? U.convertToSingletonList(pCol) : pCol;
}
}
{
Map<Object, Object> pMap = BinaryUtils.newKnownMap(obj);
if (pMap != null) {
Map<?, ?> map = (Map<?, ?>)obj;
for (Map.Entry<?, ?> e : map.entrySet())
pMap.put(marshalToBinary(e.getKey(), failIfUnregistered),
marshalToBinary(e.getValue(), failIfUnregistered));
return pMap;
}
}
if (obj instanceof Map.Entry) {
Map.Entry<?, ?> e = (Map.Entry<?, ?>)obj;
return new GridMapEntry<>(marshalToBinary(e.getKey(), failIfUnregistered),
marshalToBinary(e.getValue(), failIfUnregistered));
}
if (binaryMarsh.mustDeserialize(obj))
return obj; // No need to go through marshal-unmarshal because result will be the same as initial object.
byte[] arr = binaryMarsh.marshal(obj, failIfUnregistered);
assert arr.length > 0;
Object obj0 = binaryMarsh.unmarshal(arr, null);
// Possible if a class has writeObject method.
if (obj0 instanceof BinaryObjectImpl)
((BinaryObjectImpl)obj0).detachAllowed(true);
return obj0;
}
/**
* @return Marshaller.
*/
public GridBinaryMarshaller marshaller() {
return binaryMarsh;
}
/** {@inheritDoc} */
@Override public BinaryObjectBuilder builder(String clsName) {
return new BinaryObjectBuilderImpl(binaryCtx, clsName);
}
/** {@inheritDoc} */
@Override public BinaryObjectBuilder builder(BinaryObject binaryObj) {
return BinaryObjectBuilderImpl.wrap(binaryObj);
}
/** {@inheritDoc} */
@Override public void updateMetadata(int typeId, String typeName, @Nullable String affKeyFieldName,
Map<String, BinaryFieldMetadata> fieldTypeIds, boolean isEnum, @Nullable Map<String, Integer> enumMap)
throws BinaryObjectException {
BinaryMetadata meta = new BinaryMetadata(typeId, typeName, fieldTypeIds, affKeyFieldName, null, isEnum,
enumMap);
binaryCtx.updateMetadata(typeId, meta, false);
}
/** {@inheritDoc} */
@Override public void addMeta(final int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
try {
BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
//metadata requested to be added is exactly the same as already presented in the cache
if (mergedMeta == oldMeta)
return;
if (failIfUnregistered)
throw new UnregisteredBinaryTypeException(
"Attempted to update binary metadata inside a critical synchronization block (will be " +
"automatically retried). This exception must not be wrapped to any other exception class. " +
"If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
"dev-list.",
typeId, mergedMeta);
MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
assert res != null;
if (res.rejected())
throw res.error();
}
catch (IgniteCheckedException e) {
throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
}
}
/** {@inheritDoc} */
@Override public void addMetaLocally(int typeId, BinaryType newMeta) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
try {
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
if (!ctx.clientNode())
metadataFileStore.mergeAndWriteMetadata(mergedMeta);
metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));
}
catch (BinaryObjectException e) {
throw new BinaryObjectException("New binary metadata is incompatible with binary metadata" +
" persisted locally." +
" Consider cleaning up persisted metadata from <workDir>/binary_meta directory.", e);
}
}
/** {@inheritDoc} */
@Nullable @Override public BinaryType metadata(final int typeId) {
BinaryMetadata meta = metadata0(typeId);
return meta != null ? meta.wrap(binaryCtx) : null;
}
/**
* @param typeId Type ID.
* @return Meta data.
* @throws IgniteException In case of error.
*/
@Nullable public BinaryMetadata metadata0(final int typeId) {
BinaryMetadataHolder holder = metadataLocCache.get(typeId);
if (holder == null) {
if (ctx.clientNode()) {
try {
transport.requestUpToDateMetadata(typeId).get();
holder = metadataLocCache.get(typeId);
}
catch (IgniteCheckedException ignored) {
// No-op.
}
}
}
if (holder != null) {
if (IgniteThread.current() instanceof IgniteDiscoveryThread)
return holder.metadata();
if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, holder.pendingVersion());
if (log.isDebugEnabled() && !fut.isDone())
log.debug("Waiting for update for" +
" [typeId=" + typeId +
", pendingVer=" + holder.pendingVersion() +
", acceptedVer=" + holder.acceptedVersion() + "]");
try {
fut.get();
}
catch (IgniteCheckedException ignored) {
// No-op.
}
}
return holder.metadata();
}
else
return null;
}
/** {@inheritDoc} */
@Nullable @Override public BinaryType metadata(final int typeId, final int schemaId) {
BinaryMetadataHolder holder = metadataLocCache.get(typeId);
if (ctx.clientNode()) {
if (holder == null || !holder.metadata().hasSchema(schemaId)) {
try {
transport.requestUpToDateMetadata(typeId).get();
holder = metadataLocCache.get(typeId);
}
catch (IgniteCheckedException ignored) {
// No-op.
}
}
}
else if (holder != null) {
if (IgniteThread.current() instanceof IgniteDiscoveryThread)
return holder.metadata().wrap(binaryCtx);
if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
typeId,
holder.pendingVersion());
if (log.isDebugEnabled() && !fut.isDone())
log.debug("Waiting for update for" +
" [typeId=" + typeId
+ ", schemaId=" + schemaId
+ ", pendingVer=" + holder.pendingVersion()
+ ", acceptedVer=" + holder.acceptedVersion() + "]");
try {
fut.get();
}
catch (IgniteCheckedException ignored) {
// No-op.
}
holder = metadataLocCache.get(typeId);
}
}
return holder != null ? holder.metadata().wrap(binaryCtx) : null;
}
/** {@inheritDoc} */
@Override public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds)
throws BinaryObjectException {
try {
Map<Integer, BinaryType> res = U.newHashMap(metadataLocCache.size());
for (Map.Entry<Integer, BinaryMetadataHolder> e : metadataLocCache.entrySet())
res.put(e.getKey(), e.getValue().metadata().wrap(binaryCtx));
return res;
}
catch (CacheException e) {
throw new BinaryObjectException(e);
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Collection<BinaryType> metadata() throws BinaryObjectException {
return F.viewReadOnly(metadataLocCache.values(), new IgniteClosure<BinaryMetadataHolder, BinaryType>() {
@Override public BinaryType apply(BinaryMetadataHolder metaHolder) {
return metaHolder.metadata().wrap(binaryCtx);
}
});
}
/** {@inheritDoc} */
@Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
A.notNullOrEmpty(typeName, "enum type name");
int typeId = binaryCtx.typeId(typeName);
typeName = binaryCtx.userTypeName(typeName);
updateMetadata(typeId, typeName, null, null, true, null);
return new BinaryEnumObjectImpl(binaryCtx, typeId, null, ord);
}
/** {@inheritDoc} */
@Override public BinaryObject buildEnum(String typeName, String name) throws BinaryObjectException {
A.notNullOrEmpty(typeName, "enum type name");
A.notNullOrEmpty(name, "enum name");
int typeId = binaryCtx.typeId(typeName);
BinaryMetadata metadata = metadata0(typeId);
if (metadata == null)
throw new BinaryObjectException("Failed to get metadata for type [typeId=" +
typeId + ", typeName='" + typeName + "']");
Integer ordinal = metadata.getEnumOrdinalByName(name);
typeName = binaryCtx.userTypeName(typeName);
if (ordinal == null)
throw new BinaryObjectException("Failed to resolve enum ordinal by name [typeId=" +
typeId + ", typeName='" + typeName + "', name='" + name + "']");
return new BinaryEnumObjectImpl(binaryCtx, typeId, null, ordinal);
}
/** {@inheritDoc} */
@Override public BinaryType registerEnum(String typeName, Map<String, Integer> vals) throws BinaryObjectException {
A.notNullOrEmpty(typeName, "enum type name");
int typeId = binaryCtx.typeId(typeName);
typeName = binaryCtx.userTypeName(typeName);
BinaryUtils.validateEnumValues(typeName, vals);
updateMetadata(typeId, typeName, null, null, true, vals);
return binaryCtx.metadata(typeId);
}
/** {@inheritDoc} */
@Override public IgniteBinary binary() throws IgniteException {
return binaries;
}
/** {@inheritDoc} */
@Override public boolean isBinaryObject(Object obj) {
return obj instanceof BinaryObject;
}
/** {@inheritDoc} */
@Override public boolean isBinaryEnabled(CacheConfiguration<?, ?> ccfg) {
return marsh instanceof BinaryMarshaller;
}
/**
* Get affinity key field.
*
* @param typeId Binary object type ID.
* @return Affinity key.
*/
public BinaryField affinityKeyField(int typeId) {
// Fast path for already cached field.
T1<BinaryField> fieldHolder = affKeyFields.get(typeId);
if (fieldHolder != null)
return fieldHolder.get();
// Slow path if affinity field is not cached yet.
String name = binaryCtx.affinityKeyFieldName(typeId);
if (name != null) {
BinaryField field = binaryCtx.createField(typeId, name);
affKeyFields.putIfAbsent(typeId, new T1<>(field));
return field;
}
else {
affKeyFields.putIfAbsent(typeId, new T1<BinaryField>(null));
return null;
}
}
/** {@inheritDoc} */
@Override public int typeId(Object obj) {
if (obj == null)
return 0;
return isBinaryObject(obj) ? ((BinaryObjectEx)obj).typeId() : typeId(obj.getClass().getSimpleName());
}
/** {@inheritDoc} */
@Override public Object field(Object obj, String fieldName) {
if (obj == null)
return null;
return isBinaryObject(obj) ? ((BinaryObject)obj).field(fieldName) : super.field(obj, fieldName);
}
/** {@inheritDoc} */
@Override public boolean hasField(Object obj, String fieldName) {
return obj != null && ((BinaryObject)obj).hasField(fieldName);
}
/**
* @return Binary context.
*/
public BinaryContext binaryContext() {
return binaryCtx;
}
/** {@inheritDoc} */
@Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException {
assert cfg != null;
boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) &&
!GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName());
CacheObjectContext ctx0 = super.contextForCache(cfg);
CacheObjectContext res = new CacheObjectBinaryContext(ctx,
cfg,
ctx0.copyOnGet(),
ctx0.storeValue(),
binaryEnabled,
ctx0.addDeploymentInfo());
ctx.resource().injectGeneric(res.defaultAffMapper());
return res;
}
/** {@inheritDoc} */
@Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
if (!ctx.binaryEnabled() || binaryMarsh == null)
return super.marshal(ctx, val);
byte[] arr = binaryMarsh.marshal(val, false);
assert arr.length > 0;
return arr;
}
/** {@inheritDoc} */
@Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
throws IgniteCheckedException {
if (!ctx.binaryEnabled() || binaryMarsh == null)
return super.unmarshal(ctx, bytes, clsLdr);
return binaryMarsh.unmarshal(bytes, clsLdr);
}
/** {@inheritDoc} */
@Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx,
Object obj, boolean userObj) {
if (!ctx.binaryEnabled())
return super.toCacheKeyObject(ctx, cctx, obj, userObj);
if (obj instanceof KeyCacheObject) {
KeyCacheObject key = (KeyCacheObject)obj;
if (key instanceof BinaryObjectImpl) {
// Need to create a copy because the key can be reused at the application layer after that (IGNITE-3505).
key = key.copy(partition(ctx, cctx, key));
}
else if (key.partition() == -1)
// Assume others KeyCacheObjects can not be reused for another cache.
key.partition(partition(ctx, cctx, key));
return key;
}
obj = toBinary(obj, false);
if (obj instanceof BinaryObjectImpl) {
((BinaryObjectImpl)obj).partition(partition(ctx, cctx, obj));
return (KeyCacheObject)obj;
}
return toCacheKeyObject0(ctx, cctx, obj, userObj);
}
/** {@inheritDoc} */
@Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj,
boolean userObj, boolean failIfUnregistered) {
if (!ctx.binaryEnabled())
return super.toCacheObject(ctx, obj, userObj, failIfUnregistered);
if (obj == null || obj instanceof CacheObject)
return (CacheObject)obj;
obj = toBinary(obj, failIfUnregistered);
if (obj instanceof CacheObject)
return (CacheObject)obj;
return toCacheObject0(obj, userObj);
}
/** {@inheritDoc} */
@Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
if (type == BinaryObjectImpl.TYPE_BINARY)
return new BinaryObjectImpl(binaryContext(), bytes, 0);
else if (type == BinaryObjectImpl.TYPE_BINARY_ENUM)
return new BinaryEnumObjectImpl(binaryContext(), bytes);
return super.toCacheObject(ctx, type, bytes);
}
/** {@inheritDoc} */
@Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes)
throws IgniteCheckedException {
if (type == BinaryObjectImpl.TYPE_BINARY)
return new BinaryObjectImpl(binaryContext(), bytes, 0);
return super.toKeyCacheObject(ctx, type, bytes);
}
/** {@inheritDoc} */
@Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws BinaryObjectException {
if (!ctx.cacheObjectContext().binaryEnabled())
return obj;
if (obj instanceof BinaryObjectOffheapImpl)
return ((BinaryObjectOffheapImpl)obj).heapCopy();
return obj;
}
/**
* @param obj Object.
* @return Binary object.
* @throws IgniteException In case of error.
*/
@Nullable public Object toBinary(@Nullable Object obj, boolean failIfUnregistered) throws IgniteException {
if (obj == null)
return null;
if (isBinaryObject(obj))
return obj;
return marshalToBinary(obj, failIfUnregistered);
}
/** {@inheritDoc} */
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode rmtNode,
DiscoveryDataBag.JoiningNodeDiscoveryData discoData
) {
IgniteNodeValidationResult res;
if (getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK) || !(marsh instanceof BinaryMarshaller))
return null;
if ((res = validateBinaryConfiguration(rmtNode)) != null)
return res;
return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>) discoData.joiningNodeData());
}
/** */
private IgniteNodeValidationResult validateBinaryConfiguration(ClusterNode rmtNode) {
Object rmtBinaryCfg = rmtNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);
ClusterNode locNode = ctx.discovery().localNode();
Object locBinaryCfg = locNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);
if (!F.eq(locBinaryCfg, rmtBinaryCfg)) {
String msg = "Local node's binary configuration is not equal to remote node's binary configuration " +
"[locNodeId=%s, rmtNodeId=%s, locBinaryCfg=%s, rmtBinaryCfg=%s]";
return new IgniteNodeValidationResult(rmtNode.id(),
String.format(msg, locNode.id(), rmtNode.id(), locBinaryCfg, rmtBinaryCfg),
String.format(msg, rmtNode.id(), locNode.id(), rmtBinaryCfg, locBinaryCfg));
}
return null;
}
/** */
private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map<Integer, BinaryMetadataHolder> newNodeMeta) {
if (newNodeMeta == null)
return null;
for (Map.Entry<Integer, BinaryMetadataHolder> metaEntry : newNodeMeta.entrySet()) {
if (!metadataLocCache.containsKey(metaEntry.getKey()))
continue;
BinaryMetadata locMeta = metadataLocCache.get(metaEntry.getKey()).metadata();
BinaryMetadata rmtMeta = metaEntry.getValue().metadata();
if (locMeta == null || rmtMeta == null)
continue;
try {
BinaryUtils.mergeMetadata(locMeta, rmtMeta);
}
catch (Exception e) {
String locMsg = "Exception was thrown when merging binary metadata from node %s: %s";
String rmtMsg = "Exception was thrown on coordinator when merging binary metadata from this node: %s";
return new IgniteNodeValidationResult(rmtNodeId,
String.format(locMsg, rmtNodeId.toString(), e.getMessage()),
String.format(rmtMsg, e.getMessage()));
}
}
return null;
}
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return BINARY_PROC;
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
if (!dataBag.commonDataCollectedFor(BINARY_PROC.ordinal())) {
Map<Integer, BinaryMetadataHolder> res = U.newHashMap(metadataLocCache.size());
for (Map.Entry<Integer,BinaryMetadataHolder> e : metadataLocCache.entrySet())
res.put(e.getKey(), e.getValue());
dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable) res);
}
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
Map<Integer, BinaryMetadataHolder> res = U.newHashMap(metadataLocCache.size());
for (Map.Entry<Integer,BinaryMetadataHolder> e : metadataLocCache.entrySet())
res.put(e.getKey(), e.getValue());
dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable) res);
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
Map<Integer,BinaryMetadataHolder> newNodeMeta = (Map<Integer, BinaryMetadataHolder>) data.joiningNodeData();
if (newNodeMeta == null)
return;
UUID joiningNode = data.joiningNodeId();
for (Map.Entry<Integer, BinaryMetadataHolder> metaEntry : newNodeMeta.entrySet()) {
if (metadataLocCache.containsKey(metaEntry.getKey())) {
BinaryMetadataHolder localMetaHolder = metadataLocCache.get(metaEntry.getKey());
BinaryMetadata newMeta = metaEntry.getValue().metadata();
BinaryMetadata localMeta = localMetaHolder.metadata();
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(localMeta, newMeta);
if (mergedMeta != localMeta) {
//put mergedMeta to local cache and store to disk
U.log(log,
String.format("Newer version of existing BinaryMetadata[typeId=%d, typeName=%s] " +
"is received from node %s; updating it locally",
mergedMeta.typeId(),
mergedMeta.typeName(),
joiningNode));
metadataLocCache.put(metaEntry.getKey(),
new BinaryMetadataHolder(mergedMeta,
localMetaHolder.pendingVersion(),
localMetaHolder.acceptedVersion()));
if (!ctx.clientNode())
metadataFileStore.writeMetadata(mergedMeta);
}
}
else {
BinaryMetadataHolder newMetaHolder = metaEntry.getValue();
BinaryMetadata newMeta = newMetaHolder.metadata();
U.log(log,
String.format("New BinaryMetadata[typeId=%d, typeName=%s] " +
"is received from node %s; adding it locally",
newMeta.typeId(),
newMeta.typeName(),
joiningNode));
metadataLocCache.put(metaEntry.getKey(), newMetaHolder);
if (!ctx.clientNode())
metadataFileStore.writeMetadata(newMeta);
}
}
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
Map<Integer, BinaryMetadataHolder> receivedData = (Map<Integer, BinaryMetadataHolder>) data.commonData();
if (receivedData != null) {
for (Map.Entry<Integer, BinaryMetadataHolder> e : receivedData.entrySet()) {
BinaryMetadataHolder holder = e.getValue();
BinaryMetadataHolder localHolder = new BinaryMetadataHolder(holder.metadata(),
holder.pendingVersion(),
holder.pendingVersion());
if (log.isDebugEnabled())
log.debug("Received metadata on join: " + localHolder);
metadataLocCache.put(e.getKey(), localHolder);
if (!ctx.clientNode())
metadataFileStore.writeMetadata(holder.metadata());
}
}
}
/**
* Sets path to binary metadata store configured by user, should include binary_meta and consistentId
* @param binaryMetadataFileStoreDir path to binary_meta
*/
public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) {
this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
}
}