| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.platform.messaging; |
| |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteMessaging; |
| import org.apache.ignite.internal.binary.BinaryRawReaderEx; |
| import org.apache.ignite.internal.binary.BinaryRawWriterEx; |
| import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; |
| import org.apache.ignite.internal.processors.platform.PlatformContext; |
| import org.apache.ignite.internal.processors.platform.PlatformTarget; |
| import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; |
| import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; |
| import org.apache.ignite.lang.IgniteFuture; |
| |
| /** |
| * Interop messaging. |
| */ |
| public class PlatformMessaging extends PlatformAbstractTarget { |
| /** */ |
| public static final int OP_LOC_LISTEN = 1; |
| |
| /** */ |
| public static final int OP_REMOTE_LISTEN = 2; |
| |
| /** */ |
| public static final int OP_SEND = 3; |
| |
| /** */ |
| public static final int OP_SEND_MULTI = 4; |
| |
| /** */ |
| public static final int OP_SEND_ORDERED = 5; |
| |
| /** */ |
| public static final int OP_STOP_LOC_LISTEN = 6; |
| |
| /** */ |
| public static final int OP_STOP_REMOTE_LISTEN = 7; |
| |
| /** */ |
| public static final int OP_WITH_ASYNC = 8; |
| |
| /** */ |
| public static final int OP_REMOTE_LISTEN_ASYNC = 9; |
| |
| /** */ |
| public static final int OP_STOP_REMOTE_LISTEN_ASYNC = 10; |
| |
| /** */ |
| private final IgniteMessaging messaging; |
| |
| /** |
| * Ctor. |
| * |
| * @param platformCtx Context. |
| * @param messaging Ignite messaging. |
| */ |
| public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) { |
| super(platformCtx); |
| |
| assert messaging != null; |
| |
| this.messaging = messaging; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) |
| throws IgniteCheckedException { |
| switch (type) { |
| case OP_SEND: |
| messaging.send(reader.readObjectDetached(), reader.readObjectDetached()); |
| |
| return TRUE; |
| |
| case OP_SEND_MULTI: |
| messaging.send(reader.readObjectDetached(), PlatformUtils.readCollection(reader)); |
| |
| return TRUE; |
| |
| case OP_SEND_ORDERED: |
| messaging.sendOrdered(reader.readObjectDetached(), reader.readObjectDetached(), reader.readLong()); |
| |
| return TRUE; |
| |
| case OP_LOC_LISTEN: { |
| PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx); |
| |
| Object topic = reader.readObjectDetached(); |
| |
| messaging.localListen(topic, filter); |
| |
| return TRUE; |
| } |
| |
| case OP_STOP_LOC_LISTEN: { |
| PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx); |
| |
| Object topic = reader.readObjectDetached(); |
| |
| messaging.stopLocalListen(topic, filter); |
| |
| return TRUE; |
| } |
| |
| case OP_STOP_REMOTE_LISTEN: { |
| messaging.stopRemoteListen(reader.readUuid()); |
| |
| return TRUE; |
| } |
| |
| case OP_REMOTE_LISTEN_ASYNC: { |
| readAndListenFuture(reader, startRemoteListenAsync(reader, messaging)); |
| |
| return TRUE; |
| } |
| |
| case OP_STOP_REMOTE_LISTEN_ASYNC: { |
| readAndListenFuture(reader, messaging.stopRemoteListenAsync(reader.readUuid())); |
| |
| return TRUE; |
| } |
| |
| default: |
| return super.processInStreamOutLong(type, reader); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) |
| throws IgniteCheckedException { |
| switch (type) { |
| case OP_REMOTE_LISTEN: { |
| writer.writeUuid(startRemoteListen(reader, messaging)); |
| |
| break; |
| } |
| |
| default: |
| super.processInStreamOutStream(type, reader, writer); |
| } |
| } |
| |
| /** |
| * Starts the remote listener. |
| * @param reader Reader. |
| * @param messaging Messaging. |
| * @return Listen id. |
| */ |
| private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) { |
| Object nativeFilter = reader.readObjectDetached(); |
| |
| long ptr = reader.readLong(); // interop pointer |
| |
| Object topic = reader.readObjectDetached(); |
| |
| PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); |
| |
| return messaging.remoteListen(topic, filter); |
| } |
| |
| /** |
| * Starts the remote listener. |
| * @param reader Reader. |
| * @param messaging Messaging. |
| * @return Future of the operation. |
| */ |
| private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx reader, IgniteMessaging messaging) { |
| Object nativeFilter = reader.readObjectDetached(); |
| |
| long ptr = reader.readLong(); // interop pointer |
| |
| Object topic = reader.readObjectDetached(); |
| |
| PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); |
| |
| return messaging.remoteListenAsync(topic, filter); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { |
| switch (type) { |
| case OP_WITH_ASYNC: |
| if (messaging.isAsync()) |
| return this; |
| |
| return new PlatformMessaging(platformCtx, messaging.withAsync()); |
| } |
| |
| return super.processOutObject(type); |
| } |
| } |