blob: 30fd1e9332e133535d24358e5fa021d713173162 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.plugin.AbstractCachePluginProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
 * Tests of the entry versions reported by CDC events: the cluster ids visible through
 * {@link CdcEvent#version()} for entries written with {@link IgniteInternalCache#putAllConflict(Map)},
 * and the growth of the version order when a single key is updated repeatedly.
 */
public class CdcCacheVersionTest extends AbstractCdcTest {
    /** Name of the only cache that gets {@link TestCacheConflictResolutionManager} installed. */
    public static final String FOR_OTHER_CLUSTER_ID = "for-other-cluster-id";

    /** Data center id assigned to the local grid in {@link #testReadAllKeysFromOtherCluster()}. */
    public static final byte DFLT_CLUSTER_ID = 1;

    /** Cluster id put into the versions of the entries written by {@link #addConflictData(IgniteCache, int, int)}. */
    public static final byte OTHER_CLUSTER_ID = 2;

    /** The single key updated over and over in {@link #testOrderIncrease()}. */
    public static final int KEY_TO_UPD = 42;

    /**
     * Builds a node configuration with CDC and persistence enabled and with a plugin provider that supplies
     * {@link TestCacheConflictResolutionManager} to the {@link #FOR_OTHER_CLUSTER_ID} cache only.
     *
     * {@inheritDoc}
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
            .setCdcEnabled(true)
            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));

        cfg.setPluginProviders(new AbstractTestPluginProvider() {
            @Override public String name() {
                return "ConflictResolverProvider";
            }

            @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
                // Constant-first comparison: every cache except the dedicated one gets no cache plugin.
                if (!FOR_OTHER_CLUSTER_ID.equals(ctx.igniteCacheConfiguration().getName()))
                    return null;

                return new AbstractCachePluginProvider() {
                    @Override public @Nullable Object createComponent(Class cls) {
                        // Only the conflict resolution manager is provided by this plugin.
                        if (cls != CacheConflictResolutionManager.class)
                            return null;

                        return new TestCacheConflictResolutionManager<>();
                    }
                };
            }
        });

        return cfg;
    }

    /**
     * Simplest CDC test with usage of {@link IgniteInternalCache#putAllConflict(Map)}.
     * Every consumed event is expected to carry {@link #DFLT_CLUSTER_ID} as the cluster id of its version and
     * {@link #OTHER_CLUSTER_ID} as the cluster id of its other cluster version.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReadAllKeysFromOtherCluster() throws Exception {
        IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver");

        IgniteEx ign = startGrid(cfg);

        ign.context().cache().context().versions().dataCenterId(DFLT_CLUSTER_ID);
        ign.cluster().state(ACTIVE);

        UserCdcConsumer cnsmr = new UserCdcConsumer() {
            @Override public void checkEvent(CdcEvent evt) {
                assertEquals(DFLT_CLUSTER_ID, evt.version().clusterId());
                assertEquals(OTHER_CLUSTER_ID, evt.version().otherClusterVersion().clusterId());
            }
        };

        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));

        IgniteCache<Integer, User> cache = ign.getOrCreateCache(FOR_OTHER_CLUSTER_ID);

        addAndWaitForConsumption(cnsmr, cdc, cache, null, this::addConflictData, 0, KEYS_CNT, getTestTimeout());
    }

    /**
     * Updates {@link #KEY_TO_UPD} {@code KEYS_CNT} times and checks that each consumed event has a version
     * order strictly greater than the order of the previously consumed event.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testOrderIncrease() throws Exception {
        IgniteConfiguration cfg = getConfiguration("ignite-0");

        IgniteEx ign = startGrid(cfg);

        ign.cluster().state(ACTIVE);

        AtomicLong updCntr = new AtomicLong(0);

        CdcConsumer cnsmr = new CdcConsumer() {
            /** Order of the last consumed event, {@code -1} until the first event arrives. */
            private long order = -1;

            @Override public boolean onEvents(Iterator<CdcEvent> evts) {
                evts.forEachRemaining(evt -> {
                    assertEquals(KEY_TO_UPD, evt.key());

                    assertTrue(evt.version().order() > order);

                    order = evt.version().order();

                    updCntr.incrementAndGet();
                });

                return true;
            }

            @Override public void start() {
                // No-op.
            }

            @Override public void stop() {
                // No-op.
            }
        };

        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));

        IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");

        IgniteInternalFuture<?> fut = runAsync(cdc);

        try {
            // Update the same key several times.
            // Expect {@link CacheEntryVersion#order()} will monotonically increase.
            for (int i = 0; i < KEYS_CNT; i++)
                cache.put(KEY_TO_UPD, createUser(i));

            assertTrue(waitForCondition(() -> updCntr.get() == KEYS_CNT, getTestTimeout()));
        }
        finally {
            // Cancel the CDC future even when a put or the assertion above fails,
            // so the background CDC application does not outlive the test.
            fut.cancel();
        }
    }

    /**
     * Writes users for the keys in {@code [from, to)} through {@link IgniteInternalCache#putAllConflict(Map)},
     * each with an explicit version built with {@link #OTHER_CLUSTER_ID}.
     * The internal cache is looked up on the first grid returned by {@code G.allGrids()}.
     *
     * @param cache Cache whose name selects the internal cache to write to.
     * @param from First key, inclusive.
     * @param to Last key, exclusive.
     * @throws IgniteException Wrapping any {@link IgniteCheckedException} raised while writing.
     */
    private void addConflictData(IgniteCache<Integer, User> cache, int from, int to) {
        try {
            IgniteEx ign = (IgniteEx)G.allGrids().get(0);

            IgniteInternalCache<Integer, User> intCache = ign.cachex(cache.getName());

            Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();

            for (int i = from; i < to; i++) {
                KeyCacheObject key = new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i));

                CacheObject val = new CacheObjectImpl(createUser(i), null);

                val.prepareMarshal(intCache.context().cacheObjectContext());

                drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID)));
            }

            intCache.putAllConflict(drMap);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Conflict resolution manager whose resolver always selects the new entry.
     *
     * @param <K> Key type.
     * @param <V> Value type.
     */
    public static class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
        implements CacheConflictResolutionManager<K, V> {
        /** {@inheritDoc} */
        @Override public CacheVersionConflictResolver conflictResolver() {
            return new CacheVersionConflictResolver() {
                @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
                    CacheObjectValueContext ctx,
                    GridCacheVersionedEntryEx<K1, V1> oldEntry,
                    GridCacheVersionedEntryEx<K1, V1> newEntry,
                    boolean atomicVerComparator
                ) {
                    GridCacheVersionConflictContext<K1, V1> res =
                        new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

                    // Unconditionally accept the incoming entry.
                    res.useNew();

                    return res;
                }
            };
        }
    }
}