| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.platform.events; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteEvents; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventAdapter; |
| import org.apache.ignite.internal.binary.BinaryRawReaderEx; |
| import org.apache.ignite.internal.binary.BinaryRawWriterEx; |
| import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; |
| import org.apache.ignite.internal.processors.platform.PlatformContext; |
| import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; |
| import org.apache.ignite.internal.processors.platform.PlatformTarget; |
| import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgnitePredicate; |
| |
| /** |
| * Interop events. |
| */ |
| @SuppressWarnings("unchecked") |
| public class PlatformEvents extends PlatformAbstractTarget { |
| /** */ |
| private static final int OP_REMOTE_QUERY = 1; |
| |
| /** */ |
| private static final int OP_REMOTE_LISTEN = 2; |
| |
| /** */ |
| private static final int OP_STOP_REMOTE_LISTEN = 3; |
| |
| /** */ |
| private static final int OP_WAIT_FOR_LOCAL = 4; |
| |
| /** */ |
| private static final int OP_LOCAL_QUERY = 5; |
| |
| /** */ |
| private static final int OP_RECORD_LOCAL = 6; |
| |
| /** */ |
| private static final int OP_ENABLE_LOCAL = 8; |
| |
| /** */ |
| private static final int OP_DISABLE_LOCAL = 9; |
| |
| /** */ |
| private static final int OP_GET_ENABLED_EVENTS = 10; |
| |
| /** */ |
| private static final int OP_WITH_ASYNC = 11; |
| |
| /** */ |
| private static final int OP_IS_ENABLED = 12; |
| |
| /** */ |
| private static final int OP_LOCAL_LISTEN = 13; |
| |
| /** */ |
| private static final int OP_STOP_LOCAL_LISTEN = 14; |
| |
| /** */ |
| private static final int OP_REMOTE_QUERY_ASYNC = 15; |
| |
| /** */ |
| private static final int OP_WAIT_FOR_LOCAL_ASYNC = 16; |
| |
| /** */ |
| private final IgniteEvents events; |
| |
| /** */ |
| private final EventResultWriter eventResWriter; |
| |
| /** */ |
| private final EventCollectionResultWriter eventColResWriter; |
| |
| /** |
| * Ctor. |
| * |
| * @param platformCtx Context. |
| * @param events Ignite events. |
| */ |
| public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) { |
| super(platformCtx); |
| |
| assert events != null; |
| |
| this.events = events; |
| |
| eventResWriter = new EventResultWriter(platformCtx); |
| eventColResWriter = new EventCollectionResultWriter(platformCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) |
| throws IgniteCheckedException { |
| switch (type) { |
| case OP_RECORD_LOCAL: |
| // TODO: IGNITE-1410. |
| return TRUE; |
| |
| case OP_ENABLE_LOCAL: |
| |
| events.enableLocal(readEventTypes(reader)); |
| |
| return TRUE; |
| |
| case OP_DISABLE_LOCAL: |
| |
| events.disableLocal(readEventTypes(reader)); |
| |
| return TRUE; |
| |
| case OP_STOP_REMOTE_LISTEN: |
| events.stopRemoteListen(reader.readUuid()); |
| |
| return TRUE; |
| |
| case OP_LOCAL_LISTEN: |
| events.localListen(localFilter(reader.readLong()), reader.readInt()); |
| |
| return TRUE; |
| |
| case OP_REMOTE_QUERY_ASYNC: |
| readAndListenFuture(reader, startRemoteQueryAsync(reader, events), eventColResWriter); |
| |
| return TRUE; |
| |
| case OP_WAIT_FOR_LOCAL_ASYNC: { |
| readAndListenFuture(reader, startWaitForLocalAsync(reader, events), eventResWriter); |
| |
| return TRUE; |
| } |
| |
| default: |
| return super.processInStreamOutLong(type, reader); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"unchecked"}) |
| @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) |
| throws IgniteCheckedException { |
| switch (type) { |
| case OP_LOCAL_QUERY: { |
| Collection<EventAdapter> result = |
| events.localQuery(F.<EventAdapter>alwaysTrue(), readEventTypes(reader)); |
| |
| writer.writeInt(result.size()); |
| |
| for (EventAdapter e : result) |
| platformCtx.writeEvent(writer, e); |
| |
| break; |
| } |
| |
| case OP_WAIT_FOR_LOCAL: { |
| EventAdapter result = startWaitForLocal(reader, events); |
| |
| platformCtx.writeEvent(writer, result); |
| |
| break; |
| } |
| |
| case OP_REMOTE_LISTEN: { |
| int bufSize = reader.readInt(); |
| |
| long interval = reader.readLong(); |
| |
| boolean autoUnsubscribe = reader.readBoolean(); |
| |
| boolean hasLocFilter = reader.readBoolean(); |
| |
| PlatformEventFilterListener locFilter = hasLocFilter ? localFilter(reader.readLong()) : null; |
| |
| boolean hasRmtFilter = reader.readBoolean(); |
| |
| UUID listenId; |
| |
| if (hasRmtFilter) { |
| PlatformEventFilterListener rmtFilter = platformCtx.createRemoteEventFilter( |
| reader.readObjectDetached(), readEventTypes(reader)); |
| |
| listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter); |
| } |
| else |
| listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, null, |
| readEventTypes(reader)); |
| |
| writer.writeUuid(listenId); |
| |
| break; |
| } |
| |
| case OP_REMOTE_QUERY: { |
| Collection<Event> result = startRemoteQuery(reader, events); |
| |
| eventColResWriter.write(writer, result, null); |
| |
| break; |
| } |
| |
| case OP_STOP_LOCAL_LISTEN: { |
| int id = reader.readInt(); |
| int[] types = reader.readIntArray(); |
| |
| IgnitePredicate lsnr = new PlatformLocalEventListener(id); |
| |
| boolean res = events.stopLocalListen(lsnr, types); |
| |
| writer.writeBoolean(res); |
| |
| break; |
| } |
| |
| default: |
| super.processInStreamOutStream(type, reader, writer); |
| } |
| } |
| |
| /** |
| * Starts the waitForLocal. |
| * |
| * @param reader Reader |
| * @param events Events. |
| * @return Result. |
| */ |
| private EventAdapter startWaitForLocal(BinaryRawReaderEx reader, IgniteEvents events) { |
| Long filterHnd = reader.readObject(); |
| |
| IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null; |
| |
| int[] eventTypes = readEventTypes(reader); |
| |
| return (EventAdapter) events.waitForLocal(filter, eventTypes); |
| } |
| |
| /** |
| * Starts the waitForLocal asynchronously. |
| * |
| * @param reader Reader |
| * @param events Events. |
| * @return Result. |
| */ |
| private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) { |
| Long filterHnd = reader.readObject(); |
| |
| IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null; |
| |
| int[] eventTypes = readEventTypes(reader); |
| |
| return events.waitForLocalAsync(filter, eventTypes); |
| } |
| |
| /** |
| * Starts the remote query. |
| * |
| * @param reader Reader. |
| * @param events Events. |
| * @return Result. |
| */ |
| private Collection<Event> startRemoteQuery(BinaryRawReaderEx reader, IgniteEvents events) { |
| Object pred = reader.readObjectDetached(); |
| |
| long timeout = reader.readLong(); |
| |
| int[] types = readEventTypes(reader); |
| |
| PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); |
| |
| return events.remoteQuery(filter, timeout); |
| } |
| |
| /** |
| * Starts the remote query asynchronously. |
| * |
| * @param reader Reader. |
| * @param events Events. |
| * @return Result. |
| */ |
| private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx reader, IgniteEvents events) { |
| Object pred = reader.readObjectDetached(); |
| |
| long timeout = reader.readLong(); |
| |
| int[] types = readEventTypes(reader); |
| |
| PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); |
| |
| return events.remoteQueryAsync(filter, timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { |
| switch (type) { |
| case OP_GET_ENABLED_EVENTS: |
| writeEventTypes(events.enabledEvents(), writer); |
| |
| break; |
| |
| default: |
| super.processOutStream(type, writer); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { |
| switch (type) { |
| case OP_WITH_ASYNC: |
| if (events.isAsync()) |
| return this; |
| |
| return new PlatformEvents(platformCtx, events.withAsync()); |
| } |
| |
| return super.processOutObject(type); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { |
| switch (type) { |
| case OP_IS_ENABLED: |
| return events.isEnabled((int)val) ? TRUE : FALSE; |
| |
| case OP_STOP_LOCAL_LISTEN: |
| return events.stopLocalListen(localFilter(val)) ? TRUE : FALSE; |
| } |
| |
| return super.processInLongOutLong(type, val); |
| } |
| |
| /** |
| * Reads event types array. |
| * |
| * @param reader Reader |
| * @return Event types, or null. |
| */ |
| private int[] readEventTypes(BinaryRawReaderEx reader) { |
| return reader.readIntArray(); |
| } |
| |
| /** |
| * Reads event types array. |
| * |
| * @param writer Writer |
| * @param types Types. |
| */ |
| private void writeEventTypes(int[] types, BinaryRawWriterEx writer) { |
| if (types == null) { |
| writer.writeIntArray(null); |
| |
| return; |
| } |
| |
| int[] resultTypes = new int[types.length]; |
| |
| int idx = 0; |
| |
| for (int t : types) |
| if (platformCtx.isEventTypeSupported(t)) |
| resultTypes[idx++] = t; |
| |
| writer.writeIntArray(Arrays.copyOf(resultTypes, idx)); |
| } |
| |
| /** |
| * Creates an interop filter from handle. |
| * |
| * @param hnd Handle. |
| * @return Interop filter. |
| */ |
| private PlatformEventFilterListener localFilter(long hnd) { |
| return platformCtx.createLocalEventFilter(hnd); |
| } |
| |
| /** |
| * Writes an EventBase. |
| */ |
| private static class EventResultWriter implements PlatformFutureUtils.Writer { |
| /** */ |
| private final PlatformContext platformCtx; |
| |
| /** |
| * Constructor. |
| * |
| * @param platformCtx Context. |
| */ |
| public EventResultWriter(PlatformContext platformCtx) { |
| assert platformCtx != null; |
| |
| this.platformCtx = platformCtx; |
| } |
| |
| /** <inheritDoc /> */ |
| @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { |
| platformCtx.writeEvent(writer, (EventAdapter)obj); |
| } |
| |
| /** <inheritDoc /> */ |
| @Override public boolean canWrite(Object obj, Throwable err) { |
| return obj instanceof EventAdapter && err == null; |
| } |
| } |
| |
| /** |
| * Writes a collection of EventAdapter. |
| */ |
| private static class EventCollectionResultWriter implements PlatformFutureUtils.Writer { |
| /** */ |
| private final PlatformContext platformCtx; |
| |
| /** |
| * Constructor. |
| * |
| * @param platformCtx Context. |
| */ |
| public EventCollectionResultWriter(PlatformContext platformCtx) { |
| assert platformCtx != null; |
| |
| this.platformCtx = platformCtx; |
| } |
| |
| /** <inheritDoc /> */ |
| @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { |
| Collection<Event> evts = (Collection<Event>)obj; |
| |
| if (obj != null) { |
| writer.writeInt(evts.size()); |
| |
| for (Event e : evts) |
| platformCtx.writeEvent(writer, e); |
| } |
| else |
| writer.writeInt(-1); |
| } |
| |
| /** <inheritDoc /> */ |
| @Override public boolean canWrite(Object obj, Throwable err) { |
| return obj instanceof Collection && err == null; |
| } |
| } |
| } |