blob: 61dec3a4768b4dd476817c6654d1047e01079a2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.PlatformConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.logger.platform.PlatformLogger;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
import org.apache.ignite.internal.processors.platform.binary.PlatformBinaryProcessor;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheExtension;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager;
import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinity;
import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore;
import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGroup;
import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer;
import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicLong;
import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicReference;
import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicSequence;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.transactions.PlatformTransactions;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.platform.PlatformAbstractTarget.FALSE;
import static org.apache.ignite.internal.processors.platform.PlatformAbstractTarget.TRUE;
/**
 * Platform processor.
 * <p>
 * Acts as the root {@link PlatformTarget} for the platform side: every {@code process*} method below dispatches on
 * an integer operation code (the {@code OP_*} constants) and either performs the operation on the local Ignite
 * instance or returns a new {@link PlatformTarget} wrapping the requested facility.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public class PlatformProcessorImpl extends GridProcessorAdapter implements PlatformProcessor, PlatformTarget {
    /** Operation: get an existing cache by name. */
    private static final int OP_GET_CACHE = 1;

    /** Operation: create a cache by name. */
    private static final int OP_CREATE_CACHE = 2;

    /** Operation: get or create a cache by name. */
    private static final int OP_GET_OR_CREATE_CACHE = 3;

    /** Operation: create a cache from a configuration read from the stream. */
    private static final int OP_CREATE_CACHE_FROM_CONFIG = 4;

    /** Operation: get or create a cache from a configuration read from the stream. */
    private static final int OP_GET_OR_CREATE_CACHE_FROM_CONFIG = 5;

    /** Operation: destroy a cache by name. */
    private static final int OP_DESTROY_CACHE = 6;

    /** Operation: get affinity for a cache name. */
    private static final int OP_GET_AFFINITY = 7;

    /** Operation: get a data streamer for a cache name. */
    private static final int OP_GET_DATA_STREAMER = 8;

    /** Operation: get transactions facade (optionally with a label). */
    private static final int OP_GET_TRANSACTIONS = 9;

    /** Operation: get cluster group for the whole cluster. */
    private static final int OP_GET_CLUSTER_GROUP = 10;

    /** Operation: get a plugin extension target by extension ID. */
    private static final int OP_GET_EXTENSION = 11;

    /** Operation: get an atomic long. */
    private static final int OP_GET_ATOMIC_LONG = 12;

    /** Operation: get an atomic reference. */
    private static final int OP_GET_ATOMIC_REFERENCE = 13;

    /** Operation: get an atomic sequence. */
    private static final int OP_GET_ATOMIC_SEQUENCE = 14;

    /** Operation: write Ignite configuration to the stream. */
    private static final int OP_GET_IGNITE_CONFIGURATION = 15;

    /** Operation: write cache names to the stream. */
    private static final int OP_GET_CACHE_NAMES = 16;

    /** Operation: create a near cache. */
    private static final int OP_CREATE_NEAR_CACHE = 17;

    /** Operation: get or create a near cache. */
    private static final int OP_GET_OR_CREATE_NEAR_CACHE = 18;

    /** Operation: check whether a logger level is enabled. */
    private static final int OP_LOGGER_IS_LEVEL_ENABLED = 19;

    /** Operation: log a message through the Ignite logger. */
    private static final int OP_LOGGER_LOG = 20;

    /** Operation: get binary processor. */
    private static final int OP_GET_BINARY_PROCESSOR = 21;

    /** Operation: release the start latch. */
    private static final int OP_RELEASE_START = 22;

    /** Operation: add a cache configuration. */
    private static final int OP_ADD_CACHE_CONFIGURATION = 23;

    /** Operation: set baseline topology by topology version. */
    private static final int OP_SET_BASELINE_TOPOLOGY_VER = 24;

    /** Operation: set baseline topology by a collection of nodes. */
    private static final int OP_SET_BASELINE_TOPOLOGY_NODES = 25;

    /** Operation: write current baseline topology to the stream. */
    private static final int OP_GET_BASELINE_TOPOLOGY = 26;

    /** Operation: disable WAL for a cache. */
    private static final int OP_DISABLE_WAL = 27;

    /** Operation: enable WAL for a cache. */
    private static final int OP_ENABLE_WAL = 28;

    /** Operation: check whether WAL is enabled for a cache. */
    private static final int OP_IS_WAL_ENABLED = 29;

    /** Operation: set transaction timeout on partition map exchange. */
    private static final int OP_SET_TX_TIMEOUT_ON_PME = 30;

    /** Operation: write local node product version to the stream. */
    private static final int OP_NODE_VERSION = 31;

    /** Operation: check whether baseline auto-adjust is enabled. */
    private static final int OP_IS_BASELINE_AUTO_ADJ_ENABLED = 32;

    /** Operation: enable or disable baseline auto-adjust. */
    private static final int OP_SET_BASELINE_AUTO_ADJ_ENABLED = 33;

    /** Operation: get baseline auto-adjust timeout. */
    private static final int OP_GET_BASELINE_AUTO_ADJ_TIMEOUT = 34;

    /** Operation: set baseline auto-adjust timeout. */
    private static final int OP_SET_BASELINE_AUTO_ADJ_TIMEOUT = 35;

    /** Operation: write configuration of a cache (looked up by cache ID) to the stream. */
    private static final int OP_GET_CACHE_CONFIG = 36;

    /** Operation: write the current thread-local value to the stream. */
    private static final int OP_GET_THREAD_LOCAL = 37;

    /** Operation: get or create a reentrant lock. */
    private static final int OP_GET_OR_CREATE_LOCK = 38;

    /** Start latch. Released by {@link #releaseStart()} and by {@link #onKernalStop(boolean)}. */
    private final CountDownLatch startLatch = new CountDownLatch(1);

    /** Stores pending initialization. Drained in {@link #start()}. */
    private final Collection<StoreInfo> pendingStores =
        Collections.newSetFromMap(new ConcurrentHashMap<StoreInfo, Boolean>());

    /** Lock for store lifecycle operations. */
    private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

    /** Logger. */
    @SuppressWarnings("FieldCanBeLocal")
    private final IgniteLogger log;

    /** Context. */
    private final PlatformContext platformCtx;

    /** Interop configuration. */
    private final PlatformConfigurationEx interopCfg;

    /** Extensions, indexed by extension ID (slots for unused IDs are {@code null}). */
    private final PlatformPluginExtension[] extensions;

    /** Whether processor is started. Written under the store write lock, read under the store read lock. */
    private boolean started;

    /** Whether processor is stopped (or stopping). */
    private volatile boolean stopped;

    /** Cache extensions, indexed by extension ID (slots for unused IDs are {@code null}). */
    private final PlatformCacheExtension[] cacheExts;

    /** Cluster restart flag for the reconnect callback. */
    private volatile boolean clusterRestarted;

    /** Thread local storage for platform needs. */
    private final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

    /**
     * Constructor.
     *
     * @param ctx Kernal context.
     * @throws IgniteException If the configured platform configuration is not a {@link PlatformConfigurationEx},
     *      or if extensions are invalid (null, negative ID or duplicate ID).
     */
    public PlatformProcessorImpl(GridKernalContext ctx) {
        super(ctx);

        log = ctx.log(PlatformProcessorImpl.class);

        PlatformConfiguration interopCfg0 = ctx.config().getPlatformConfiguration();

        assert interopCfg0 != null : "Must be checked earlier during component creation.";

        if (!(interopCfg0 instanceof PlatformConfigurationEx))
            throw new IgniteException("Unsupported platform configuration: " + interopCfg0.getClass().getName());

        interopCfg = (PlatformConfigurationEx)interopCfg0;

        // Surface configuration warnings (if any) through the node logger.
        if (!F.isEmpty(interopCfg.warnings())) {
            for (String w : interopCfg.warnings())
                U.warn(log, w);
        }

        platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory(), interopCfg.platform());

        // Initialize cache extensions (if any).
        cacheExts = prepareCacheExtensions(interopCfg.cacheExtensions());

        if (interopCfg.logger() != null)
            interopCfg.logger().setContext(platformCtx);

        // Initialize extensions (if any).
        extensions = prepareExtensions(ctx.plugins().extensions(PlatformPluginExtension.class));
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteCheckedException {
        // Pass the Ignite instance name and a proxy to this processor to the platform side.
        try (PlatformMemory mem = platformCtx.memory().allocate()) {
            PlatformOutputStream out = mem.output();

            BinaryRawWriterEx writer = platformCtx.writer(out);

            writer.writeString(ctx.igniteInstanceName());

            out.synchronize();

            platformCtx.gateway().onStart(new PlatformTargetProxyImpl(this, platformCtx), mem.pointer());
        }

        // At this moment all necessary native libraries must be loaded, so we can process with store creation.
        storeLock.writeLock().lock();

        try {
            for (StoreInfo store : pendingStores)
                registerStore0(store.store, store.convertBinary);

            pendingStores.clear();

            // From now on registerStore() initializes stores immediately instead of queueing them.
            started = true;
        }
        finally {
            storeLock.writeLock().unlock();
        }

        // Add Interop node attributes.
        ctx.addNodeAttribute(PlatformUtils.ATTR_PLATFORM, interopCfg.platform());
    }

    /** {@inheritDoc} */
    @Override public void onKernalStop(boolean cancel) {
        // Release anyone blocked in awaitStart() so that they do not wait forever on a stopping node.
        startLatch.countDown();
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) throws IgniteCheckedException {
        if (platformCtx != null) {
            // Set the flag first so that concurrent registerStore() calls observe that the node is stopping.
            stopped = true;

            platformCtx.gateway().onStop();
        }
    }

    /** {@inheritDoc} */
    @Override public Ignite ignite() {
        return ctx.grid();
    }

    /** {@inheritDoc} */
    @Override public long environmentPointer() {
        return platformCtx.gateway().environmentPointer();
    }

    /** {@inheritDoc} */
    @Override public void releaseStart() {
        startLatch.countDown();
    }

    /** {@inheritDoc} */
    @Override public void awaitStart() throws IgniteCheckedException {
        U.await(startLatch);
    }

    /** {@inheritDoc} */
    @Override public PlatformContext context() {
        return platformCtx;
    }

    /** {@inheritDoc} */
    @Override public boolean hasContext() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public void registerStore(PlatformCacheStore store, boolean convertBinary)
        throws IgniteCheckedException {
        // Read lock: many stores may register concurrently, but not while start() drains the pending set.
        storeLock.readLock().lock();

        try {
            if (stopped)
                throw new IgniteCheckedException("Failed to initialize interop store because node is stopping: " +
                    store);

            if (started)
                registerStore0(store, convertBinary);
            else
                // Processor is not started yet: defer initialization until start().
                pendingStores.add(new StoreInfo(store, convertBinary));
        }
        finally {
            storeLock.readLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public PlatformCacheManager cacheManager() {
        return new PlatformCacheManager(platformCtx.gateway());
    }

    /** {@inheritDoc} */
    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        platformCtx.gateway().onClientDisconnected();

        // 1) onReconnected is called on all grid components.
        // 2) After all of grid components have completed their reconnection, reconnectFut is completed.
        // Hence the listener below reads the flag stored by onReconnected().
        reconnectFut.listen(new CI1<IgniteFuture<?>>() {
            @Override public void apply(IgniteFuture<?> future) {
                platformCtx.gateway().onClientReconnected(clusterRestarted);
            }
        });
    }

    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
        // Save the flag value for callback of reconnectFut.
        this.clusterRestarted = clusterRestarted;

        return null;
    }

    /**
     * Creates new platform cache.
     *
     * @param cache Cache proxy to wrap, must not be {@code null}.
     * @return Platform cache target.
     */
    private PlatformTarget createPlatformCache(IgniteCacheProxy cache) {
        assert cache != null;

        return new PlatformCache(platformCtx, cache, false, cacheExts);
    }

    /**
     * Checks whether logger level is enabled.
     * <p>
     * Warning and error levels are always reported as enabled.
     *
     * @param level Level (one of the {@code PlatformLogger.LVL_*} constants).
     * @return Result; {@code false} for an unknown level when assertions are disabled.
     */
    private boolean loggerIsLevelEnabled(int level) {
        // Uses the grid logger rather than this processor's own logger field.
        IgniteLogger log = ctx.grid().log();

        switch (level) {
            case PlatformLogger.LVL_TRACE:
                return log.isTraceEnabled();

            case PlatformLogger.LVL_DEBUG:
                return log.isDebugEnabled();

            case PlatformLogger.LVL_INFO:
                return log.isInfoEnabled();

            case PlatformLogger.LVL_WARN:
                return true;

            case PlatformLogger.LVL_ERROR:
                return true;

            default:
                assert false;
        }

        return false;
    }

    /**
     * Logs to the Ignite logger.
     * <p>
     * The error information is attached (wrapped into an {@link IgniteException}) only for warning and error levels.
     *
     * @param level Level (one of the {@code PlatformLogger.LVL_*} constants).
     * @param message Message.
     * @param category Category; when not {@code null}, a category-specific logger is used.
     * @param errorInfo Exception text, or {@code null} if there is no error.
     */
    private void loggerLog(int level, String message, String category, String errorInfo) {
        IgniteLogger log = ctx.grid().log();

        if (category != null)
            log = log.getLogger(category);

        Throwable err = errorInfo == null ? null : new IgniteException("Platform error:" + errorInfo);

        switch (level) {
            case PlatformLogger.LVL_TRACE:
                log.trace(message);

                break;

            case PlatformLogger.LVL_DEBUG:
                log.debug(message);

                break;

            case PlatformLogger.LVL_INFO:
                log.info(message);

                break;

            case PlatformLogger.LVL_WARN:
                log.warning(message, err);

                break;

            case PlatformLogger.LVL_ERROR:
                log.error(message, err);

                break;

            default:
                assert false;
        }
    }

    /** {@inheritDoc} */
    @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
        switch (type) {
            case OP_LOGGER_IS_LEVEL_ENABLED: {
                return loggerIsLevelEnabled((int) val) ? TRUE : FALSE;
            }

            case OP_RELEASE_START: {
                releaseStart();

                return 0;
            }

            case OP_SET_BASELINE_TOPOLOGY_VER: {
                ctx.grid().cluster().setBaselineTopology(val);

                return 0;
            }

            case OP_SET_BASELINE_AUTO_ADJ_TIMEOUT: {
                ctx.grid().cluster().baselineAutoAdjustTimeout(val);

                return 0;
            }
        }

        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
        switch (type) {
            case OP_DESTROY_CACHE: {
                ctx.grid().destroyCache(reader.readString());

                return 0;
            }

            case OP_LOGGER_LOG: {
                // Stream layout: level, message, category, error info (read strictly in this order).
                int level = reader.readInt();
                String msg = reader.readString();
                String category = reader.readString();
                String errInfo = reader.readString();

                loggerLog(level, msg, category, errInfo);

                return 0;
            }

            case OP_SET_BASELINE_TOPOLOGY_NODES: {
                int cnt = reader.readInt();

                Collection<BaselineNode> nodes = new ArrayList<>(cnt);

                for (int i = 0; i < cnt; i++) {
                    Object consId = reader.readObjectDetached();
                    Map<String, Object> attrs = PlatformUtils.readNodeAttributes(reader);

                    nodes.add(new DetachedClusterNode(consId, attrs));
                }

                ctx.grid().cluster().setBaselineTopology(nodes);

                return 0;
            }

            case OP_ADD_CACHE_CONFIGURATION: {
                CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);

                ctx.grid().addCacheConfiguration(cfg);

                return 0;
            }

            case OP_DISABLE_WAL: {
                ctx.grid().cluster().disableWal(reader.readString());

                return 0;
            }

            case OP_ENABLE_WAL: {
                ctx.grid().cluster().enableWal(reader.readString());

                return 0;
            }

            case OP_SET_TX_TIMEOUT_ON_PME: {
                ctx.grid().cluster().setTxTimeoutOnPartitionMapExchange(reader.readLong());

                return 0;
            }

            case OP_IS_WAL_ENABLED: {
                return ctx.grid().cluster().isWalEnabled(reader.readString()) ? TRUE : FALSE;
            }

            case OP_IS_BASELINE_AUTO_ADJ_ENABLED: {
                return ctx.grid().cluster().isBaselineAutoAdjustEnabled() ? TRUE : FALSE;
            }

            case OP_SET_BASELINE_AUTO_ADJ_ENABLED: {
                boolean isEnabled = reader.readBoolean();

                ctx.grid().cluster().baselineAutoAdjustEnabled(isEnabled);

                return 0;
            }

            case OP_GET_BASELINE_AUTO_ADJ_TIMEOUT: {
                return ctx.grid().cluster().baselineAutoAdjustTimeout();
            }
        }

        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
        throws IgniteCheckedException {
        // The memory chunk is not needed by any operation of this target.
        return processInStreamOutLong(type, reader);
    }

    /** {@inheritDoc} */
    @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
        throws IgniteCheckedException {
        if (type == OP_GET_CACHE_CONFIG) {
            int cacheId = reader.readInt();

            // NOTE(review): the descriptor is dereferenced without a null check - confirm that
            // cacheDescriptor(cacheId) cannot return null for IDs sent by the platform side.
            CacheConfiguration cfg = ctx.cache().cacheDescriptor(cacheId).cacheConfiguration();

            PlatformConfigurationUtils.writeCacheConfiguration(writer, cfg);

            return;
        }

        PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
        throws IgniteCheckedException {
        switch (type) {
            case OP_GET_CACHE: {
                String name = reader.readString();

                IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name);

                if (cache == null)
                    throw new IllegalArgumentException("Cache doesn't exist: " + name);

                return createPlatformCache(cache);
            }

            case OP_CREATE_CACHE: {
                String name = reader.readString();

                IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name);

                return createPlatformCache(cache);
            }

            case OP_GET_OR_CREATE_CACHE: {
                String name = reader.readString();

                IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name);

                return createPlatformCache(cache);
            }

            case OP_CREATE_CACHE_FROM_CONFIG: {
                CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);

                // The boolean tells whether a near cache configuration follows in the stream.
                IgniteCacheProxy cache = reader.readBoolean()
                    ? (IgniteCacheProxy)ctx.grid().createCache(cfg,
                        PlatformConfigurationUtils.readNearConfiguration(reader))
                    : (IgniteCacheProxy)ctx.grid().createCache(cfg);

                setPlatformCache(reader, cache);

                return createPlatformCache(cache);
            }

            case OP_GET_OR_CREATE_CACHE_FROM_CONFIG: {
                CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);

                // The boolean tells whether a near cache configuration follows in the stream.
                IgniteCacheProxy cache = reader.readBoolean()
                    ? (IgniteCacheProxy)ctx.grid().getOrCreateCache(cfg,
                        PlatformConfigurationUtils.readNearConfiguration(reader))
                    : (IgniteCacheProxy)ctx.grid().getOrCreateCache(cfg);

                setPlatformCache(reader, cache);

                return createPlatformCache(cache);
            }

            case OP_GET_AFFINITY: {
                return new PlatformAffinity(platformCtx, ctx, reader.readString());
            }

            case OP_GET_DATA_STREAMER: {
                String cacheName = reader.readString();
                boolean keepBinary = reader.readBoolean();

                IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName);

                // The underlying streamer always keeps binary; the flag requested by the platform side
                // is handed over to the platform wrapper instead.
                ldr.keepBinary(true);

                return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary);
            }

            case OP_GET_EXTENSION: {
                int id = reader.readInt();

                // Check both bounds: the ID comes from the platform side, and a negative value would otherwise
                // fail with ArrayIndexOutOfBoundsException instead of the descriptive error below.
                if (extensions != null && id >= 0 && id < extensions.length) {
                    PlatformPluginExtension ext = extensions[id];

                    // The array is sparse: slots of IDs that were never registered hold null.
                    if (ext != null)
                        return ext.createTarget();
                }

                throw new IgniteException("Platform extension is not registered [id=" + id + ']');
            }

            case OP_GET_ATOMIC_LONG: {
                String name = reader.readString();
                long initVal = reader.readLong();
                boolean create = reader.readBoolean();

                GridCacheAtomicLongImpl atomicLong =
                    (GridCacheAtomicLongImpl)ignite().atomicLong(name, initVal, create);

                if (atomicLong == null)
                    return null;

                return new PlatformAtomicLong(platformCtx, atomicLong);
            }

            case OP_GET_ATOMIC_REFERENCE: {
                String name = reader.readString();
                Object initVal = reader.readObjectDetached();
                boolean create = reader.readBoolean();

                return PlatformAtomicReference.createInstance(platformCtx, name, initVal, create);
            }

            case OP_GET_ATOMIC_SEQUENCE: {
                String name = reader.readString();
                long initVal = reader.readLong();
                boolean create = reader.readBoolean();

                IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initVal, create);

                if (atomicSeq == null)
                    return null;

                return new PlatformAtomicSequence(platformCtx, atomicSeq);
            }

            case OP_CREATE_NEAR_CACHE: {
                String cacheName = reader.readString();

                NearCacheConfiguration cfg = PlatformConfigurationUtils.readNearConfiguration(reader);

                IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createNearCache(cacheName, cfg);

                setPlatformCache(reader, cache);

                return createPlatformCache(cache);
            }

            case OP_GET_OR_CREATE_NEAR_CACHE: {
                String cacheName = reader.readString();

                NearCacheConfiguration cfg = PlatformConfigurationUtils.readNearConfiguration(reader);

                IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateNearCache(cacheName, cfg);

                setPlatformCache(reader, cache);

                return createPlatformCache(cache);
            }

            case OP_GET_TRANSACTIONS: {
                String lbl = reader.readString();

                return new PlatformTransactions(platformCtx, lbl);
            }

            case OP_GET_OR_CREATE_LOCK: {
                String name = reader.readString();
                boolean failoverSafe = reader.readBoolean();
                boolean fair = reader.readBoolean();
                boolean create = reader.readBoolean();

                IgniteLock lock = ctx.grid().reentrantLock(name, failoverSafe, fair, create);

                return lock == null ? null : new PlatformLock(platformCtx, lock);
            }
        }

        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg,
        BinaryRawReaderEx reader,
        BinaryRawWriterEx writer)
        throws IgniteCheckedException {
        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
        switch (type) {
            case OP_GET_IGNITE_CONFIGURATION: {
                PlatformConfigurationUtils.writeIgniteConfiguration(writer, ignite().configuration());

                return;
            }

            case OP_GET_CACHE_NAMES: {
                Collection<String> names = ignite().cacheNames();

                // Stream layout: count followed by that many names.
                writer.writeInt(names.size());

                for (String name : names)
                    writer.writeString(name);

                return;
            }

            case OP_GET_BASELINE_TOPOLOGY: {
                Collection<BaselineNode> blt = ignite().cluster().currentBaselineTopology();

                // Stream layout: count followed by (consistent ID, attributes) per node.
                writer.writeInt(blt.size());

                for (BaselineNode n : blt) {
                    writer.writeObjectDetached(n.consistentId());

                    PlatformUtils.writeNodeAttributes(writer, n.attributes());
                }

                return;
            }

            case OP_NODE_VERSION: {
                IgniteProductVersion productVersion = ignite().cluster().node().version();

                PlatformUtils.writeNodeVersion(writer, productVersion);

                return;
            }

            case OP_GET_THREAD_LOCAL: {
                writer.writeObjectDetached(threadLocal.get());

                return;
            }
        }

        PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
        switch (type) {
            case OP_GET_TRANSACTIONS:
                return new PlatformTransactions(platformCtx);

            case OP_GET_CLUSTER_GROUP:
                return new PlatformClusterGroup(platformCtx, ctx.grid().cluster());

            case OP_GET_BINARY_PROCESSOR: {
                return new PlatformBinaryProcessor(platformCtx);
            }
        }

        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader)
        throws IgniteCheckedException {
        return PlatformAbstractTarget.throwUnsupported(type);
    }

    /** {@inheritDoc} */
    @Override public Exception convertException(Exception e) {
        // No conversion: the exception is passed through as is.
        return e;
    }

    /** {@inheritDoc} */
    @Override public void setThreadLocal(Object value) {
        threadLocal.set(value);
    }

    /**
     * Internal store initialization routine.
     *
     * @param store Store.
     * @param convertBinary Convert binary flag.
     * @throws IgniteCheckedException If failed, in particular if the store is not a
     *      {@link PlatformDotNetCacheStore}.
     */
    private void registerStore0(PlatformCacheStore store, boolean convertBinary) throws IgniteCheckedException {
        if (store instanceof PlatformDotNetCacheStore) {
            PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store;

            store0.initialize(ctx, convertBinary);
        }
        else
            throw new IgniteCheckedException("Unsupported interop store: " + store);
    }

    /**
     * Sets platform cache config when present in the given reader.
     * <p>
     * Reads a boolean presence flag first; the configuration itself is read only when the flag is {@code true}.
     *
     * @param reader Reader.
     * @param cache Cache.
     */
    private static void setPlatformCache(BinaryRawReaderEx reader, IgniteCacheProxy cache) {
        if (reader.readBoolean())
            cache.context().cache().configuration().setPlatformCacheConfiguration(
                PlatformConfigurationUtils.readPlatformCacheConfiguration(reader));
    }

    /**
     * Prepare cache extensions.
     * <p>
     * Validates the extensions and lays them out in an array indexed by extension ID, so that the array length
     * is {@code maxId + 1} and slots of unused IDs are {@code null}.
     *
     * @param cacheExts Original extensions.
     * @return Prepared extensions; an empty array if there are none.
     * @throws IgniteException If an extension is {@code null}, has a negative ID, or shares an ID with another one.
     */
    private static PlatformCacheExtension[] prepareCacheExtensions(Collection<PlatformCacheExtension> cacheExts) {
        if (!F.isEmpty(cacheExts)) {
            int maxExtId = 0;

            Map<Integer, PlatformCacheExtension> idToExt = new HashMap<>();

            for (PlatformCacheExtension cacheExt : cacheExts) {
                if (cacheExt == null)
                    throw new IgniteException("Platform cache extension cannot be null.");

                if (cacheExt.id() < 0)
                    throw new IgniteException("Platform cache extension ID cannot be negative: " + cacheExt);

                PlatformCacheExtension oldCacheExt = idToExt.put(cacheExt.id(), cacheExt);

                if (oldCacheExt != null)
                    throw new IgniteException("Platform cache extensions cannot have the same ID [" +
                        "id=" + cacheExt.id() + ", first=" + oldCacheExt + ", second=" + cacheExt + ']');

                if (cacheExt.id() > maxExtId)
                    maxExtId = cacheExt.id();
            }

            PlatformCacheExtension[] res = new PlatformCacheExtension[maxExtId + 1];

            for (PlatformCacheExtension cacheExt : cacheExts)
                res[cacheExt.id()] = cacheExt;

            return res;
        }
        else
            //noinspection ZeroLengthArrayAllocation
            return new PlatformCacheExtension[0];
    }

    /**
     * Prepare extensions.
     * <p>
     * Validates the extensions and lays them out in an array indexed by extension ID, so that the array length
     * is {@code maxId + 1} and slots of unused IDs are {@code null}.
     *
     * @param exts Original extensions.
     * @return Prepared extensions; an empty array if there are none.
     * @throws IgniteException If an extension is {@code null}, has a negative ID, or shares an ID with another one.
     */
    private static PlatformPluginExtension[] prepareExtensions(PlatformPluginExtension[] exts) {
        if (!F.isEmpty(exts)) {
            int maxExtId = 0;

            Map<Integer, PlatformPluginExtension> idToExt = new HashMap<>();

            for (PlatformPluginExtension ext : exts) {
                if (ext == null)
                    throw new IgniteException("Platform extension cannot be null.");

                if (ext.id() < 0)
                    throw new IgniteException("Platform extension ID cannot be negative: " + ext);

                PlatformPluginExtension oldExt = idToExt.put(ext.id(), ext);

                if (oldExt != null)
                    throw new IgniteException("Platform extensions cannot have the same ID [" +
                        "id=" + ext.id() + ", first=" + oldExt + ", second=" + ext + ']');

                if (ext.id() > maxExtId)
                    maxExtId = ext.id();
            }

            PlatformPluginExtension[] res = new PlatformPluginExtension[maxExtId + 1];

            for (PlatformPluginExtension ext : exts)
                res[ext.id()] = ext;

            return res;
        }
        else
            //noinspection ZeroLengthArrayAllocation
            return new PlatformPluginExtension[0];
    }

    /**
     * Store and its convert-binary flag, kept together while store initialization is pending.
     * <p>
     * Does not override {@code equals}/{@code hashCode}, so instances in the pending set are distinguished
     * by identity.
     */
    private static class StoreInfo {
        /** Store. */
        private final PlatformCacheStore store;

        /** Convert binary flag. */
        private final boolean convertBinary;

        /**
         * Constructor.
         *
         * @param store Store.
         * @param convertBinary Convert binary flag.
         */
        private StoreInfo(PlatformCacheStore store, boolean convertBinary) {
            this.store = store;
            this.convertBinary = convertBinary;
        }
    }
}