blob: 9e206493bc151235c4b101420ab33bf9622d9d8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.management.cdc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
* Task to forcefully resend all cache data to CDC.
* Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC.
*/
@GridInternal
public class CdcCacheDataResendTask extends VisorMultiNodeTask<CdcResendCommandArg, Void, Void> {
/** */
private static final long serialVersionUID = 0L;
/** Topology version when task was started. */
private AffinityTopologyVersion topVer;
/** {@inheritDoc} */
@Override protected VisorJob<CdcResendCommandArg, Void> job(CdcResendCommandArg arg) {
return new CdcCacheDataResendJob(arg, topVer);
}
/** {@inheritDoc} */
@Override protected Collection<UUID> jobNodes(VisorTaskArgument<CdcResendCommandArg> arg) {
// Check there is no rebalance.
GridDhtPartitionsExchangeFuture fut = ignite.context().cache().context().exchange().lastFinishedFuture();
if (!fut.rebalanced()) {
throw new IgniteException("CDC cache data resend cancelled. Rebalance sheduled " +
"[topVer=" + fut.topologyVersion() + ']');
}
// Cancel resend if affinity will change.
topVer = ignite.context().cache().context().exchange().lastAffinityChangedTopologyVersion(fut.topologyVersion());
return F.nodeIds(ignite.cluster().forServers().nodes());
}
/** {@inheritDoc} */
@Override protected @Nullable Void reduce0(List<ComputeJobResult> results) throws IgniteException {
for (ComputeJobResult res : results) {
if (res.getException() != null) {
throw new IgniteException("CDC cache data resend cancelled. Failed to resend cache data " +
"on the node [nodeId=" + res.getNode().id() + ']', res.getException());
}
}
return null;
}
/** */
private static class CdcCacheDataResendJob extends VisorJob<CdcResendCommandArg, Void> {
/** */
private static final long serialVersionUID = 0L;
/** Injected logger. */
@LoggerResource
protected IgniteLogger log;
/** */
private IgniteWriteAheadLogManager wal;
/** */
private GridCachePartitionExchangeManager<Object, Object> exchange;
/** Topology version when task was started. */
private final AffinityTopologyVersion topVer;
/** */
private GridDhtPartitionsExchangeFuture lastFut;
/**
* @param arg Job argument.
* @param topVer Topology version when task was started.
*/
protected CdcCacheDataResendJob(CdcResendCommandArg arg, AffinityTopologyVersion topVer) {
super(arg, false);
this.topVer = topVer;
}
/** {@inheritDoc} */
@Override protected Void run(CdcResendCommandArg arg) throws IgniteException {
if (F.isEmpty(arg.caches()))
throw new IllegalArgumentException("Caches are not specified.");
List<IgniteInternalCache<?, ?>> caches = new ArrayList<>();
for (String name : arg.caches()) {
IgniteInternalCache<?, ?> cache = ignite.context().cache().cache(name);
if (cache == null)
throw new IgniteException("Cache does not exist [cacheName=" + name + ']');
if (!cache.context().dataRegion().config().isCdcEnabled()) {
throw new IgniteException("CDC is not enabled for given cache [cacheName=" + name +
", dataRegionName=" + cache.context().dataRegion().config().getName() + ']');
}
caches.add(cache);
}
if (log.isInfoEnabled())
log.info("CDC cache data resend started [caches=" + String.join(", ", arg.caches()) + ']');
wal = ignite.context().cache().context().wal(true);
exchange = ignite.context().cache().context().exchange();
try {
Iterator<IgniteInternalCache<?, ?>> iter = caches.iterator();
while (iter.hasNext() && !isCancelled())
resendCacheData(iter.next());
wal.flush(null, true);
if (log.isInfoEnabled()) {
log.info("CDC cache data resend " + (isCancelled() ? "cancelled" : "finished") +
" [caches=" + String.join(", ", arg.caches()) + ']');
}
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
return null;
}
/** @param cache Cache. */
private void resendCacheData(IgniteInternalCache<?, ?> cache) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("CDC cache data resend started [cacheName=" + cache.name() + ']');
GridCacheContext<?, ?> cctx = cache.context();
GridIterator<CacheDataRow> locRows = cctx.offheap()
.cacheIterator(cctx.cacheId(), true, false, AffinityTopologyVersion.NONE, null);
long cnt = 0;
Set<Integer> parts = new TreeSet<>();
for (CacheDataRow row : locRows) {
if (isCancelled())
break;
ensureTopologyNotChanged();
KeyCacheObject key = row.key();
if (log.isTraceEnabled())
log.trace("Resend key: " + key);
CdcDataRecord rec = new CdcDataRecord(new DataEntry(
cctx.cacheId(),
key,
row.value(),
GridCacheOperation.CREATE,
null,
row.version(),
row.expireTime(),
key.partition(),
-1,
DataEntry.flags(true))
);
wal.log(rec);
parts.add(key.partition());
if ((++cnt % 1_000 == 0) && log.isDebugEnabled())
log.debug("Resend entries count: " + cnt);
}
if (log.isInfoEnabled()) {
if (isCancelled())
log.info("CDC cache data resend cancelled.");
else {
log.info("CDC cache data resend finished [cacheName=" + cache.name() +
", entriesCnt=" + cnt + ", parts=" + parts + ']');
}
}
}
/** */
private void ensureTopologyNotChanged() {
GridDhtPartitionsExchangeFuture fut = exchange.lastFinishedFuture();
if (lastFut != fut) {
AffinityTopologyVersion lastChanged = exchange.lastAffinityChangedTopologyVersion(fut.topologyVersion());
if (!topVer.equals(lastChanged)) {
throw new IgniteException("CDC cache data resend cancelled. Topology changed during resend " +
"[startTopVer=" + topVer + ", currentTopVer=" + fut.topologyVersion() + ']');
}
lastFut = fut;
}
}
}
}