// blob: e45cdaae45bc97757edbb508143cc8361b2b3768
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cdc;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cdc.AbstractCdcTest;
import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.metric.LongMetric;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.internal.util.IgniteUtils.MB;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
 * Checks that CDC keeps committing archived WAL segments after a forced index rebuild.
 */
public class CdcIndexRebuildTest extends AbstractCdcTest {
    /** Number of entries put into the cache before CDC is started. */
    public static final int VALS_CNT = 1024 * 30;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
            .setPersistenceEnabled(true)
            .setCdcEnabled(true);

        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
            .setDefaultDataRegionConfiguration(dfltRegionCfg)
            .setWalSegmentSize((int)(16 * MB));

        return super.getConfiguration(igniteInstanceName).setDataStorageConfiguration(storageCfg);
    }

    /**
     * Fills an indexed cache, lets CDC consume all of the produced events, then forces an index rebuild
     * and checks that CDC catches up with the archived WAL segments once more.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testIndexRebuild() throws Exception {
        IgniteEx srv = startGrid(0);

        srv.cluster().state(ACTIVE);

        IgniteCache<Integer, TestVal> cache = srv.getOrCreateCache(
            new CacheConfiguration<Integer, TestVal>(DEFAULT_CACHE_NAME)
                .setIndexedTypes(Integer.class, TestVal.class)
        );

        Map<Integer, TestVal> vals = new HashMap<>(VALS_CNT);

        for (int i = 0; i < VALS_CNT; i++)
            vals.put(i, new TestVal());

        cache.putAll(vals);

        CountingCdcConsumer cnsmr = new CountingCdcConsumer();

        CdcMain cdc = createCdc(cnsmr, getConfiguration(srv.name()));

        IgniteInternalFuture<Object> fut = runAsync(cdc);

        // The CDC task runs asynchronously: cancel it even if one of the checks below fails,
        // otherwise it would outlive the failed test.
        try {
            assertTrue(waitForCondition(() -> cnsmr.commited && cnsmr.evtsCnt == VALS_CNT, getTestTimeout()));

            waitForWalSegmentsHandling(srv, cdc);

            // NOTE(review): the counters are reset here but never asserted afterwards -
            // confirm whether a check of evtsCnt after the rebuild is intended.
            cnsmr.reset();

            forceRebuildIndexes(srv, srv.cachex(DEFAULT_CACHE_NAME).context());

            indexRebuildFuture(srv, cacheId(DEFAULT_CACHE_NAME)).get(getTestTimeout());

            waitForWalSegmentsHandling(srv, cdc);
        }
        finally {
            fut.cancel();
        }
    }

    /**
     * Waits until the committed segment index reported by the CDC metrics equals the index of the last
     * archived WAL segment of the node, and fails the test if that does not happen in time.
     *
     * @param srv Server node whose WAL archive is inspected.
     * @param cdc CDC application; its metric registry is read reflectively from the {@code mreg} field.
     * @throws IgniteInterruptedCheckedException If the waiting thread was interrupted.
     */
    private void waitForWalSegmentsHandling(IgniteEx srv, CdcMain cdc) throws IgniteInterruptedCheckedException {
        assertTrue("Wal segments was not committed by CdcConsumer", waitForCondition(() -> {
            long lastArchivedSeg = srv.context().cache().context().wal().lastArchivedSegment();

            MetricRegistry mreg = getFieldValue(cdc, "mreg");

            long committedCdcSegIdx = mreg.<LongMetric>findMetric(COMMITTED_SEG_IDX).value();
            long curCdcSegIdx = mreg.<LongMetric>findMetric(CUR_SEG_IDX).value();

            // Logged on every poll to simplify diagnostics when the condition is never met.
            log.warning(String.format(">>>>>> Information about CDC and WAL: " +
                    "[lastArchivedSeg=%d, committedCdcSegIdx=%d, curCdcSegIdx=%d]",
                lastArchivedSeg,
                committedCdcSegIdx,
                curCdcSegIdx));

            return lastArchivedSeg == committedCdcSegIdx;
        }, WAL_ARCHIVE_TIMEOUT * 4, WAL_ARCHIVE_TIMEOUT / 2));
    }

    /**
     * Consumer that only counts the received data events; every other callback just drains its iterator.
     */
    public static class CountingCdcConsumer implements CdcConsumer {
        /** Set to {@code true} each time {@link #onEvents(Iterator)} finishes; cleared by {@link #reset()}. */
        volatile boolean commited;

        /** Number of events received by {@link #onEvents(Iterator)} since creation or the last {@link #reset()}. */
        volatile int evtsCnt;

        /** Clears the flag and the events counter. */
        public void reset() {
            commited = false;
            evtsCnt = 0;
        }

        /** {@inheritDoc} */
        @Override public void start(MetricRegistry mreg) {
            // No-op.
        }

        /**
         * Counts all events of the batch and marks the consumer as committed.
         *
         * @param events Batch of events.
         * @return Always {@code true}.
         */
        @Override public boolean onEvents(Iterator<CdcEvent> events) {
            // Increment of a volatile field is not atomic.
            // NOTE(review): looks safe only if this callback is invoked from a single thread - confirm.
            events.forEachRemaining(evt -> evtsCnt++);

            commited = true;

            return true;
        }

        /** {@inheritDoc} */
        @Override public void onTypes(Iterator<BinaryType> types) {
            types.forEachRemaining(t -> { /* No-op */ });
        }

        /** {@inheritDoc} */
        @Override public void onMappings(Iterator<TypeMapping> mappings) {
            mappings.forEachRemaining(m -> { /* No-op */ });
        }

        /** {@inheritDoc} */
        @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
            cacheEvents.forEachRemaining(e -> { /* No-op */ });
        }

        /** {@inheritDoc} */
        @Override public void onCacheDestroy(Iterator<Integer> caches) {
            caches.forEachRemaining(c -> { /* No-op */ });
        }

        /** {@inheritDoc} */
        @Override public void stop() {
            // No-op.
        }
    }

    /**
     * Cache value with two indexed string fields, each filled with a random UUID.
     */
    public static class TestVal {
        /** Field 0. */
        @QuerySqlField(index = true, inlineSize = 256)
        private final String f0;

        /** Field 1. */
        @QuerySqlField(index = true, inlineSize = 256)
        private final String f1;

        /**
         * Default constructor. Assigns a freshly generated random UUID string to each field.
         */
        public TestVal() {
            f0 = UUID.randomUUID().toString();
            f1 = UUID.randomUUID().toString();
        }
    }
}