blob: f6895734545a52bc8bddcd55fa86fed4d52fa83d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.marshaller;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
* Processor responsible for managing custom {@link DiscoveryCustomMessage}
* events for exchanging marshalling mappings between nodes in grid.
*
* In particular it processes two flows:
* <ul>
* <li>
* Some node, server or client, wants to add new mapping for some class.
* In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
* </li>
* <li>
* As discovery events are delivered to clients asynchronously,
* client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
* In that situation client sends a {@link MissingMappingRequestMessage} request
* and processor handles it as well as {@link MissingMappingResponseMessage} message.
* </li>
* </ul>
*/
public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
/** */
private final MarshallerContextImpl marshallerCtx;
/** */
private final GridClosureProcessor closProc;
/** */
private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>();
/** */
private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
= new ConcurrentHashMap<>();
/** */
private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap<>();
/**
* @param ctx Kernal context.
*/
public GridMarshallerMappingProcessor(GridKernalContext ctx) {
super(ctx);
marshallerCtx = ctx.marshallerContext();
closProc = ctx.closure();
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
GridDiscoveryManager discoMgr = ctx.discovery();
GridIoManager ioMgr = ctx.io();
MarshallerMappingTransport transport = new MarshallerMappingTransport(
ctx,
mappingExchangeSyncMap,
clientReqSyncMap
);
marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
discoMgr.setCustomEventListener(MappingProposedMessage.class, new MappingProposedListener());
discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
if (ctx.clientNode())
ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
else
ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
if (ctx.clientNode())
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
DiscoveryEvent evt0 = (DiscoveryEvent)evt;
if (!ctx.isStopping()) {
for (ClientRequestFuture fut : clientReqSyncMap.values())
fut.onNodeLeft(evt0.eventNode().id());
}
}
}, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/**
* Adds a listener to be notified when mapping changes.
*
* @param lsnr listener for mapping updated events.
*/
public void addMappingUpdatedListener(MappingUpdatedListener lsnr) {
mappingUpdatedLsnrs.add(lsnr);
}
/**
* Gets an iterator over all current mappings.
*
* @return Iterator over current mappings.
*/
public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() {
return marshallerCtx.currentMappings();
}
/**
*
*/
private final class MissingMappingRequestListener implements GridMessageListener {
/** */
private final GridIoManager ioMgr;
/**
* @param ioMgr Io manager.
*/
MissingMappingRequestListener(GridIoManager ioMgr) {
this.ioMgr = ioMgr;
}
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingRequestMessage : msg;
MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage)msg;
byte platformId = msg0.platformId();
int typeId = msg0.typeId();
String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
try {
ioMgr.sendToGridTopic(
nodeId,
TOPIC_MAPPING_MARSH,
new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send missing mapping response, node failed: " + nodeId);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send missing mapping response.", e);
}
}
}
/**
*
*/
private final class MissingMappingResponseListener implements GridMessageListener {
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingResponseMessage : msg;
MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage)msg;
byte platformId = msg0.platformId();
int typeId = msg0.typeId();
String resolvedClsName = msg0.className();
MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item);
if (fut != null) {
if (resolvedClsName != null) {
marshallerCtx.onMappingAccepted(new MarshallerMappingItem(platformId, typeId, resolvedClsName));
fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
}
else
fut.onDone(MappingExchangeResult.createFailureResult(
new IgniteCheckedException(
"Failed to resolve mapping [platformId: "
+ platformId
+ ", typeId: "
+ typeId + "]")));
}
}
}
/**
*
*/
private final class MappingProposedListener implements CustomEventListener<MappingProposedMessage> {
/** {@inheritDoc} */
@Override public void onCustomEvent(
AffinityTopologyVersion topVer,
ClusterNode snd,
MappingProposedMessage msg
) {
if (!ctx.isStopping()) {
if (msg.duplicated())
return;
if (!msg.inConflict()) {
MarshallerMappingItem item = msg.mappingItem();
MappedName existingName = marshallerCtx.onMappingProposed(item);
if (existingName != null) {
String existingClsName = existingName.className();
if (existingClsName.equals(item.className()) && !existingName.accepted())
msg.markDuplicated();
else if (!existingClsName.equals(item.className()))
msg.conflictingWithClass(existingClsName);
}
}
else {
UUID origNodeId = msg.origNodeId();
if (origNodeId.equals(ctx.localNodeId())) {
GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
assert fut != null : msg;
fut.onDone(MappingExchangeResult.createFailureResult(
duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
}
}
}
}
/**
* @param mappingItem Mapping item.
* @param conflictingClsName Conflicting class name.
*/
private IgniteCheckedException duplicateMappingException(
MarshallerMappingItem mappingItem,
String conflictingClsName
) {
return new IgniteCheckedException("Duplicate ID [platformId="
+ mappingItem.platformId()
+ ", typeId="
+ mappingItem.typeId()
+ ", oldCls="
+ conflictingClsName
+ ", newCls="
+ mappingItem.className() + "]");
}
}
/**
*
*/
private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
/** {@inheritDoc} */
@Override public void onCustomEvent(
AffinityTopologyVersion topVer,
ClusterNode snd,
MappingAcceptedMessage msg
) {
final MarshallerMappingItem item = msg.getMappingItem();
marshallerCtx.onMappingAccepted(item);
closProc.runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
}
});
ClientRequestFuture rqFut = clientReqSyncMap.get(new MarshallerMappingItem(item.platformId(), item.typeId(), null));
if (rqFut != null)
rqFut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
if (fut != null)
fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
}
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
dataBag.addJoiningNodeData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>)data.joiningNodeData();
processIncomingMappings(mappings);
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>)data.commonData();
processIncomingMappings(mappings);
}
/**
* @param mappings Incoming marshaller mappings.
*/
private void processIncomingMappings(List<Map<Integer, MappedName>> mappings) {
marshallerCtx.onMappingDataReceived(log, mappings);
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
ctx.cluster().clientReconnectFuture(),
"Failed to propose or request mapping, client node disconnected.")));
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
marshallerCtx.onMarshallerProcessorStop();
cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
}
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return MARSHALLER_PROC;
}
/**
* @param res Response.
*/
private void cancelFutures(MappingExchangeResult res) {
for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
fut.onDone(res);
for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
fut.onDone(res);
}
}