blob: c37d1aa40cda6a08dcc213f301b2a82f1fc29058 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cdc.CdcConsumerState;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.ObjectMetric;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.internal.cdc.CdcMain.BINARY_META_DIR;
import static org.apache.ignite.internal.cdc.CdcMain.CDC_DIR;
import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
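/** Base class for CDC tests: helpers to create {@link CdcMain}, wait for consumed events and verify CDC metrics. */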
public abstract class AbstractCdcTest extends GridCommonAbstractTest {
/** Name prefix of the users produced by {@link #createUser(int)}. */
public static final String JOHN = "John Connor";
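/** WAL archive timeout (presumably in milliseconds; not used in this class — confirm against subclasses). */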
public static final int WAL_ARCHIVE_TIMEOUT = 5_000;
/** Keys count. */
public static final int KEYS_CNT = 50;
/**
 * {@inheritDoc}
 * <p>
 * Calls {@code stopAllGrids()} and {@code cleanPersistenceDir()} before delegating to the parent
 * implementation.
 */
@Override protected void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.beforeTest();
}
/**
 * {@inheritDoc}
 * <p>
 * After the parent implementation has run, calls {@code stopAllGrids()} and {@code cleanPersistenceDir()}.
 */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
stopAllGrids();
cleanPersistenceDir();
}
/**
 * Creates a CDC instance without any commit notification: delegates to
 * {@link #createCdc(CdcConsumer, IgniteConfiguration, CountDownLatch, GridAbsPredicate...)} with a
 * {@code null} latch and no conditions.
 *
 * @param cnsmr Consumer to receive the events.
 * @param cfg Ignite configuration passed to {@link CdcMain}.
 * @return New CDC instance.
 */
protected CdcMain createCdc(CdcConsumer cnsmr, IgniteConfiguration cfg) {
return createCdc(cnsmr, cfg, null);
}
/**
 * Creates a CDC instance whose consumer state reports committed progress through a latch.
 * <p>
 * After every WAL state save the given conditions are evaluated in order; when at least one condition
 * was supplied and all of them hold, {@code latch} is counted down.
 *
 * @param cnsmr Consumer to receive the events.
 * @param cfg Ignite configuration passed to {@link CdcMain}.
 * @param latch Latch to count down; only dereferenced when {@code conditions} is non-empty.
 * @param conditions Conditions that must all hold at save time for the latch to be counted down.
 * @return New CDC instance.
 */
protected CdcMain createCdc(
    CdcConsumer cnsmr,
    IgniteConfiguration cfg,
    CountDownLatch latch,
    GridAbsPredicate... conditions
) {
    CdcConfiguration cdcCfg = new CdcConfiguration();

    cdcCfg.setConsumer(cnsmr);
    cdcCfg.setKeepBinary(keepBinary());
    cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());

    return new CdcMain(cfg, null, cdcCfg) {
        @Override protected CdcConsumerState createState(Path stateDir) {
            return new CdcConsumerState(log, stateDir) {
                @Override public void saveWal(T2<WALPointer, Integer> state) throws IOException {
                    super.saveWal(state);

                    // Nothing to signal when the caller supplied no conditions.
                    if (F.isEmpty(conditions))
                        return;

                    // Short-circuits on the first unmet condition, in declaration order.
                    boolean allMet = Arrays.stream(conditions).allMatch(GridAbsPredicate::apply);

                    if (allMet)
                        latch.countDown();
                }
            };
        }
    };
}
/**
 * Runs CDC in the background, fills the cache(s) through {@code addData}, waits until the consumer has
 * seen {@code to - from} {@code UPDATE} events per cache, then verifies the CDC metrics and the keys
 * consumed for {@code cache}.
 *
 * @param cnsmr Consumer collecting the events.
 * @param cfg Ignite configuration the CDC instance is created with.
 * @param cache Cache to fill; its consumed keys are verified afterwards.
 * @param txCache Optional second cache to fill, may be {@code null}.
 * @param addData Closure invoked as {@code (cache, from, to)} to put the data.
 * @param from Start of the verified key range (inclusive).
 * @param to End of the verified key range (exclusive).
 * @param waitForCommit If {@code true}, waits for the consumer state to be saved with all expected events
 *      consumed; otherwise polls the consumer directly.
 * @throws Exception If failed.
 */
protected void addAndWaitForConsumption(
    UserCdcConsumer cnsmr,
    IgniteConfiguration cfg,
    IgniteCache<Integer, CdcSelfTest.User> cache,
    IgniteCache<Integer, CdcSelfTest.User> txCache,
    CI3<IgniteCache<Integer, CdcSelfTest.User>, Integer, Integer> addData,
    int from,
    int to,
    boolean waitForCommit
) throws Exception {
    GridAbsPredicate cachePredicate = sizePredicate(to - from, cache.getName(), UPDATE, cnsmr);

    GridAbsPredicate txPredicate = txCache == null
        ? null
        : sizePredicate(to - from, txCache.getName(), UPDATE, cnsmr);

    CdcMain cdc;

    CountDownLatch latch = new CountDownLatch(1);

    if (waitForCommit) {
        cdc = txCache == null
            ? createCdc(cnsmr, cfg, latch, cachePredicate)
            : createCdc(cnsmr, cfg, latch, cachePredicate, txPredicate);
    }
    else
        cdc = createCdc(cnsmr, cfg);

    IgniteInternalFuture<?> fut = runAsync(cdc);

    try {
        addData.apply(cache, from, to);

        if (txCache != null)
            addData.apply(txCache, from, to);

        if (waitForCommit) {
            // await() returns false on timeout; fail here rather than in a less obvious later check.
            assertTrue(
                "CDC state was not committed within " + getTestTimeout() + " ms",
                latch.await(getTestTimeout(), MILLISECONDS)
            );
        }
        else {
            assertTrue(waitForCondition(cachePredicate, getTestTimeout()));

            if (txCache != null)
                assertTrue(waitForCondition(txPredicate, getTestTimeout()));
        }

        // NOTE(review): the expected event count is derived from 'to', not 'to - from' — confirm intent.
        checkMetrics(cdc, txCache == null ? to : to * 2);
    }
    finally {
        // Stop the background CDC even when one of the checks above fails.
        fut.cancel();
    }

    List<Integer> keys = cnsmr.data(UPDATE, cacheId(cache.getName()));

    assertNotNull("No UPDATE events were consumed for cache " + cache.getName(), keys);
    assertEquals(to - from, keys.size());

    for (int i = from; i < to; i++)
        assertTrue(Integer.toString(i), keys.contains(i));

    assertTrue(cnsmr.stopped());
}
/**
 * Waits, up to the test timeout, until the consumers together hold exactly {@code expSz} entries of the
 * given event type for the given cache, and fails the test otherwise.
 *
 * @param expSz Expected total number of entries.
 * @param cacheName Cache name.
 * @param evtType Event type to count.
 * @param cnsmrs Consumers whose collected data is summed.
 * @throws IgniteInterruptedCheckedException If the wait was interrupted.
 */
public void waitForSize(
    int expSz,
    String cacheName,
    ChangeEventType evtType,
    TestCdcConsumer<?>... cnsmrs
) throws IgniteInterruptedCheckedException {
    GridAbsPredicate sizeReached = sizePredicate(expSz, cacheName, evtType, cnsmrs);

    assertTrue(waitForCondition(sizeReached, getTestTimeout()));
}
/**
 * Builds a predicate that holds when the consumers together have collected exactly {@code expSz} entries
 * of the given event type for the given cache. A consumer without data for that key contributes zero.
 *
 * @param expSz Expected total number of entries.
 * @param cacheName Cache name.
 * @param evtType Event type to count.
 * @param cnsmrs Consumers whose collected data is summed.
 * @return Predicate evaluated lazily on each call.
 */
protected GridAbsPredicate sizePredicate(
    int expSz,
    String cacheName,
    ChangeEventType evtType,
    TestCdcConsumer<?>... cnsmrs
) {
    return () -> {
        int total = 0;

        for (TestCdcConsumer<?> cnsmr : cnsmrs)
            total += F.size(cnsmr.data(evtType, cacheId(cacheName)));

        return expSz == total;
    };
}
/**
 * Verifies the CDC metrics twice: as exposed through the JMX bean of the "cdc" registry and as read
 * directly from the metric registry held by the CDC instance.
 *
 * @param cdc CDC instance; its {@code igniteCfg} and {@code mreg} fields are read reflectively.
 * @param expCnt Expected value of the events counter.
 * @throws Exception If failed.
 */
protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception {
    IgniteConfiguration igniteCfg = getFieldValue(cdc, "igniteCfg");

    String cdcName = cdcInstanceName(igniteCfg.getIgniteInstanceName());

    DynamicMBean cdcBean = metricRegistry(cdcName, null, "cdc");

    // JMX access throws checked exceptions; wrap them so the reader fits into a Function.
    Function<String, ?> attrReader = attr -> {
        try {
            return cdcBean.getAttribute(attr);
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
    };

    Function<String, Long> jmxLong = (Function<String, Long>)attrReader;
    Function<String, String> jmxStr = (Function<String, String>)attrReader;

    checkMetrics(expCnt, jmxLong, jmxStr);

    MetricRegistry mreg = getFieldValue(cdc, "mreg");

    assertNotNull(mreg);

    Function<String, Long> regLong = name -> mreg.<LongMetric>findMetric(name).value();
    Function<String, String> regStr = name -> mreg.<ObjectMetric<String>>findMetric(name).value();

    checkMetrics(expCnt, regLong, regStr);
}
/**
 * Checks the CDC metrics obtained through the given accessors: the committed segment index does not exceed
 * the current one, the committed offset is non-negative, the time metrics are positive, the reported
 * directories exist and the events counter equals {@code expCnt}.
 *
 * @param expCnt Expected value of the events counter.
 * @param longMetric Reads a long metric by name.
 * @param strMetric Reads a string metric by name.
 */
private void checkMetrics(long expCnt, Function<String, Long> longMetric, Function<String, String> strMetric) {
    long committedIdx = longMetric.apply(COMMITTED_SEG_IDX);
    long currentIdx = longMetric.apply(CUR_SEG_IDX);

    assertTrue(committedIdx <= currentIdx);

    long committedOff = longMetric.apply(COMMITTED_SEG_OFFSET);
    assertTrue(committedOff >= 0);

    long lastSegTime = longMetric.apply(LAST_SEG_CONSUMPTION_TIME);
    assertTrue(lastSegTime > 0);

    long lastEvtTime = longMetric.apply(LAST_EVT_TIME);
    assertTrue(lastEvtTime > 0);

    for (String dirMetric : Arrays.asList(BINARY_META_DIR, MARSHALLER_DIR, CDC_DIR)) {
        File dir = new File(strMetric.apply(dirMetric));

        assertTrue(dir.exists());
    }

    long evtsCnt = longMetric.apply(EVTS_CNT);

    assertEquals(expCnt, evtsCnt);
}
/**
 * Value passed to {@code CdcConfiguration.setKeepBinary} when a CDC instance is created by this class.
 *
 * @return {@code false} in this base implementation; subclasses may override.
 */
protected boolean keepBinary() {
return false;
}
/**
 * Metric exporters hook. Not referenced anywhere else in this file (presumably meant to be overridden
 * and consumed by subclasses — confirm).
 *
 * @return {@code null} in this base implementation.
 */
protected MetricExporterSpi[] metricExporters() {
return null;
}
/**
 * Base test consumer. Collects a value extracted from every primary event, grouped by event type and
 * cache id, and tracks the caches reported through the cache change/destroy callbacks.
 *
 * @param <T> Type of the value extracted from an event.
 */
public abstract static class TestCdcConsumer<T> implements CdcConsumer {
    /** Extracted values keyed by (event type, cache id). */
    final ConcurrentMap<IgniteBiTuple<ChangeEventType, Integer>, List<T>> data = new ConcurrentHashMap<>();

    /** Known caches keyed by cache id. */
    protected final ConcurrentMap<Integer, CdcCacheEvent> caches = new ConcurrentHashMap<>();

    /** Set by {@link #stop()} and reset by {@link #start(MetricRegistry)}. */
    private volatile boolean stopped;

    /** {@inheritDoc} */
    @Override public void start(MetricRegistry mreg) {
        stopped = false;
    }

    /** {@inheritDoc} */
    @Override public void stop() {
        stopped = true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Non-primary events are skipped. An event without a value is recorded as {@code DELETE}, any other
     * as {@code UPDATE}.
     */
    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
        while (evts.hasNext()) {
            CdcEvent evt = evts.next();

            if (!evt.primary())
                continue;

            ChangeEventType type = evt.value() == null ? DELETE : UPDATE;

            List<T> bucket = data.computeIfAbsent(F.t(type, evt.cacheId()), k -> new ArrayList<>());

            bucket.add(extract(evt));

            // The cache must have been announced through onCacheChange before its events arrive.
            assertTrue(caches.containsKey(evt.cacheId()));

            checkEvent(evt);
        }

        return commit();
    }

    /** {@inheritDoc} */
    @Override public void onTypes(Iterator<BinaryType> types) {
        while (types.hasNext())
            assertNotNull(types.next());
    }

    /** {@inheritDoc} */
    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts) {
        while (cacheEvts.hasNext()) {
            CdcCacheEvent evt = cacheEvts.next();

            assertFalse(caches.containsKey(evt.cacheId()));

            caches.put(evt.cacheId(), evt);
        }
    }

    /** {@inheritDoc} */
    @Override public void onCacheDestroy(Iterator<Integer> caches) {
        while (caches.hasNext()) {
            CdcCacheEvent removed = this.caches.remove(caches.next());

            assertNotNull(removed);
        }
    }

    /**
     * Validates a single recorded event.
     *
     * @param evt Event to check.
     */
    public abstract void checkEvent(CdcEvent evt);

    /**
     * Extracts the value to be stored for an event.
     *
     * @param evt Event.
     * @return Value added to the collected data.
     */
    public abstract T extract(CdcEvent evt);

    /** @return Result returned from {@link #onEvents(Iterator)}; {@code true} in this implementation. */
    protected boolean commit() {
        return true;
    }

    /**
     * @param op Event type.
     * @param cacheId Cache id.
     * @return Values collected for the given type and cache, or {@code null} if there are none.
     */
    public List<T> data(ChangeEventType op, int cacheId) {
        IgniteBiTuple<ChangeEventType, Integer> key = F.t(op, cacheId);

        return data.get(key);
    }

    /** @return {@code true} if {@link #stop()} was the last lifecycle call. */
    public boolean stopped() {
        return stopped;
    }
}
/**
 * Consumer for caches holding {@link User} values: collects the integer keys of the events and validates
 * the binary type, the type mappings and the content of every event.
 */
public static class UserCdcConsumer extends TestCdcConsumer<Integer> {
    /** Set once the binary type of {@link User} has been reported through {@link #onTypes(Iterator)}. */
    protected boolean userTypeFound;

    /**
     * {@inheritDoc}
     * <p>
     * Requires the {@link User} type to have been seen already and the event to carry no other-cluster
     * version; for events with a value also checks the user's name prefix and minimal age.
     */
    @Override public void checkEvent(CdcEvent evt) {
        assertTrue(userTypeFound);

        assertNull(evt.version().otherClusterVersion());

        if (evt.value() == null)
            return;

        User user = (User)evt.value();

        assertTrue(user.getName().startsWith(JOHN));
        assertTrue(user.getAge() >= 42);
    }

    /** {@inheritDoc} */
    @Override public Integer extract(CdcEvent evt) {
        return (Integer)evt.key();
    }

    /** {@inheritDoc} */
    @Override public void onTypes(Iterator<BinaryType> types) {
        types.forEachRemaining(t -> {
            // Check for null before the first dereference, otherwise the assertion can never fire.
            assertNotNull(t);

            if (t.typeName().equals(User.class.getName())) {
                userTypeFound = true;

                assertNotNull(t.field("name"));
                assertEquals(String.class.getSimpleName(), t.fieldTypeName("name"));

                assertNotNull(t.field("age"));
                assertEquals(int.class.getName(), t.fieldTypeName("age"));

                assertNotNull(t.field("payload"));
                assertEquals(byte[].class.getSimpleName(), t.fieldTypeName("payload"));
            }
        });
    }

    /**
     * {@inheritDoc}
     * <p>
     * Checks that each mapping has a non-empty type name whose id matches {@link BinaryBasicIdMapper}.
     */
    @Override public void onMappings(Iterator<TypeMapping> mappings) {
        BinaryBasicIdMapper mapper = new BinaryBasicIdMapper();

        mappings.forEachRemaining(m -> {
            assertNotNull(m);

            String typeName = m.typeName();

            assertFalse(typeName.isEmpty());
            assertEquals(mapper.typeId(typeName), m.typeId());
        });
    }
}
/**
 * Consumer that only tracks cache change/destroy events. Data events, types and mappings are drained
 * and ignored, and {@link #onEvents(Iterator)} always returns {@code false}.
 */
public static class TrackCacheEventsConsumer implements CdcConsumer {
    /** Last seen cache event per cache id; entries are removed on cache destroy. */
    public final Map<Integer, CdcCacheEvent> evts = new ConcurrentHashMap<>();

    /** {@inheritDoc} */
    @Override public void start(MetricRegistry mreg) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public void stop() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
        while (cacheEvents.hasNext()) {
            CdcCacheEvent e = cacheEvents.next();

            log.info("TrackCacheEventsConsumer.add[cacheId=" + e.cacheId() + ", e=" + e + ']');

            evts.put(e.cacheId(), e);
        }
    }

    /** {@inheritDoc} */
    @Override public void onCacheDestroy(Iterator<Integer> caches) {
        while (caches.hasNext()) {
            Integer cacheId = caches.next();

            log.info("TrackCacheEventsConsumer.remove[cacheId=" + cacheId + ']');

            evts.remove(cacheId);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
        // Drain the iterator without processing.
        while (evts.hasNext())
            evts.next();

        return false;
    }

    /** {@inheritDoc} */
    @Override public void onTypes(Iterator<BinaryType> types) {
        // Drain the iterator without processing.
        while (types.hasNext())
            types.next();
    }

    /** {@inheritDoc} */
    @Override public void onMappings(Iterator<TypeMapping> mappings) {
        // Drain the iterator without processing.
        while (mappings.hasNext())
            mappings.next();
    }
}
/**
 * Creates a test user named {@code JOHN + " " + i}, aged {@code 42 + i}, with a random 1024-byte payload.
 *
 * @param i User index.
 * @return New user.
 */
protected static User createUser(int i) {
    byte[] payload = new byte[1024];

    ThreadLocalRandom.current().nextBytes(payload);

    String name = JOHN + " " + i;
    int age = 42 + i;

    return new User(name, age, payload);
}
/**
 * Test value object stored in caches. The field names {@code name}, {@code age} and {@code payload}
 * are asserted against the binary type metadata in {@code UserCdcConsumer.onTypes}, so they must not
 * be renamed.
 */
public static class User {
/** User name. */
private final String name;
/** User age. */
private final int age;
/** Arbitrary binary payload; stored by reference, without copying. */
private final byte[] payload;
/**
 * @param name User name.
 * @param age User age.
 * @param payload Binary payload.
 */
public User(String name, int age, byte[] payload) {
this.name = name;
this.age = age;
this.payload = payload;
}
/** @return User name. */
public String getName() {
return name;
}
/** @return User age. */
public int getAge() {
return age;
}
/** @return Binary payload (the same array instance passed to the constructor). */
public byte[] getPayload() {
return payload;
}
}
/** Kind of change assigned to an event by {@code TestCdcConsumer.onEvents}, based on whether it has a value. */
public enum ChangeEventType {
/** Event carries a non-null value. */
UPDATE,
/** Event carries no value ({@code null}). */
DELETE
}
}