| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.managers.communication; |
| |
| import org.apache.ignite.*; |
| import org.apache.ignite.cluster.*; |
| import org.apache.ignite.events.*; |
| import org.apache.ignite.internal.*; |
| import org.apache.ignite.internal.direct.*; |
| import org.apache.ignite.internal.managers.*; |
| import org.apache.ignite.internal.managers.deployment.*; |
| import org.apache.ignite.internal.managers.eventstorage.*; |
| import org.apache.ignite.internal.processors.timeout.*; |
| import org.apache.ignite.internal.util.*; |
| import org.apache.ignite.internal.util.lang.*; |
| import org.apache.ignite.internal.util.tostring.*; |
| import org.apache.ignite.internal.util.typedef.*; |
| import org.apache.ignite.internal.util.typedef.internal.*; |
| import org.apache.ignite.lang.*; |
| import org.apache.ignite.marshaller.*; |
| import org.apache.ignite.plugin.extensions.communication.*; |
| import org.apache.ignite.spi.*; |
| import org.apache.ignite.spi.communication.*; |
| import org.apache.ignite.thread.*; |
| import org.jetbrains.annotations.*; |
| import org.jsr166.*; |
| |
| import java.io.*; |
| import java.util.*; |
| import java.util.Map.*; |
| import java.util.concurrent.*; |
| import java.util.concurrent.atomic.*; |
| import java.util.concurrent.locks.*; |
| |
| import static org.apache.ignite.events.EventType.*; |
| import static org.apache.ignite.internal.GridTopic.*; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; |
| import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*; |
| import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.*; |
| |
| /** |
| * Grid communication manager. |
| */ |
| public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> { |
| /** */ |
| public static volatile boolean TURBO_DEBUG_MODE; |
| |
| /** Empty array of message factories. */ |
| public static final MessageFactory[] EMPTY = {}; |
| |
| /** Max closed topics to store. */ |
| public static final int MAX_CLOSED_TOPICS = 10240; |
| |
| /** Listeners by topic. */ |
| private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>(); |
| |
| /** Disconnect listeners. */ |
| private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>(); |
| |
| /** Map of {@link IoPool}-s injected by Ignite plugins. */ |
| private final IoPool[] ioPools = new IoPool[128]; |
| |
| /** Public pool. */ |
| private ExecutorService pubPool; |
| |
| /** Internal P2P pool. */ |
| private ExecutorService p2pPool; |
| |
| /** Internal system pool. */ |
| private ExecutorService sysPool; |
| |
| /** Internal management pool. */ |
| private ExecutorService mgmtPool; |
| |
| /** Affinity assignment executor service. */ |
| private ExecutorService affPool; |
| |
| /** Utility cache pool. */ |
| private ExecutorService utilityCachePool; |
| |
| /** Marshaller cache pool. */ |
| private ExecutorService marshCachePool; |
| |
| /** Discovery listener. */ |
| private GridLocalEventListener discoLsnr; |
| |
| /** */ |
| private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap = |
| new ConcurrentHashMap8<>(); |
| |
| /** Local node ID. */ |
| private final UUID locNodeId; |
| |
| /** Discovery delay. */ |
| private final long discoDelay; |
| |
| /** Cache for messages that were received prior to discovery. */ |
| private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap = |
| new ConcurrentHashMap8<>(); |
| |
| /** Communication message listener. */ |
| private CommunicationListener<Serializable> commLsnr; |
| |
| /** Grid marshaller. */ |
| private final Marshaller marsh; |
| |
| /** Busy lock. */ |
| private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); |
| |
| /** Lock to sync maps access. */ |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| /** Fully started flag. When set to true, can send and receive messages. */ |
| private volatile boolean started; |
| |
| /** Closed topics. */ |
| private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics = |
| new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, MAX_CLOSED_TOPICS, 0.75f, 256, |
| PER_SEGMENT_Q_OPTIMIZED_RMV); |
| |
| /** */ |
| private MessageFactory msgFactory; |
| |
| /** */ |
| private MessageFormatter formatter; |
| |
| /** Stopping flag. */ |
| private boolean stopping; |
| |
| /** |
| * @param ctx Grid kernal context. |
| */ |
| @SuppressWarnings("deprecation") |
| public GridIoManager(GridKernalContext ctx) { |
| super(ctx, ctx.config().getCommunicationSpi()); |
| |
| locNodeId = ctx.localNodeId(); |
| |
| discoDelay = ctx.config().getDiscoveryStartupDelay(); |
| |
| marsh = ctx.config().getMarshaller(); |
| } |
| |
| /** |
| * @return Message factory. |
| */ |
| public MessageFactory messageFactory() { |
| assert msgFactory != null; |
| |
| return msgFactory; |
| } |
| |
| /** |
| * @return Message writer factory. |
| */ |
| public MessageFormatter formatter() { |
| assert formatter != null; |
| |
| return formatter; |
| } |
| |
| /** |
| * Resets metrics for this manager. |
| */ |
| public void resetMetrics() { |
| getSpi().resetMetrics(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("deprecation") |
| @Override public void start() throws IgniteCheckedException { |
| assertParameter(discoDelay > 0, "discoveryStartupDelay > 0"); |
| |
| startSpi(); |
| |
| pubPool = ctx.getExecutorService(); |
| p2pPool = ctx.getPeerClassLoadingExecutorService(); |
| sysPool = ctx.getSystemExecutorService(); |
| mgmtPool = ctx.getManagementExecutorService(); |
| utilityCachePool = ctx.utilityCachePool(); |
| marshCachePool = ctx.marshallerCachePool(); |
| affPool = new IgniteThreadPoolExecutor( |
| "aff-" + ctx.gridName(), |
| 1, |
| 1, |
| 0, |
| new LinkedBlockingQueue<Runnable>()); |
| |
| getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() { |
| @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) { |
| try { |
| onMessage0(nodeId, (GridIoMessage)msg, msgC); |
| } |
| catch (ClassCastException ignored) { |
| U.error(log, "Communication manager received message of unknown type (will ignore): " + |
| msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " + |
| "which is illegal - make sure to send messages only via GridProjection API."); |
| } |
| } |
| |
| @Override public void onDisconnected(UUID nodeId) { |
| for (GridDisconnectListener lsnr : disconnectLsnrs) |
| lsnr.onNodeDisconnected(nodeId); |
| } |
| }); |
| |
| MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class); |
| |
| if (formatterExt != null && formatterExt.length > 0) { |
| if (formatterExt.length > 1) |
| throw new IgniteCheckedException("More than one MessageFormatter extension is defined. Check your " + |
| "plugins configuration and make sure that only one of them provides custom message format."); |
| |
| formatter = formatterExt[0]; |
| } |
| else { |
| formatter = new MessageFormatter() { |
| @Override public MessageWriter writer() { |
| return new DirectMessageWriter(); |
| } |
| |
| @Override public MessageReader reader(MessageFactory factory) { |
| return new DirectMessageReader(msgFactory, this); |
| } |
| }; |
| } |
| |
| MessageFactory[] msgs = ctx.plugins().extensions(MessageFactory.class); |
| |
| if (msgs == null) |
| msgs = EMPTY; |
| |
| List<MessageFactory> compMsgs = new ArrayList<>(); |
| |
| for (IgniteComponentType compType : IgniteComponentType.values()) { |
| MessageFactory f = compType.messageFactory(); |
| |
| if (f != null) |
| compMsgs.add(f); |
| } |
| |
| if (!compMsgs.isEmpty()) |
| msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()])); |
| |
| msgFactory = new GridIoMessageFactory(msgs); |
| |
| if (log.isDebugEnabled()) |
| log.debug(startInfo()); |
| |
| registerIoPoolExtensions(); |
| } |
| |
| /** |
| * Processes IO messaging pool extensions. |
| * @throws IgniteCheckedException On error. |
| */ |
| private void registerIoPoolExtensions() throws IgniteCheckedException { |
| // Process custom IO messaging pool extensions: |
| final IoPool[] executorExtensions = ctx.plugins().extensions(IoPool.class); |
| |
| if (executorExtensions != null) { |
| // Store it into the map and check for duplicates: |
| for (IoPool ex : executorExtensions) { |
| final byte id = ex.id(); |
| |
| // 1. Check the pool id is non-negative: |
| if (id < 0) |
| throw new IgniteCheckedException("Failed to register IO executor pool because its Id is negative " + |
| "[id=" + id + ']'); |
| |
| // 2. Check the pool id is in allowed range: |
| if (isReservedGridIoPolicy(id)) |
| throw new IgniteCheckedException("Failed to register IO executor pool because its Id in in the " + |
| "reserved range (0-31) [id=" + id + ']'); |
| |
| // 3. Check the pool for duplicates: |
| if (ioPools[id] != null) |
| throw new IgniteCheckedException("Failed to register IO executor pool because its " + |
| "Id as already used [id=" + id + ']'); |
| |
| ioPools[id] = ex; |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) |
| @Override public void onKernalStart0() throws IgniteCheckedException { |
| discoLsnr = new GridLocalEventListener() { |
| @SuppressWarnings({"TooBroadScope", "fallthrough"}) |
| @Override public void onEvent(Event evt) { |
| assert evt instanceof DiscoveryEvent : "Invalid event: " + evt; |
| |
| DiscoveryEvent discoEvt = (DiscoveryEvent)evt; |
| |
| UUID nodeId = discoEvt.eventNode().id(); |
| |
| switch (evt.type()) { |
| case EVT_NODE_JOINED: |
| assert waitMap.get(nodeId) == null; // We can't receive messages from undiscovered nodes. |
| |
| break; |
| |
| case EVT_NODE_LEFT: |
| case EVT_NODE_FAILED: |
| for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e : |
| msgSetMap.entrySet()) { |
| ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue(); |
| |
| GridCommunicationMessageSet set; |
| |
| boolean empty; |
| |
| synchronized (map) { |
| set = map.remove(nodeId); |
| |
| empty = map.isEmpty(); |
| } |
| |
| if (set != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Removed message set due to node leaving grid: " + set); |
| |
| // Unregister timeout listener. |
| ctx.timeout().removeTimeoutObject(set); |
| |
| // Node may still send stale messages for this topic |
| // even after discovery notification is done. |
| closedTopics.add(set.topic()); |
| } |
| |
| if (empty) |
| msgSetMap.remove(e.getKey(), map); |
| } |
| |
| // Clean up delayed and ordered messages (need exclusive lock). |
| lock.writeLock().lock(); |
| |
| try { |
| ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removed messages from discovery startup delay list " + |
| "(sender node left topology): " + waitList); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| break; |
| |
| default: |
| assert false : "Unexpected event: " + evt; |
| } |
| } |
| }; |
| |
| ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| // Make sure that there are no stale messages due to window between communication |
| // manager start and kernal start. |
| // 1. Process wait list. |
| Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>(); |
| |
| lock.writeLock().lock(); |
| |
| try { |
| started = true; |
| |
| for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) { |
| if (ctx.discovery().node(e.getKey()) != null) { |
| ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Processing messages from discovery startup delay list: " + waitList); |
| |
| if (waitList != null) |
| delayedMsgs.add(waitList); |
| } |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| // After write lock released. |
| if (!delayedMsgs.isEmpty()) { |
| for (Collection<DelayedMessage> col : delayedMsgs) |
| for (DelayedMessage msg : col) |
| commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback()); |
| } |
| |
| // 2. Process messages sets. |
| for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e : msgSetMap.entrySet()) { |
| ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue(); |
| |
| for (GridCommunicationMessageSet set : map.values()) { |
| if (ctx.discovery().node(set.nodeId()) == null) { |
| // All map modifications should be synced for consistency. |
| boolean rmv; |
| |
| synchronized (map) { |
| rmv = map.remove(set.nodeId(), set); |
| } |
| |
| if (rmv) { |
| if (log.isDebugEnabled()) |
| log.debug("Removed message set due to node leaving grid: " + set); |
| |
| // Unregister timeout listener. |
| ctx.timeout().removeTimeoutObject(set); |
| } |
| |
| } |
| } |
| |
| boolean rmv; |
| |
| synchronized (map) { |
| rmv = map.isEmpty(); |
| } |
| |
| if (rmv) { |
| msgSetMap.remove(e.getKey(), map); |
| |
| // Node may still send stale messages for this topic |
| // even after discovery notification is done. |
| closedTopics.add(e.getKey()); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("BusyWait") |
| @Override public void onKernalStop0(boolean cancel) { |
| // No more communication messages. |
| getSpi().setListener(null); |
| |
| boolean interrupted = false; |
| |
| // Busy wait is intentional. |
| while (true) { |
| try { |
| if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) |
| break; |
| else |
| Thread.sleep(200); |
| } |
| catch (InterruptedException ignore) { |
| // Preserve interrupt status & ignore. |
| // Note that interrupted flag is cleared. |
| interrupted = true; |
| } |
| } |
| |
| try { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| |
| U.shutdownNow(getClass(), affPool, log); |
| |
| GridEventStorageManager evtMgr = ctx.event(); |
| |
| if (evtMgr != null && discoLsnr != null) |
| evtMgr.removeLocalEventListener(discoLsnr); |
| |
| stopping = true; |
| } |
| finally { |
| busyLock.writeUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| stopSpi(); |
| |
| if (log.isDebugEnabled()) |
| log.debug(stopInfo()); |
| |
| Arrays.fill(ioPools, null); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message bytes. |
| * @param msgC Closure to call when message processing finished. |
| */ |
| @SuppressWarnings("fallthrough") |
| private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { |
| assert nodeId != null; |
| assert msg != null; |
| |
| busyLock.readLock(); |
| |
| try { |
| if (stopping) { |
| if (log.isDebugEnabled()) |
| log.debug("Received communication message while stopping (will ignore) [nodeId=" + |
| nodeId + ", msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| // Check discovery. |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']'); |
| |
| return; // We can't receive messages from non-discovered ones. |
| } |
| |
| if (msg.topic() == null) { |
| int topicOrd = msg.topicOrdinal(); |
| |
| msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : marsh.unmarshal(msg.topicBytes(), null)); |
| } |
| |
| if (!started) { |
| lock.readLock().lock(); |
| |
| try { |
| if (!started) { // Sets to true in write lock, so double checking. |
| // Received message before valid context is set to manager. |
| if (log.isDebugEnabled()) |
| log.debug("Adding message to waiting list [senderId=" + nodeId + |
| ", msg=" + msg + ']'); |
| |
| ConcurrentLinkedDeque8<DelayedMessage> list = |
| F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque()); |
| |
| assert list != null; |
| |
| list.add(new DelayedMessage(nodeId, msg, msgC)); |
| |
| return; |
| } |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| // If message is P2P, then process in P2P service. |
| // This is done to avoid extra waiting and potential deadlocks |
| // as thread pool may not have any available threads to give. |
| byte plc = msg.policy(); |
| |
| switch (plc) { |
| case P2P_POOL: { |
| processP2PMessage(nodeId, msg, msgC); |
| |
| break; |
| } |
| |
| case PUBLIC_POOL: |
| case SYSTEM_POOL: |
| case MANAGEMENT_POOL: |
| case AFFINITY_POOL: |
| case UTILITY_CACHE_POOL: |
| case MARSH_CACHE_POOL: |
| { |
| if (msg.isOrdered()) |
| processOrderedMessage(nodeId, msg, plc, msgC); |
| else |
| processRegularMessage(nodeId, msg, plc, msgC); |
| |
| break; |
| } |
| |
| default: |
| assert plc >= 0 : "Negative policy: " + plc; |
| |
| if (isReservedGridIoPolicy(plc)) |
| throw new IgniteCheckedException("Failed to process message with policy of reserved range. " + |
| "[policy=" + plc + ']'); |
| |
| if (msg.isOrdered()) |
| processOrderedMessage(nodeId, msg, plc, msgC); |
| else |
| processRegularMessage(nodeId, msg, plc, msgC); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to process message (will ignore): " + msg, e); |
| } |
| finally { |
| busyLock.readUnlock(); |
| } |
| } |
| |
| /** |
| * Gets execution pool for policy. |
| * |
| * @param plc Policy. |
| * @return Execution pool. |
| */ |
| private Executor pool(byte plc) throws IgniteCheckedException { |
| switch (plc) { |
| case P2P_POOL: |
| return p2pPool; |
| case SYSTEM_POOL: |
| return sysPool; |
| case PUBLIC_POOL: |
| return pubPool; |
| case MANAGEMENT_POOL: |
| return mgmtPool; |
| case AFFINITY_POOL: |
| return affPool; |
| |
| case UTILITY_CACHE_POOL: |
| assert utilityCachePool != null : "Utility cache pool is not configured."; |
| |
| return utilityCachePool; |
| |
| case MARSH_CACHE_POOL: |
| assert marshCachePool != null : "Marshaller cache pool is not configured."; |
| |
| return marshCachePool; |
| |
| default: { |
| assert plc >= 0 : "Negative policy: " + plc; |
| |
| if (isReservedGridIoPolicy(plc)) |
| throw new IgniteCheckedException("Failed to process message with policy of reserved" + |
| " range (0-31), [policy=" + plc + ']'); |
| |
| IoPool pool = ioPools[plc]; |
| |
| if (pool == null) |
| throw new IgniteCheckedException("Failed to process message because no pool is registered " + |
| "for policy. [policy=" + plc + ']'); |
| |
| assert plc == pool.id(); |
| |
| Executor ex = pool.executor(); |
| |
| if (ex == null) |
| throw new IgniteCheckedException("Failed to process message because corresponding executor " + |
| "is null. [id=" + plc + ']'); |
| |
| return ex; |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| * @param msgC Closure to call when message processing finished. |
| */ |
| private void processP2PMessage( |
| final UUID nodeId, |
| final GridIoMessage msg, |
| final IgniteRunnable msgC |
| ) { |
| Runnable c = new Runnable() { |
| @Override public void run() { |
| try { |
| threadProcessingMessage(true); |
| |
| GridMessageListener lsnr = lsnrMap.get(msg.topic()); |
| |
| if (lsnr == null) |
| return; |
| |
| Object obj = msg.message(); |
| |
| assert obj != null; |
| |
| lsnr.onMessage(nodeId, obj); |
| } |
| finally { |
| threadProcessingMessage(false); |
| |
| msgC.run(); |
| } |
| } |
| }; |
| |
| try { |
| p2pPool.execute(c); |
| } |
| catch (RejectedExecutionException e) { |
| U.error(log, "Failed to process P2P message due to execution rejection. Increase the upper bound " + |
| "on 'ExecutorService' provided by 'IgniteConfiguration.getPeerClassLoadingThreadPoolSize()'. " + |
| "Will attempt to process message in the listener thread instead.", e); |
| |
| c.run(); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| * @param plc Execution policy. |
| * @param msgC Closure to call when message processing finished. |
| */ |
| private void processRegularMessage( |
| final UUID nodeId, |
| final GridIoMessage msg, |
| byte plc, |
| final IgniteRunnable msgC |
| ) throws IgniteCheckedException { |
| Runnable c = new Runnable() { |
| @Override public void run() { |
| try { |
| threadProcessingMessage(true); |
| |
| processRegularMessage0(msg, nodeId); |
| } |
| finally { |
| threadProcessingMessage(false); |
| |
| msgC.run(); |
| } |
| } |
| }; |
| |
| try { |
| pool(plc).execute(c); |
| } |
| catch (RejectedExecutionException e) { |
| U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " + |
| "on 'ExecutorService' provided by 'IgniteConfiguration.getPublicThreadPoolSize()'. " + |
| "Will attempt to process message in the listener thread instead.", e); |
| |
| c.run(); |
| } |
| } |
| |
| /** |
| * @param msg Message. |
| * @param nodeId Node ID. |
| */ |
| @SuppressWarnings("deprecation") |
| private void processRegularMessage0(GridIoMessage msg, UUID nodeId) { |
| GridMessageListener lsnr = lsnrMap.get(msg.topic()); |
| |
| if (lsnr == null) |
| return; |
| |
| Object obj = msg.message(); |
| |
| assert obj != null; |
| |
| lsnr.onMessage(nodeId, obj); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Ordered message. |
| * @param plc Execution policy. |
| * @param msgC Closure to call when message processing finished ({@code null} for sync processing). |
| */ |
| @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") |
| private void processOrderedMessage( |
| final UUID nodeId, |
| final GridIoMessage msg, |
| final byte plc, |
| @Nullable final IgniteRunnable msgC |
| ) throws IgniteCheckedException { |
| assert msg != null; |
| |
| long timeout = msg.timeout(); |
| boolean skipOnTimeout = msg.skipOnTimeout(); |
| |
| boolean isNew = false; |
| |
| ConcurrentMap<UUID, GridCommunicationMessageSet> map; |
| |
| GridCommunicationMessageSet set = null; |
| |
| while (true) { |
| map = msgSetMap.get(msg.topic()); |
| |
| if (map == null) { |
| set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC); |
| |
| map = new ConcurrentHashMap0<>(); |
| |
| map.put(nodeId, set); |
| |
| ConcurrentMap<UUID, GridCommunicationMessageSet> old = msgSetMap.putIfAbsent( |
| msg.topic(), map); |
| |
| if (old != null) |
| map = old; |
| else { |
| isNew = true; |
| |
| // Put succeeded. |
| break; |
| } |
| } |
| |
| boolean rmv = false; |
| |
| synchronized (map) { |
| if (map.isEmpty()) |
| rmv = true; |
| else { |
| set = map.get(nodeId); |
| |
| if (set == null) { |
| GridCommunicationMessageSet old = map.putIfAbsent(nodeId, |
| set = new GridCommunicationMessageSet(plc, msg.topic(), |
| nodeId, timeout, skipOnTimeout, msg, msgC)); |
| |
| assert old == null; |
| |
| isNew = true; |
| |
| // Put succeeded. |
| break; |
| } |
| } |
| } |
| |
| if (rmv) |
| msgSetMap.remove(msg.topic(), map); |
| else { |
| assert set != null; |
| assert !isNew; |
| |
| set.add(msg, msgC); |
| |
| break; |
| } |
| } |
| |
| if (isNew && ctx.discovery().node(nodeId) == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Message is ignored as sender has left the grid: " + msg); |
| |
| assert map != null; |
| |
| boolean rmv; |
| |
| synchronized (map) { |
| map.remove(nodeId); |
| |
| rmv = map.isEmpty(); |
| } |
| |
| if (rmv) |
| msgSetMap.remove(msg.topic(), map); |
| |
| return; |
| } |
| |
| if (isNew && set.endTime() != Long.MAX_VALUE) |
| ctx.timeout().addTimeoutObject(set); |
| |
| final GridMessageListener lsnr = lsnrMap.get(msg.topic()); |
| |
| if (lsnr == null) { |
| if (closedTopics.contains(msg.topic())) { |
| if (log.isDebugEnabled()) |
| log.debug("Message is ignored as it came for the closed topic: " + msg); |
| |
| assert map != null; |
| |
| msgSetMap.remove(msg.topic(), map); |
| } |
| else if (log.isDebugEnabled()) { |
| // Note that we simply keep messages if listener is not |
| // registered yet, until one will be registered. |
| log.debug("Received message for unknown listener (messages will be kept until a " + |
| "listener is registered): " + msg); |
| } |
| |
| // Mark the message as processed, otherwise reading from the connection |
| // may stop. |
| if (msgC != null) |
| msgC.run(); |
| |
| return; |
| } |
| |
| if (msgC == null) { |
| // Message from local node can be processed in sync manner. |
| assert locNodeId.equals(nodeId) || TURBO_DEBUG_MODE; |
| |
| unwindMessageSet(set, lsnr); |
| |
| return; |
| } |
| |
| final GridCommunicationMessageSet msgSet0 = set; |
| |
| Runnable c = new Runnable() { |
| @Override public void run() { |
| try { |
| threadProcessingMessage(true); |
| |
| unwindMessageSet(msgSet0, lsnr); |
| } |
| finally { |
| threadProcessingMessage(false); |
| } |
| } |
| }; |
| |
| try { |
| pool(plc).execute(c); |
| } |
| catch (RejectedExecutionException e) { |
| U.error(log, "Failed to process ordered message due to execution rejection. " + |
| "Increase the upper bound on executor service provided by corresponding " + |
| "configuration property. Will attempt to process message in the listener " + |
| "thread instead [msgPlc=" + plc + ']', e); |
| |
| c.run(); |
| } |
| } |
| |
| /** |
| * @param msgSet Message set to unwind. |
| * @param lsnr Listener to notify. |
| */ |
| private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) { |
| // Loop until message set is empty or |
| // another thread owns the reservation. |
| while (true) { |
| if (msgSet.reserve()) { |
| try { |
| msgSet.unwind(lsnr); |
| } |
| finally { |
| msgSet.release(); |
| } |
| |
| // Check outside of reservation block. |
| if (!msgSet.changed()) { |
| if (log.isDebugEnabled()) |
| log.debug("Message set has not been changed: " + msgSet); |
| |
| break; |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Another thread owns reservation: " + msgSet); |
| |
| return; |
| } |
| } |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param topic Topic to send the message to. |
| * @param topicOrd GridTopic enumeration ordinal. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param ordered Ordered flag. |
| * @param timeout Timeout. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| private void send( |
| ClusterNode node, |
| Object topic, |
| int topicOrd, |
| Message msg, |
| byte plc, |
| boolean ordered, |
| long timeout, |
| boolean skipOnTimeout |
| ) throws IgniteCheckedException { |
| assert node != null; |
| assert topic != null; |
| assert msg != null; |
| |
| GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); |
| |
| if (locNodeId.equals(node.id())) { |
| assert plc != P2P_POOL; |
| |
| CommunicationListener commLsnr = this.commLsnr; |
| |
| if (commLsnr == null) |
| throw new IgniteCheckedException("Trying to send message when grid is not fully started."); |
| |
| if (ordered) |
| processOrderedMessage(locNodeId, ioMsg, plc, null); |
| else |
| processRegularMessage0(ioMsg, locNodeId); |
| } |
| else { |
| if (topicOrd < 0) |
| ioMsg.topicBytes(marsh.marshal(topic)); |
| |
| try { |
| getSpi().sendMessage(node, ioMsg); |
| } |
| catch (IgniteSpiException e) { |
| throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + |
| "TCP connection cannot be established due to firewall issues) " + |
| "[node=" + node + ", topic=" + topic + |
| ", msg=" + msg + ", policy=" + plc + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * This method can be used for debugging tricky concurrency issues |
| * with multi-nodes in single JVM. |
| * <p> |
| * This method eliminates network between nodes started in single JVM |
| * when {@link #TURBO_DEBUG_MODE} is set to {@code true}. |
| * <p> |
| * How to use it: |
| * <ol> |
| * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean)} |
| * with this method.</li> |
| * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li> |
| * <li>Perform test operations on the topology. No network will be there.</li> |
| * <li>DO NOT turn on turbo debug before all grids started. This will cause deadlocks.</li> |
| * </ol> |
| * |
| * @param node Destination node. |
| * @param topic Topic to send the message to. |
| * @param topicOrd GridTopic enumeration ordinal. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param ordered Ordered flag. |
| * @param timeout Timeout. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| private void sendTurboDebug( |
| ClusterNode node, |
| Object topic, |
| int topicOrd, |
| Message msg, |
| byte plc, |
| boolean ordered, |
| long timeout, |
| boolean skipOnTimeout |
| ) throws IgniteCheckedException { |
| assert node != null; |
| assert topic != null; |
| assert msg != null; |
| |
| GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); |
| |
| IgniteKernal rmt; |
| |
| if (locNodeId.equals(node.id())) { |
| assert plc != P2P_POOL; |
| |
| CommunicationListener commLsnr = this.commLsnr; |
| |
| if (commLsnr == null) |
| throw new IgniteCheckedException("Trying to send message when grid is not fully started."); |
| |
| if (ordered) |
| processOrderedMessage(locNodeId, ioMsg, plc, null); |
| else |
| processRegularMessage0(ioMsg, locNodeId); |
| } |
| else if (TURBO_DEBUG_MODE && (rmt = IgnitionEx.gridxx(locNodeId)) != null) { |
| if (ioMsg.isOrdered()) |
| rmt.context().io().processOrderedMessage(locNodeId, ioMsg, ioMsg.policy(), null); |
| else |
| rmt.context().io().processRegularMessage0(ioMsg, locNodeId); |
| } |
| else { |
| if (topicOrd < 0) |
| ioMsg.topicBytes(marsh.marshal(topic)); |
| |
| try { |
| getSpi().sendMessage(node, ioMsg); |
| } |
| catch (IgniteSpiException e) { |
| throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + |
| "TCP connection cannot be established due to firewall issues) " + |
| "[node=" + node + ", topic=" + topic + |
| ", msg=" + msg + ", policy=" + plc + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Id of destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void send(UUID nodeId, Object topic, Message msg, byte plc) |
| throws IgniteCheckedException { |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) |
| throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); |
| |
| send(node, topic, msg, plc); |
| } |
| |
| /** |
| * @param nodeId Id of destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| public void send(UUID nodeId, GridTopic topic, Message msg, byte plc) |
| throws IgniteCheckedException { |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) |
| throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); |
| |
| send(node, topic, topic.ordinal(), msg, plc, false, 0, false); |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void send(ClusterNode node, Object topic, Message msg, byte plc) |
| throws IgniteCheckedException { |
| send(node, topic, -1, msg, plc, false, 0, false); |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) |
| throws IgniteCheckedException { |
| send(node, topic, topic.ordinal(), msg, plc, false, 0, false); |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param timeout Timeout to keep a message on receiving queue. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void sendOrderedMessage( |
| ClusterNode node, |
| Object topic, |
| Message msg, |
| byte plc, |
| long timeout, |
| boolean skipOnTimeout |
| ) throws IgniteCheckedException { |
| assert timeout > 0 || skipOnTimeout; |
| |
| send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); |
| } |
| |
| /** |
| * @param nodeId Destination node. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param timeout Timeout to keep a message on receiving queue. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void sendOrderedMessage( |
| UUID nodeId, |
| Object topic, |
| Message msg, |
| byte plc, |
| long timeout, |
| boolean skipOnTimeout |
| ) throws IgniteCheckedException { |
| assert timeout > 0 || skipOnTimeout; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) |
| throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); |
| |
| send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param timeout Timeout to keep a message on receiving queue. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void sendOrderedMessage( |
| Collection<? extends ClusterNode> nodes, |
| Object topic, |
| Message msg, |
| byte plc, |
| long timeout, |
| boolean skipOnTimeout |
| ) |
| throws IgniteCheckedException { |
| assert timeout > 0 || skipOnTimeout; |
| |
| send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout); |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void send( |
| Collection<? extends ClusterNode> nodes, |
| Object topic, |
| Message msg, |
| byte plc |
| ) throws IgniteCheckedException { |
| send(nodes, topic, -1, msg, plc, false, 0, false); |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param topic Topic to send the message to. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void send( |
| Collection<? extends ClusterNode> nodes, |
| GridTopic topic, |
| Message msg, |
| byte plc |
| ) throws IgniteCheckedException { |
| send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false); |
| } |
| |
| /** |
| * Sends a peer deployable user message. |
| * |
| * @param nodes Destination nodes. |
| * @param msg Message to send. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException { |
| sendUserMessage(nodes, msg, null, false, 0); |
| } |
| |
| /** |
| * Sends a peer deployable user message. |
| * |
| * @param nodes Destination nodes. |
| * @param msg Message to send. |
| * @param topic Message topic to use. |
| * @param ordered Is message ordered? |
| * @param timeout Message timeout in milliseconds for ordered messages. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, |
| @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException { |
| boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId); |
| |
| byte[] serMsg = null; |
| byte[] serTopic = null; |
| |
| if (!loc) { |
| serMsg = marsh.marshal(msg); |
| |
| if (topic != null) |
| serTopic = marsh.marshal(topic); |
| } |
| |
| GridDeployment dep = null; |
| |
| String depClsName = null; |
| |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| Class<?> cls0 = U.detectClass(msg); |
| |
| if (U.isJdk(cls0) && topic != null) |
| cls0 = U.detectClass(topic); |
| |
| dep = ctx.deploy().deploy(cls0, U.detectClassLoader(cls0)); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to deploy user message: " + msg); |
| |
| depClsName = cls0.getName(); |
| } |
| |
| Message ioMsg = new GridIoUserMessage( |
| msg, |
| serMsg, |
| depClsName, |
| topic, |
| serTopic, |
| dep != null ? dep.classLoaderId() : null, |
| dep != null ? dep.deployMode() : null, |
| dep != null ? dep.userVersion() : null, |
| dep != null ? dep.participants() : null); |
| |
| if (ordered) |
| sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true); |
| else if (loc) |
| send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); |
| else { |
| ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId)); |
| |
| Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId)); |
| |
| if (!rmtNodes.isEmpty()) |
| send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); |
| |
| // Will call local listeners in current thread synchronously, so must go the last |
| // to allow remote nodes execute the requested operation in parallel. |
| if (locNode != null) |
| send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); |
| } |
| } |
| |
| /** |
| * @param topic Topic to subscribe to. |
| * @param p Message predicate. |
| */ |
| public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { |
| if (p != null) { |
| try { |
| if (p instanceof GridLifecycleAwareMessageFilter) |
| ((GridLifecycleAwareMessageFilter)p).initialize(ctx); |
| else |
| ctx.resource().injectGeneric(p); |
| |
| addMessageListener(TOPIC_COMM_USER, |
| new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p)); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| } |
| |
| /** |
| * @param topic Topic to unsubscribe from. |
| * @param p Message predicate. |
| */ |
| public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) { |
| try { |
| removeMessageListener(TOPIC_COMM_USER, |
| new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p)); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param topic Topic to send the message to. |
| * @param topicOrd Topic ordinal value. |
| * @param msg Message to send. |
| * @param plc Type of processing. |
| * @param ordered Ordered flag. |
| * @param timeout Message timeout. |
| * @param skipOnTimeout Whether message can be skipped in timeout. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| private void send( |
| Collection<? extends ClusterNode> nodes, |
| Object topic, |
| int topicOrd, |
| Message msg, |
| byte plc, |
| boolean ordered, |
| long timeout, |
| boolean skipOnTimeout |
| ) throws IgniteCheckedException { |
| assert nodes != null; |
| assert topic != null; |
| assert msg != null; |
| |
| if (!ordered) |
| assert F.find(nodes, null, F.localNode(locNodeId)) == null : |
| "Internal Ignite code should never call the method with local node in a node list."; |
| |
| try { |
| // Small optimization, as communication SPIs may have lighter implementation for sending |
| // messages to one node vs. many. |
| if (!nodes.isEmpty()) { |
| for (ClusterNode node : nodes) |
| send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + |
| msg + ", policy=" + plc + ']'); |
| } |
| catch (IgniteSpiException e) { |
| throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " + |
| "TCP connection cannot be established due to firewall issues) " + |
| "[nodes=" + nodes + ", topic=" + topic + |
| ", msg=" + msg + ", policy=" + plc + ']', e); |
| } |
| } |
| |
| /** |
| * @param topic Listener's topic. |
| * @param lsnr Listener to add. |
| */ |
| @SuppressWarnings({"TypeMayBeWeakened", "deprecation"}) |
| public void addMessageListener(GridTopic topic, GridMessageListener lsnr) { |
| addMessageListener((Object)topic, lsnr); |
| } |
| |
| /** |
| * @param lsnr Listener to add. |
| */ |
| public void addDisconnectListener(GridDisconnectListener lsnr) { |
| disconnectLsnrs.add(lsnr); |
| } |
| |
| /** |
| * @param lsnr Listener to remove. |
| */ |
| public void removeDisconnectListener(GridDisconnectListener lsnr) { |
| disconnectLsnrs.remove(lsnr); |
| } |
| |
| /** |
| * @param topic Listener's topic. |
| * @param lsnr Listener to add. |
| */ |
| @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) |
| public void addMessageListener(Object topic, final GridMessageListener lsnr) { |
| assert lsnr != null; |
| assert topic != null; |
| |
| // Make sure that new topic is not in the list of closed topics. |
| closedTopics.remove(topic); |
| |
| GridMessageListener lsnrs; |
| |
| for (;;) { |
| lsnrs = lsnrMap.putIfAbsent(topic, lsnr); |
| |
| if (lsnrs == null) { |
| lsnrs = lsnr; |
| |
| break; |
| } |
| |
| assert lsnrs != null; |
| |
| if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array. |
| GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr); |
| |
| if (lsnrMap.replace(topic, lsnrs, arrLsnr)) { |
| lsnrs = arrLsnr; |
| |
| break; |
| } |
| } |
| else { |
| if (((ArrayListener)lsnrs).add(lsnr)) |
| break; |
| |
| // Add operation failed because array is already empty and is about to be removed, helping and retrying. |
| lsnrMap.remove(topic, lsnrs); |
| } |
| } |
| |
| Map<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic); |
| |
| Collection<GridCommunicationMessageSet> msgSets = map != null ? map.values() : null; |
| |
| if (msgSets != null) { |
| final GridMessageListener lsnrs0 = lsnrs; |
| |
| try { |
| for (final GridCommunicationMessageSet msgSet : msgSets) { |
| pool(msgSet.policy()).execute( |
| new Runnable() { |
| @Override public void run() { |
| unwindMessageSet(msgSet, lsnrs0); |
| } |
| }); |
| } |
| } |
| catch (RejectedExecutionException e) { |
| U.error(log, "Failed to process delayed message due to execution rejection. Increase the upper bound " + |
| "on executor service provided in 'IgniteConfiguration.getPublicThreadPoolSize()'). Will attempt to " + |
| "process message in the listener thread instead.", e); |
| |
| for (GridCommunicationMessageSet msgSet : msgSets) |
| unwindMessageSet(msgSet, lsnr); |
| } |
| catch (IgniteCheckedException ice) { |
| throw new IgniteException(ice); |
| } |
| } |
| } |
| |
| /** |
| * @param topic Message topic. |
| * @return Whether or not listener was indeed removed. |
| */ |
| public boolean removeMessageListener(GridTopic topic) { |
| return removeMessageListener((Object)topic); |
| } |
| |
| /** |
| * @param topic Message topic. |
| * @return Whether or not listener was indeed removed. |
| */ |
| public boolean removeMessageListener(Object topic) { |
| return removeMessageListener(topic, null); |
| } |
| |
| /** |
| * @param topic Listener's topic. |
| * @param lsnr Listener to remove. |
| * @return Whether or not the lsnr was removed. |
| */ |
| @SuppressWarnings("deprecation") |
| public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) { |
| return removeMessageListener((Object)topic, lsnr); |
| } |
| |
| /** |
| * @param topic Listener's topic. |
| * @param lsnr Listener to remove. |
| * @return Whether or not the lsnr was removed. |
| */ |
| @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) |
| public boolean removeMessageListener(Object topic, @Nullable GridMessageListener lsnr) { |
| assert topic != null; |
| |
| boolean rmv = true; |
| |
| Collection<GridCommunicationMessageSet> msgSets = null; |
| |
| // If listener is null, then remove all listeners. |
| if (lsnr == null) { |
| closedTopics.add(topic); |
| |
| lsnr = lsnrMap.remove(topic); |
| |
| rmv = lsnr != null; |
| |
| Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic); |
| |
| if (map != null) |
| msgSets = map.values(); |
| } |
| else { |
| for (;;) { |
| GridMessageListener lsnrs = lsnrMap.get(topic); |
| |
| // If removing listener before subscription happened. |
| if (lsnrs == null) { |
| closedTopics.add(topic); |
| |
| Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic); |
| |
| if (map != null) |
| msgSets = map.values(); |
| |
| rmv = false; |
| |
| break; |
| } |
| else { |
| boolean empty = false; |
| |
| if (!(lsnrs instanceof ArrayListener)) { |
| if (lsnrs.equals(lsnr)) { |
| if (!lsnrMap.remove(topic, lsnrs)) |
| continue; // Retry because it can be packed to array listener. |
| |
| empty = true; |
| } |
| else |
| rmv = false; |
| } |
| else { |
| ArrayListener arrLsnr = (ArrayListener)lsnrs; |
| |
| if (arrLsnr.remove(lsnr)) |
| empty = arrLsnr.isEmpty(); |
| else |
| // Listener was not found. |
| rmv = false; |
| |
| if (empty) |
| lsnrMap.remove(topic, lsnrs); |
| } |
| |
| // If removing last subscribed listener. |
| if (empty) { |
| closedTopics.add(topic); |
| |
| Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic); |
| |
| if (map != null) |
| msgSets = map.values(); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| if (msgSets != null) |
| for (GridCommunicationMessageSet msgSet : msgSets) |
| ctx.timeout().removeTimeoutObject(msgSet); |
| |
| if (rmv && log.isDebugEnabled()) |
| log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']'); |
| |
| if (lsnr instanceof ArrayListener) |
| { |
| for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) |
| closeListener(childLsnr); |
| } |
| else |
| closeListener(lsnr); |
| |
| return rmv; |
| } |
| |
| /** |
| * Closes a listener, if applicable. |
| * @param lsnr Listener. |
| */ |
| private void closeListener(GridMessageListener lsnr) { |
| if (lsnr instanceof GridUserMessageListener) { |
| GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr; |
| |
| if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter) |
| ((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close(); |
| } |
| } |
| |
| /** |
| * Gets sent messages count. |
| * |
| * @return Sent messages count. |
| */ |
| public int getSentMessagesCount() { |
| return getSpi().getSentMessagesCount(); |
| } |
| |
| /** |
| * Gets sent bytes count. |
| * |
| * @return Sent bytes count. |
| */ |
| public long getSentBytesCount() { |
| return getSpi().getSentBytesCount(); |
| } |
| |
| /** |
| * Gets received messages count. |
| * |
| * @return Received messages count. |
| */ |
| public int getReceivedMessagesCount() { |
| return getSpi().getReceivedMessagesCount(); |
| } |
| |
| /** |
| * Gets received bytes count. |
| * |
| * @return Received bytes count. |
| */ |
| public long getReceivedBytesCount() { |
| return getSpi().getReceivedBytesCount(); |
| } |
| |
| /** |
| * Gets outbound messages queue size. |
| * |
| * @return Outbound messages queue size. |
| */ |
| public int getOutboundMessagesQueueSize() { |
| return getSpi().getOutboundMessagesQueueSize(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| X.println(">>>"); |
| X.println(">>> IO manager memory stats [grid=" + ctx.gridName() + ']'); |
| X.println(">>> lsnrMapSize: " + lsnrMap.size()); |
| X.println(">>> msgSetMapSize: " + msgSetMap.size()); |
| X.println(">>> closedTopicsSize: " + closedTopics.sizex()); |
| X.println(">>> discoWaitMapSize: " + waitMap.size()); |
| } |
| |
| /** |
| * Linked chain of listeners. |
| */ |
| private static class ArrayListener implements GridMessageListener { |
| /** */ |
| private volatile GridMessageListener[] arr; |
| |
| /** |
| * @param arr Array of listeners. |
| */ |
| ArrayListener(GridMessageListener... arr) { |
| this.arr = arr; |
| } |
| |
| /** |
| * Passes message to the whole chain. |
| * |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| @Override public void onMessage(UUID nodeId, Object msg) { |
| GridMessageListener[] arr0 = arr; |
| |
| if (arr0 == null) |
| return; |
| |
| for (GridMessageListener l : arr0) |
| l.onMessage(nodeId, msg); |
| } |
| |
| /** |
| * @return {@code true} If this instance is empty. |
| */ |
| boolean isEmpty() { |
| return arr == null; |
| } |
| |
| /** |
| * @param l Listener. |
| * @return {@code true} If listener was removed. |
| */ |
| synchronized boolean remove(GridMessageListener l) { |
| GridMessageListener[] arr0 = arr; |
| |
| if (arr0 == null) |
| return false; |
| |
| if (arr0.length == 1) { |
| if (!arr0[0].equals(l)) |
| return false; |
| |
| arr = null; |
| |
| return true; |
| } |
| |
| for (int i = 0; i < arr0.length; i++) { |
| if (arr0[i].equals(l)) { |
| int newLen = arr0.length - 1; |
| |
| if (i == newLen) // Remove last. |
| arr = Arrays.copyOf(arr0, newLen); |
| else { |
| GridMessageListener[] arr1 = new GridMessageListener[newLen]; |
| |
| if (i != 0) // Not remove first. |
| System.arraycopy(arr0, 0, arr1, 0, i); |
| |
| System.arraycopy(arr0, i + 1, arr1, i, newLen - i); |
| |
| arr = arr1; |
| } |
| |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param l Listener. |
| * @return {@code true} if listener was added. Add can fail if this instance is empty and is about to be removed |
| * from map. |
| */ |
| synchronized boolean add(GridMessageListener l) { |
| GridMessageListener[] arr0 = arr; |
| |
| if (arr0 == null) |
| return false; |
| |
| int oldLen = arr0.length; |
| |
| arr0 = Arrays.copyOf(arr0, oldLen + 1); |
| |
| arr0[oldLen] = l; |
| |
| arr = arr0; |
| |
| return true; |
| } |
| } |
| |
| /** |
| * This class represents a message listener wrapper that knows about peer deployment. |
| */ |
| private class GridUserMessageListener implements GridMessageListener { |
| /** Predicate listeners. */ |
| private final IgniteBiPredicate<UUID, Object> predLsnr; |
| |
| /** User message topic. */ |
| private final Object topic; |
| |
| /** |
| * @param topic User topic. |
| * @param predLsnr Predicate listener. |
| * @throws IgniteCheckedException If failed to inject resources to predicates. |
| */ |
| GridUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr) |
| throws IgniteCheckedException { |
| this.topic = topic; |
| this.predLsnr = predLsnr; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", |
| "OverlyStrongTypeCast"}) |
| @Override public void onMessage(UUID nodeId, Object msg) { |
| if (!(msg instanceof GridIoUserMessage)) { |
| U.error(log, "Received unknown message (potentially fatal problem): " + msg); |
| |
| return; |
| } |
| |
| GridIoUserMessage ioMsg = (GridIoUserMessage)msg; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) { |
| U.warn(log, "Failed to resolve sender node (did the node left grid?): " + nodeId); |
| |
| return; |
| } |
| |
| busyLock.readLock(); |
| |
| try { |
| if (stopping) { |
| if (log.isDebugEnabled()) |
| log.debug("Received user message while stopping (will ignore) [nodeId=" + |
| nodeId + ", msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| Object msgBody = ioMsg.body(); |
| |
| assert msgBody != null || ioMsg.bodyBytes() != null; |
| |
| try { |
| byte[] msgTopicBytes = ioMsg.topicBytes(); |
| |
| Object msgTopic = ioMsg.topic(); |
| |
| GridDeployment dep = ioMsg.deployment(); |
| |
| if (dep == null && ctx.config().isPeerClassLoadingEnabled() && |
| ioMsg.deploymentClassName() != null) { |
| dep = ctx.deploy().getGlobalDeployment( |
| ioMsg.deploymentMode(), |
| ioMsg.deploymentClassName(), |
| ioMsg.deploymentClassName(), |
| ioMsg.userVersion(), |
| nodeId, |
| ioMsg.classLoaderId(), |
| ioMsg.loaderParticipants(), |
| null); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException( |
| "Failed to obtain deployment information for user message. " + |
| "If you are using custom message or topic class, try implementing " + |
| "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); |
| |
| ioMsg.deployment(dep); // Cache deployment. |
| } |
| |
| // Unmarshall message topic if needed. |
| if (msgTopic == null && msgTopicBytes != null) { |
| msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); |
| |
| ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. |
| } |
| |
| if (!F.eq(topic, msgTopic)) |
| return; |
| |
| if (msgBody == null) { |
| msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); |
| |
| ioMsg.body(msgBody); // Save body to avoid future unmarshallings. |
| } |
| |
| // Resource injection. |
| if (dep != null) |
| ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + |
| msg + ']', e); |
| } |
| |
| if (msgBody != null) { |
| if (predLsnr != null) { |
| if (!predLsnr.apply(nodeId, msgBody)) |
| removeMessageListener(TOPIC_COMM_USER, this); |
| } |
| } |
| } |
| finally { |
| busyLock.readUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| GridUserMessageListener l = (GridUserMessageListener)o; |
| |
| return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| int res = predLsnr != null ? predLsnr.hashCode() : 0; |
| |
| res = 31 * res + (topic != null ? topic.hashCode() : 0); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridUserMessageListener.class, this); |
| } |
| } |
| |
| /** |
| * Ordered communication message set. |
| */ |
| private class GridCommunicationMessageSet implements GridTimeoutObject { |
| /** */ |
| private final UUID nodeId; |
| |
| /** */ |
| private long endTime; |
| |
| /** */ |
| private final IgniteUuid timeoutId; |
| |
| /** */ |
| @GridToStringInclude |
| private final Object topic; |
| |
| /** */ |
| private final byte plc; |
| |
| /** */ |
| @GridToStringInclude |
| private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>(); |
| |
| /** */ |
| private final AtomicBoolean reserved = new AtomicBoolean(); |
| |
| /** */ |
| private final long timeout; |
| |
| /** */ |
| private final boolean skipOnTimeout; |
| |
| /** */ |
| private long lastTs; |
| |
| /** |
| * @param plc Communication policy. |
| * @param topic Communication topic. |
| * @param nodeId Node ID. |
| * @param timeout Timeout. |
| * @param skipOnTimeout Whether message can be skipped on timeout. |
| * @param msg Message to add immediately. |
| * @param msgC Message closure (may be {@code null}). |
| */ |
| GridCommunicationMessageSet( |
| byte plc, |
| Object topic, |
| UUID nodeId, |
| long timeout, |
| boolean skipOnTimeout, |
| GridIoMessage msg, |
| @Nullable IgniteRunnable msgC |
| ) { |
| assert nodeId != null; |
| assert topic != null; |
| assert msg != null; |
| |
| this.plc = plc; |
| this.nodeId = nodeId; |
| this.topic = topic; |
| this.timeout = timeout == 0 ? ctx.config().getNetworkTimeout() : timeout; |
| this.skipOnTimeout = skipOnTimeout; |
| |
| endTime = endTime(timeout); |
| |
| timeoutId = IgniteUuid.randomUuid(); |
| |
| lastTs = U.currentTimeMillis(); |
| |
| msgs.add(F.t(msg, lastTs, msgC)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid timeoutId() { |
| return timeoutId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long endTime() { |
| return endTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") |
| @Override public void onTimeout() { |
| GridMessageListener lsnr = lsnrMap.get(topic); |
| |
| if (lsnr != null) { |
| long delta = 0; |
| |
| if (skipOnTimeout) { |
| while (true) { |
| delta = 0; |
| |
| boolean unwind = false; |
| |
| synchronized (this) { |
| if (!msgs.isEmpty()) { |
| delta = U.currentTimeMillis() - lastTs; |
| |
| if (delta >= timeout) |
| unwind = true; |
| } |
| } |
| |
| if (unwind) |
| unwindMessageSet(this, lsnr); |
| else |
| break; |
| } |
| } |
| |
| // Someone is still listening to messages, so delay set removal. |
| endTime = endTime(timeout - delta); |
| |
| ctx.timeout().addTimeoutObject(this); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removing message set due to timeout: " + this); |
| |
| ConcurrentMap<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic); |
| |
| if (map != null) { |
| boolean rmv; |
| |
| synchronized (map) { |
| rmv = map.remove(nodeId, this) && map.isEmpty(); |
| } |
| |
| if (rmv) |
| msgSetMap.remove(topic, map); |
| } |
| } |
| |
| /** |
| * @return ID of node that sent the messages in the set. |
| */ |
| UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** |
| * @return Communication policy. |
| */ |
| byte policy() { |
| return plc; |
| } |
| |
| /** |
| * @return Message topic. |
| */ |
| Object topic() { |
| return topic; |
| } |
| |
| /** |
| * @return {@code True} if successful. |
| */ |
| boolean reserve() { |
| return reserved.compareAndSet(false, true); |
| } |
| |
| /** |
| * @return {@code True} if set is reserved. |
| */ |
| boolean reserved() { |
| return reserved.get(); |
| } |
| |
| /** |
| * Releases reservation. |
| */ |
| void release() { |
| assert reserved.get() : "Message set was not reserved: " + this; |
| |
| reserved.set(false); |
| } |
| |
| /** |
| * @param lsnr Listener to notify. |
| */ |
| void unwind(GridMessageListener lsnr) { |
| assert reserved.get(); |
| |
| for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) { |
| try { |
| lsnr.onMessage( |
| nodeId, |
| t.get1().message()); |
| } |
| finally { |
| if (t.get3() != null) |
| t.get3().run(); |
| } |
| } |
| } |
| |
| /** |
| * @param msg Message to add. |
| * @param msgC Message closure (may be {@code null}). |
| */ |
| void add( |
| GridIoMessage msg, |
| @Nullable IgniteRunnable msgC |
| ) { |
| msgs.add(F.t(msg, U.currentTimeMillis(), msgC)); |
| } |
| |
| /** |
| * @return {@code True} if set has messages to unwind. |
| */ |
| boolean changed() { |
| return !msgs.isEmpty(); |
| } |
| |
| /** |
| * Calculates end time with overflow check. |
| * |
| * @param timeout Timeout in milliseconds. |
| * @return End time in milliseconds. |
| */ |
| private long endTime(long timeout) { |
| long endTime = U.currentTimeMillis() + timeout; |
| |
| // Account for overflow. |
| if (endTime < 0) |
| endTime = Long.MAX_VALUE; |
| |
| return endTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridCommunicationMessageSet.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class ConcurrentHashMap0<K, V> extends ConcurrentHashMap8<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private int hash; |
| |
| /** |
| * @param o Object to be compared for equality with this map. |
| * @return {@code True} only for {@code this}. |
| */ |
| @Override public boolean equals(Object o) { |
| return o == this; |
| } |
| |
| /** |
| * @return Identity hash code. |
| */ |
| @Override public int hashCode() { |
| if (hash == 0) { |
| int hash0 = System.identityHashCode(this); |
| |
| hash = hash0 != 0 ? hash0 : -1; |
| } |
| |
| return hash; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class DelayedMessage { |
| /** */ |
| private final UUID nodeId; |
| |
| /** */ |
| private final GridIoMessage msg; |
| |
| /** */ |
| private final IgniteRunnable msgC; |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| * @param msgC Callback. |
| */ |
| private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { |
| this.nodeId = nodeId; |
| this.msg = msg; |
| this.msgC = msgC; |
| } |
| |
| /** |
| * @return Message char. |
| */ |
| public IgniteRunnable callback() { |
| return msgC; |
| } |
| |
| /** |
| * @return Message. |
| */ |
| public GridIoMessage message() { |
| return msg; |
| } |
| |
| /** |
| * @return Node id. |
| */ |
| public UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DelayedMessage.class, this, super.toString()); |
| } |
| } |
| } |