| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cdc; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.function.BooleanSupplier; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.binary.BinaryObject; |
| import org.apache.ignite.cache.CacheEntryVersion; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.binary.BinaryMetadata; |
| import org.apache.ignite.internal.binary.BinaryTypeImpl; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheObjectImpl; |
| import org.apache.ignite.internal.processors.cache.IgniteInternalCache; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; |
| import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED; |
| |
| /** |
| * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster. |
| */ |
| public abstract class CdcEventsApplier { |
| /** Maximum batch size. */ |
| protected int maxBatchSize; |
| |
| /** Caches. */ |
| private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>(); |
| |
| /** Update batch. */ |
| private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>(); |
| |
| /** Remove batch. */ |
| private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>(); |
| |
| /** */ |
| private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch); |
| |
| /** */ |
| private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch); |
| |
| /** |
| * @param maxBatchSize Maximum batch size. |
| */ |
| public CdcEventsApplier(int maxBatchSize) { |
| this.maxBatchSize = maxBatchSize; |
| } |
| |
| /** |
| * @param evts Events to process. |
| * @return Number of applied events. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException { |
| IgniteInternalCache<BinaryObject, BinaryObject> currCache = null; |
| |
| int evtsApplied = 0; |
| |
| for (CdcEvent evt : evts) { |
| if (log().isDebugEnabled()) |
| log().debug("Event received [evt=" + evt + ']'); |
| |
| IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> { |
| for (String cacheName : ignite().cacheNames()) { |
| if (CU.cacheId(cacheName) == cacheId) { |
| // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call. |
| ignite().cache(cacheName); |
| |
| IgniteInternalCache<Object, Object> cache0 = ignite().cachex(cacheName); |
| |
| assert cache0 != null; |
| |
| return cache0.keepBinary(); |
| } |
| } |
| |
| throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']'); |
| }); |
| |
| if (cache != currCache) { |
| evtsApplied += applyIf(currCache, hasUpdates, hasRemoves); |
| |
| currCache = cache; |
| } |
| |
| CacheEntryVersion order = evt.version(); |
| |
| KeyCacheObject key; |
| |
| if (evt.key() instanceof KeyCacheObject) |
| key = (KeyCacheObject)evt.key(); |
| else |
| key = new KeyCacheObjectImpl(evt.key(), null, evt.partition()); |
| |
| if (evt.value() != null) { |
| evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves); |
| |
| CacheObject val; |
| |
| if (evt.value() instanceof CacheObject) |
| val = (CacheObject)evt.value(); |
| else |
| val = new CacheObjectImpl(evt.value(), null); |
| |
| GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()); |
| |
| GridCacheDrInfo drVal = currCache.configuration().getExpiryPolicyFactory() != null ? |
| new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) |
| : new GridCacheDrInfo(val, ver); |
| |
| updBatch.put(key, drVal); |
| } |
| else { |
| evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key)); |
| |
| rmvBatch.put(key, |
| new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())); |
| } |
| } |
| |
| if (currCache != null) |
| evtsApplied += applyIf(currCache, hasUpdates, hasRemoves); |
| |
| return evtsApplied; |
| } |
| |
| /** |
| * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required. |
| * |
| * @param cache Current cache. |
| * @param applyUpd Apply update batch flag supplier. |
| * @param applyRmv Apply remove batch flag supplier. |
| * @return Number of applied events. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private int applyIf( |
| IgniteInternalCache<BinaryObject, BinaryObject> cache, |
| BooleanSupplier applyUpd, |
| BooleanSupplier applyRmv |
| ) throws IgniteCheckedException { |
| int evtsApplied = 0; |
| |
| if (applyUpd.getAsBoolean()) { |
| if (log().isDebugEnabled()) |
| log().debug("Applying put batch [cache=" + cache.name() + ']'); |
| |
| cache.putAllConflict(updBatch); |
| |
| evtsApplied += updBatch.size(); |
| |
| updBatch.clear(); |
| } |
| |
| if (applyRmv.getAsBoolean()) { |
| if (log().isDebugEnabled()) |
| log().debug("Applying remove batch [cache=" + cache.name() + ']'); |
| |
| cache.removeAllConflict(rmvBatch); |
| |
| evtsApplied += rmvBatch.size(); |
| |
| rmvBatch.clear(); |
| } |
| |
| return evtsApplied; |
| } |
| |
| /** @return {@code True} if update batch should be applied. */ |
| private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) { |
| return map.size() >= maxBatchSize || map.containsKey(key); |
| } |
| |
| /** |
| * Register {@code meta} inside {@code ign} instance. |
| * |
| * @param ign Ignite instance. |
| * @param log Logger. |
| * @param meta Binary metadata to register. |
| */ |
| public static void registerBinaryMeta(IgniteEx ign, IgniteLogger log, BinaryMetadata meta) { |
| ign.context().cacheObjects().addMeta( |
| meta.typeId(), |
| new BinaryTypeImpl( |
| ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext(), |
| meta |
| ), |
| false |
| ); |
| |
| if (log.isInfoEnabled()) |
| log.info("BinaryMeta[meta=" + meta + ']'); |
| } |
| |
| /** |
| * Register {@code mapping} inside {@code ign} instance. |
| * |
| * @param ign Ignite instance. |
| * @param log Logger. |
| * @param mapping Type mapping to register. |
| */ |
| public static void registerMapping(IgniteEx ign, IgniteLogger log, TypeMapping mapping) { |
| assert mapping.platformType().ordinal() <= Byte.MAX_VALUE; |
| |
| try { |
| ign.context().marshallerContext().registerClassName( |
| (byte)mapping.platformType().ordinal(), |
| mapping.typeId(), |
| mapping.typeName(), |
| false |
| ); |
| |
| if (log.isInfoEnabled()) |
| log.info("Mapping[mapping=" + mapping + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** @return Ignite instance. */ |
| protected abstract IgniteEx ignite(); |
| |
| /** @return Logger. */ |
| protected abstract IgniteLogger log(); |
| } |