/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cdc;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;

/**
 * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
 */
public abstract class CdcEventsApplier {
    /** Maximum batch size. */
    protected int maxBatchSize;

    /** Caches. */
    private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();

    /** Update batch. */
    private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>();

    /** Remove batch. */
    private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>();

    /** */
    private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);

    /** */
    private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);

    /**
     * @param maxBatchSize Maximum batch size.
     */
    public CdcEventsApplier(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * @param evts Events to process.
     * @return Number of applied events.
     * @throws IgniteCheckedException If failed.
     */
    protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
        IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;

        int evtsApplied = 0;

        for (CdcEvent evt : evts) {
            if (log().isDebugEnabled())
                log().debug("Event received [evt=" + evt + ']');

            IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
                for (String cacheName : ignite().cacheNames()) {
                    if (CU.cacheId(cacheName) == cacheId) {
                        // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
                        ignite().cache(cacheName);

                        IgniteInternalCache<Object, Object> cache0 = ignite().cachex(cacheName);

                        assert cache0 != null;

                        return cache0.keepBinary();
                    }
                }

                throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']');
            });

            if (cache != currCache) {
                evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);

                currCache = cache;
            }

            CacheEntryVersion order = evt.version();

            KeyCacheObject key;

            if (evt.key() instanceof KeyCacheObject)
                key = (KeyCacheObject)evt.key();
            else
                key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());

            if (evt.value() != null) {
                evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);

                CacheObject val;

                if (evt.value() instanceof CacheObject)
                    val = (CacheObject)evt.value();
                else
                    val = new CacheObjectImpl(evt.value(), null);

                updBatch.put(key, new GridCacheDrInfo(val,
                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
            }
            else {
                evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));

                rmvBatch.put(key,
                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
            }
        }

        if (currCache != null)
            evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);

        return evtsApplied;
    }

    /**
     * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
     *
     * @param cache Current cache.
     * @param applyUpd Apply update batch flag supplier.
     * @param applyRmv Apply remove batch flag supplier.
     * @return Number of applied events.
     * @throws IgniteCheckedException In case of error.
     */
    private int applyIf(
        IgniteInternalCache<BinaryObject, BinaryObject> cache,
        BooleanSupplier applyUpd,
        BooleanSupplier applyRmv
    ) throws IgniteCheckedException {
        int evtsApplied = 0;

        if (applyUpd.getAsBoolean()) {
            if (log().isDebugEnabled())
                log().debug("Applying put batch [cache=" + cache.name() + ']');

            cache.putAllConflict(updBatch);

            evtsApplied += updBatch.size();

            updBatch.clear();
        }

        if (applyRmv.getAsBoolean()) {
            if (log().isDebugEnabled())
                log().debug("Applying remove batch [cache=" + cache.name() + ']');

            cache.removeAllConflict(rmvBatch);

            evtsApplied += rmvBatch.size();

            rmvBatch.clear();
        }

        return evtsApplied;
    }

    /** @return {@code True} if update batch should be applied. */
    private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
        return map.size() >= maxBatchSize || map.containsKey(key);
    }

    /**
     * Register {@code meta} inside {@code ign} instance.
     *
     * @param ign Ignite instance.
     * @param log Logger.
     * @param meta Binary metadata to register.
     */
    public static void registerBinaryMeta(IgniteEx ign, IgniteLogger log, BinaryMetadata meta) {
        ign.context().cacheObjects().addMeta(
            meta.typeId(),
            new BinaryTypeImpl(
                ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext(),
                meta
            ),
            false
        );

        if (log.isInfoEnabled())
            log.info("BinaryMeta[meta=" + meta + ']');
    }

    /**
     * Register {@code mapping} inside {@code ign} instance.
     *
     * @param ign Ignite instance.
     * @param log Logger.
     * @param mapping Type mapping to register.
     */
    public static void registerMapping(IgniteEx ign, IgniteLogger log, TypeMapping mapping) {
        assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;

        try {
            ign.context().marshallerContext().registerClassName(
                (byte)mapping.platformType().ordinal(),
                mapping.typeId(),
                mapping.typeName(),
                false
            );

            if (log.isInfoEnabled())
                log.info("Mapping[mapping=" + mapping + ']');
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    /** @return Ignite instance. */
    protected abstract IgniteEx ignite();

    /** @return Logger. */
    protected abstract IgniteLogger log();
}
