blob: b80ae3690a03f10de2a45108b3575c3dadc7825b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.marshaller;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jetbrains.annotations.Nullable;
/**
* Provides capabilities of sending custom discovery events to propose new mapping
* or request missing mapping to {@link MarshallerContextImpl}.
*
* For more information about particular events see documentation of {@link GridMarshallerMappingProcessor}.
*/
public final class MarshallerMappingTransport {
/** */
private final GridKernalContext ctx;
/** */
private final GridDiscoveryManager discoMgr;
/** */
private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap;
/** */
private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap;
/** */
private volatile boolean stopping;
/**
* @param ctx Context.
* @param mappingExchSyncMap Mapping exch sync map.
* @param clientReqSyncMap Client request sync map.
*/
MarshallerMappingTransport(
GridKernalContext ctx,
ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap,
ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap
) {
this.ctx = ctx;
discoMgr = ctx.discovery();
this.mappingExchSyncMap = mappingExchSyncMap;
this.clientReqSyncMap = clientReqSyncMap;
stopping = false;
}
/**
* @param item Item.
* @param cache Cache.
*/
public GridFutureAdapter<MappingExchangeResult> awaitMappingAcceptance(
MarshallerMappingItem item, ConcurrentMap<Integer,
MappedName> cache
) {
GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
if (oldFut != null)
return oldFut;
MappedName mappedName = cache.get(item.typeId());
assert mappedName != null;
//double check whether mapping is accepted, first check was in MarshallerContextImpl::registerClassName
if (mappedName.accepted())
fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
return fut;
}
/**
* @param item Item.
* @param cache Cache.
*/
public GridFutureAdapter<MappingExchangeResult> proposeMapping(MarshallerMappingItem item, ConcurrentMap<Integer, MappedName> cache) throws IgniteCheckedException {
GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
if (oldFut != null)
return oldFut;
else {
//double check, first check was in caller: MarshallerContextImpl::registerClassName
MappedName mapping = cache.get(item.typeId());
if (mapping != null) {
String mappedClsName = mapping.className();
if (!mappedClsName.equals(item.className()))
fut.onDone(MappingExchangeResult.createFailureResult(duplicateMappingException(item, mappedClsName)));
else if (mapping.accepted())
fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedClsName));
else if (stopping)
fut.onDone(MappingExchangeResult.createExchangeDisabledResult());
return fut;
}
}
DiscoveryCustomMessage msg = new MappingProposedMessage(item, discoMgr.localNode().id());
discoMgr.sendCustomEvent(msg);
return fut;
}
/**
* @param item Item.
* @param cache Cache.
*/
public GridFutureAdapter<MappingExchangeResult> requestMapping(
MarshallerMappingItem item,
ConcurrentMap<Integer, MappedName> cache
) {
ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
if (oldFut != null)
return oldFut;
MappedName mappedName = cache.get(item.typeId());
if (mappedName != null) {
newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
return newFut;
}
newFut.requestMapping();
return newFut;
}
/**
* @param item Item.
* @param mappedClsName Mapped class name.
*/
private IgniteCheckedException duplicateMappingException(MarshallerMappingItem item, String mappedClsName) {
return new IgniteCheckedException("Duplicate ID [platformId="
+ item.platformId()
+ ", typeId="
+ item.typeId()
+ ", oldCls="
+ mappedClsName
+ ", newCls="
+ item.className() + "]");
}
/** */
public void markStopping() {
stopping = true;
}
/** */
public boolean stopping() {
return stopping;
}
/**
* Future to wait for mapping exchange result to arrive. Removes itself from map when completed.
*/
private class MappingExchangeResultFuture extends GridFutureAdapter<MappingExchangeResult> {
/** */
private final MarshallerMappingItem mappingItem;
/**
* @param mappingItem Mapping item.
*/
private MappingExchangeResultFuture(MarshallerMappingItem mappingItem) {
this.mappingItem = mappingItem;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable MappingExchangeResult res, @Nullable Throwable err) {
assert res != null;
boolean done = super.onDone(res, null);
if (done)
mappingExchSyncMap.remove(mappingItem, this);
return done;
}
}
}