/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.events.EventType.EVTS_ALL;

/**
 * Continuous routine handler for remote event listening.
 */
class GridEventConsumeHandler implements GridContinuousHandler {
    /** */
    private static final long serialVersionUID = 0L;

    /** Default callback. */
    private static final IgniteBiPredicate<UUID, Event> DFLT_CALLBACK = new P2<UUID, Event>() {
        @Override public boolean apply(UUID uuid, Event e) {
            return true;
        }
    };

    /** Local callback. */
    private IgniteBiPredicate<UUID, Event> cb;

    /** Filter. */
    private IgnitePredicate<Event> filter;

    /** Serialized filter. */
    private byte[] filterBytes;

    /** Deployment class name. */
    private String clsName;

    /** Deployment info. */
    private GridDeploymentInfo depInfo;

    /** Types. */
    private int[] types;

    /** Listener. */
    private GridLocalEventListener lsnr;

    /** P2P unmarshalling future. */
    private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();

    /**
     * Required by {@link Externalizable}.
     */
    public GridEventConsumeHandler() {
        // No-op.
    }

    /**
     * @param cb Local callback.
     * @param filter Filter.
     * @param types Types.
     */
    GridEventConsumeHandler(@Nullable IgniteBiPredicate<UUID, Event> cb, @Nullable IgnitePredicate<Event> filter,
        @Nullable int[] types) {
        this.cb = cb == null ? DFLT_CALLBACK : cb;
        this.filter = filter;
        this.types = types;
    }

    /** {@inheritDoc} */
    @Override public boolean isEvents() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean isMessaging() {
        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean isQuery() {
        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean keepBinary() {
        return false;
    }

    /** {@inheritDoc} */
    @Override public String cacheName() {
        throw new IllegalStateException();
    }

    /** {@inheritDoc} */
    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
        Map<Integer, T2<Long, Long>> cntrs) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public Map<Integer, T2<Long, Long>> updateCounters() {
        return Collections.emptyMap();
    }

    /**
     * Performs remote filter initialization.
     *
     * @param filter Remote filter.
     * @param ctx Kernal context.
     * @throws IgniteCheckedException In case if initialization failed.
     */
    private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) throws IgniteCheckedException {
        if (filter != null)
            ctx.resource().injectGeneric(filter);

        if (filter instanceof PlatformEventFilterListener)
            ((PlatformEventFilterListener)filter).initialize(ctx);
    }

    /** {@inheritDoc} */
    @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
        throws IgniteCheckedException {
        assert nodeId != null;
        assert routineId != null;
        assert ctx != null;

        if (cb != null)
            ctx.resource().injectGeneric(cb);

        final boolean loc = nodeId.equals(ctx.localNodeId());

        lsnr = new GridLocalEventListener() {
            /** node ID, routine ID, event */
            private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();

            private boolean notificationInProgress;

            @Override public void onEvent(Event evt) {
                if (filterDropsEvent(evt))
                    return;

                if (loc) {
                    if (!cb.apply(nodeId, evt))
                        ctx.continuous().stopRoutine(routineId);
                }
                else {
                    if (ctx.discovery().node(nodeId) == null)
                        return;

                    synchronized (notificationQueue) {
                        notificationQueue.add(new T3<>(nodeId, routineId, evt));

                        if (!notificationInProgress) {
                            ctx.pools().getSystemExecutorService().execute(new Runnable() {
                                @Override public void run() {
                                    if (!ctx.continuous().lockStopping())
                                        return;

                                    try {
                                        while (true) {
                                            T3<UUID, UUID, Event> t3;

                                            synchronized (notificationQueue) {
                                                t3 = notificationQueue.poll();

                                                if (t3 == null) {
                                                    notificationInProgress = false;

                                                    return;
                                                }
                                            }

                                            try {
                                                Event evt = t3.get3();

                                                EventWrapper wrapper = new EventWrapper(evt);

                                                if (evt instanceof CacheEvent) {
                                                    String cacheName = ((CacheEvent)evt).cacheName();

                                                    ClusterNode node = ctx.discovery().node(t3.get1());

                                                    if (node == null)
                                                        continue;

                                                    if (ctx.config().isPeerClassLoadingEnabled() &&
                                                        ctx.discovery().cacheNode(node, cacheName)) {
                                                        GridCacheAdapter cache = ctx.cache().internalCache(cacheName);

                                                        if (cache != null && cache.context().deploymentEnabled()) {
                                                            wrapper.p2pMarshal(ctx.config().getMarshaller());

                                                            wrapper.cacheName = cacheName;

                                                            cache.context().deploy().prepare(wrapper);
                                                        }
                                                    }
                                                }

                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
                                                    false, false);
                                            }
                                            catch (ClusterTopologyCheckedException ignored) {
                                                // No-op.
                                            }
                                            catch (Throwable e) {
                                                U.error(ctx.log(GridEventConsumeHandler.class),
                                                    "Failed to send event notification to node: " + nodeId, e);
                                            }
                                        }
                                    }
                                    finally {
                                        ctx.continuous().unlockStopping();
                                    }
                                }
                            });

                            notificationInProgress = true;
                        }
                    }
                }
            }
        };

        if (F.isEmpty(types))
            types = EVTS_ALL;

        p2pUnmarshalFut.listen(() -> {
            if (p2pUnmarshalFut.error() == null) {
                try {
                    initFilter(filter, ctx);
                }
                catch (IgniteCheckedException e) {
                    throw F.wrap(e);
                }

                ctx.event().addLocalEventListener(lsnr, types);
            }
        });

        return RegisterStatus.REGISTERED;
    }

    /**
     * Returns {@code true} if there is a filter and this filter filters the given event out.
     *
     * @param evt event to check
     * @return {@code true} if there is a filter and this filter filters the given event out
     */
    private boolean filterDropsEvent(Event evt) {
        try {
            return filter != null && !filter.apply(evt);
        }
        catch (NoClassDefFoundError e) {
            // Filter might be installed using P2P class loading, so let's be careful and avoid a NCDFE from getting
            // to a Failure Handler.

            P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
            return true;
        }
    }

    /** {@inheritDoc} */
    @Override public void unregister(UUID routineId, GridKernalContext ctx) {
        assert routineId != null;
        assert ctx != null;

        if (lsnr != null)
            ctx.event().removeLocalEventListener(lsnr, types);

        RuntimeException err = null;

        try {
            if (filter instanceof PlatformEventFilterListener)
                ((PlatformEventFilterListener)filter).onClose();
        }
        catch (RuntimeException ex) {
            err = ex;
        }

        try {
            if (cb instanceof PlatformEventFilterListener)
                ((PlatformEventFilterListener)cb).onClose();
        }
        catch (RuntimeException ex) {
            if (err == null)
                err = ex;
        }

        if (err != null)
            throw err;
    }

    /**
     * @param nodeId Node ID.
     * @param objs Notification objects.
     */
    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
        assert nodeId != null;
        assert routineId != null;
        assert objs != null;
        assert ctx != null;

        for (Object obj : objs) {
            assert obj instanceof EventWrapper;

            EventWrapper wrapper = (EventWrapper)obj;

            if (wrapper.bytes != null) {
                assert ctx.config().isPeerClassLoadingEnabled();

                GridCacheAdapter cache = ctx.cache().internalCache(wrapper.cacheName);

                ClassLoader ldr = null;

                try {
                    if (cache != null) {
                        GridCacheDeploymentManager depMgr = cache.context().deploy();

                        GridDeploymentInfo depInfo = wrapper.depInfo;

                        if (depInfo != null) {
                            depMgr.p2pContext(
                                nodeId,
                                depInfo.classLoaderId(),
                                depInfo.userVersion(),
                                depInfo.deployMode(),
                                depInfo.participants()
                            );
                        }

                        ldr = depMgr.globalLoader();
                    }
                    else {
                        U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
                            "when peer class loading is enabled: " + wrapper.cacheName + ". Will try to unmarshal " +
                            "with default class loader.");
                    }

                    wrapper.p2pUnmarshal(ctx.config().getMarshaller(), U.resolveClassLoader(ldr, ctx.config()));
                }
                catch (IgniteCheckedException e) {
                    U.error(ctx.log(getClass()), "Failed to unmarshal event.", e);
                }
            }

            if (!cb.apply(nodeId, wrapper.evt)) {
                ctx.continuous().stopRoutine(routineId);

                break;
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
        assert ctx != null;
        assert ctx.config().isPeerClassLoadingEnabled();

        if (filter != null) {
            Class cls = U.detectClass(filter);

            clsName = cls.getName();

            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));

            if (dep == null)
                throw new IgniteDeploymentCheckedException("Failed to deploy event filter: " + filter);

            depInfo = new GridDeploymentInfoBean(dep);

            filterBytes = U.marshal(ctx.config().getMarshaller(), filter);
        }
    }

    /** {@inheritDoc} */
    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
        assert nodeId != null;
        assert ctx != null;
        assert ctx.config().isPeerClassLoadingEnabled();

        if (filterBytes != null) {
            try {
                GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
                    depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);

                if (dep == null)
                    throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);

                filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));

                ((GridFutureAdapter)p2pUnmarshalFut).onDone();
            }
            catch (IgniteCheckedException e) {
                ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);

                throw e;
            }
            catch (ExceptionInInitializerError e) {
                ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);

                throw new IgniteCheckedException("Failed to unmarshal deployable object.", e);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public GridContinuousBatch createBatch() {
        return new GridContinuousBatchAdapter();
    }

    /** {@inheritDoc} */
    @Override public void onClientDisconnected() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Nullable @Override public Object orderedTopic() {
        return null;
    }

    /** {@inheritDoc} */
    @Override public GridContinuousHandler clone() {
        try {
            return (GridContinuousHandler)super.clone();
        }
        catch (CloneNotSupportedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        boolean b = filterBytes != null;

        out.writeBoolean(b);

        if (b) {
            U.writeByteArray(out, filterBytes);
            U.writeString(out, clsName);
            out.writeObject(depInfo);
        }
        else
            out.writeObject(filter);

        out.writeObject(types);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        boolean b = in.readBoolean();

        if (b) {
            p2pUnmarshalFut = new GridFutureAdapter<>();
            filterBytes = U.readByteArray(in);
            clsName = U.readString(in);
            depInfo = (GridDeploymentInfo)in.readObject();
        }
        else
            filter = (IgnitePredicate<Event>)in.readObject();

        types = (int[])in.readObject();
    }

    /**
     * Event wrapper.
     */
    private static class EventWrapper implements GridCacheDeployable, Externalizable {
        /** */
        private static final long serialVersionUID = 0L;

        /** Event. */
        private Event evt;

        /** Serialized event. */
        private byte[] bytes;

        /** Cache name (for cache events only). */
        private String cacheName;

        /** Deployment info. */
        private GridDeploymentInfo depInfo;

        /**
         * Required by {@link Externalizable}.
         */
        public EventWrapper() {
            // No-op.
        }

        /**
         * @param evt Event.
         */
        EventWrapper(Event evt) {
            assert evt != null;

            this.evt = evt;
        }

        /**
         * @param marsh Marshaller.
         * @throws IgniteCheckedException In case of error.
         */
        void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
            assert marsh != null;

            bytes = U.marshal(marsh, evt);
        }

        /**
         * @param marsh Marshaller.
         * @param ldr Class loader.
         * @throws IgniteCheckedException In case of error.
         */
        void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
            assert marsh != null;

            assert evt == null;
            assert bytes != null;

            evt = U.unmarshal(marsh, bytes, ldr);
        }

        /** {@inheritDoc} */
        @Override public void prepare(GridDeploymentInfo depInfo) {
            assert evt instanceof CacheEvent;

            this.depInfo = depInfo;
        }

        /** {@inheritDoc} */
        @Override public GridDeploymentInfo deployInfo() {
            return depInfo;
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            boolean b = bytes != null;

            out.writeBoolean(b);

            if (b) {
                U.writeByteArray(out, bytes);
                U.writeString(out, cacheName);
                out.writeObject(depInfo);
            }
            else
                out.writeObject(evt);
        }

        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            boolean b = in.readBoolean();

            if (b) {
                bytes = U.readByteArray(in);
                cacheName = U.readString(in);
                depInfo = (GridDeploymentInfo)in.readObject();
            }
            else
                evt = (Event)in.readObject();
        }
    }
}
