blob: c76f41fe54cd026f3847485fb06c6e3e7b3e13ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cdc;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneSpiContext;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
* Change Data Capture (CDC) application.
* The application runs independently of Ignite node process and provides the ability
* for the {@link CdcConsumer} to consume events({@link CdcEvent}) from WAL segments.
* The user should provide {@link CdcConsumer} implementation with custom consumption logic.
*
* Ignite node should be explicitly configured for using {@link CdcMain}.
* <ol>
* <li>Set {@link DataRegionConfiguration#setCdcEnabled(boolean)} to true.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setCdcWalPath(String)} to path to the directory
* to store WAL segments for CDC.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout for
* force WAL rollover, so new events will be available for consumptions with the predicted time.</li>
* </ol>
*
 * When CDC is enabled (see {@link DataRegionConfiguration#setCdcEnabled(boolean)}), the Ignite node on each WAL
 * segment rollover creates a hard link to the archived WAL segment in the
 * {@link DataStorageConfiguration#getCdcWalPath()} directory. The {@link CdcMain} application takes a
 * segment file and consumes events from it.
* After successful consumption (see {@link CdcConsumer#onEvents(Iterator)}) WAL segment will be deleted
* from directory.
*
* Several Ignite nodes can be started on the same host.
 * If your deployment is done with a custom consistent id then you should specify it via
* {@link IgniteConfiguration#setConsistentId(Serializable)} in provided {@link IgniteConfiguration}.
*
* Application works as follows:
* <ol>
* <li>Searches node work directory based on provided {@link IgniteConfiguration}.</li>
 * <li>Waits for the creation of the CDC directory if it does not exist.</li>
* <li>Acquires file lock to ensure exclusive consumption.</li>
* <li>Loads state of consumption if it exists.</li>
* <li>Infinitely waits for new available segment and processes it.</li>
* </ol>
*
* @see DataRegionConfiguration#setCdcEnabled(boolean)
* @see DataStorageConfiguration#setCdcWalPath(String)
* @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
* @see CdcCommandLineStartup
* @see CdcConsumer
* @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
*/
public class CdcMain implements Runnable {
    /** Error message shown when persistence and CDC are not enabled in the provided configuration. */
    public static final String ERR_MSG = "Persistence and CDC disabled. Capture Data Change can't run!";

    /** Name of the subdirectory (inside the CDC directory) that stores consumer state files. */
    public static final String STATE_DIR = "state";

    /** Current segment index metric name. */
    public static final String CUR_SEG_IDX = "CurrentSegmentIndex";

    /** Committed segment index metric name. */
    public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";

    /** Committed segment offset metric name. */
    public static final String COMMITTED_SEG_OFFSET = "CommittedSegmentOffset";

    /** Last segment consumption time metric name. */
    public static final String LAST_SEG_CONSUMPTION_TIME = "LastSegmentConsumptionTime";

    /** Metadata update time metric name. */
    public static final String META_UPDATE = "MetadataUpdateTime";

    /** Event capture time metric name. */
    public static final String EVT_CAPTURE_TIME = "EventCaptureTime";

    /** Binary metadata directory metric name. */
    public static final String BINARY_META_DIR = "BinaryMetaDir";

    /** Marshaller directory metric name. */
    public static final String MARSHALLER_DIR = "MarshallerDir";

    /** Cdc directory metric name. */
    public static final String CDC_DIR = "CdcDir";

    /** Cdc mode metric name. */
    public static final String CDC_MODE = "CdcMode";

    /** Filter for consumption in {@link CdcMode#IGNITE_NODE_ACTIVE} mode: only CDC manager state/stop records. */
    private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> PASSIVE_RECS =
        (type, ptr) -> type == CDC_MANAGER_STOP_RECORD || type == CDC_MANAGER_RECORD;

    /** Filter for consumption in {@link CdcMode#CDC_UTILITY_ACTIVE} mode: only data records. */
    private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> ACTIVE_RECS =
        (type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD;

    /** Ignite configuration (private copy of the user-provided one). */
    private final IgniteConfiguration igniteCfg;

    /** Spring resource context. */
    private final GridSpringResourceContext ctx;

    /** CDC metrics registry. */
    private MetricRegistryImpl mreg;

    /** Current segment index metric. */
    private AtomicLongMetric curSegmentIdx;

    /** Committed state segment index metric. */
    private AtomicLongMetric committedSegmentIdx;

    /** Committed state segment offset metric. */
    private AtomicLongMetric committedSegmentOffset;

    /** Time of last segment consumption. */
    private AtomicLongMetric lastSegmentConsumptionTs;

    /** Metadata update time. */
    private HistogramMetricImpl metaUpdate;

    /**
     * Metric represents time between creating {@link DataRecord}, containing the data change events, and capturing them
     * by {@link CdcConsumer}.
     */
    private HistogramMetricImpl evtCaptureTime;

    /** Change Data Capture configuration. */
    protected final CdcConfiguration cdcCfg;

    /** Events consumer. */
    private final WalRecordsConsumer<?, ?> consumer;

    /** Logger. */
    private final IgniteLogger log;

    /** Change Data Capture directory. */
    private Path cdcDir;

    /** Database directory. */
    private File dbDir;

    /** Binary meta directory. */
    private File binaryMeta;

    /** Marshaller directory. */
    private File marshaller;

    /** Standalone kernal context. */
    private StandaloneGridKernalContext kctx;

    /** Change Data Capture state. */
    private CdcConsumerState state;

    /**
     * Saved state to start from. Points to the last committed offset. Set to {@code null} after failover on start and
     * switching from {@link CdcMode#IGNITE_NODE_ACTIVE} to {@link CdcMode#CDC_UTILITY_ACTIVE}.
     *
     * @see #removeProcessedOnFailover(Path)
     * @see #consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder)
     */
    private T2<WALPointer, Integer> walState;

    /** Types state. */
    private Map<Integer, Long> typesState;

    /** Mappings state. */
    private Set<T2<Integer, Byte>> mappingsState;

    /** Caches state. */
    private Map<Integer, Long> cachesState;

    /** CDC mode state. */
    private volatile CdcMode cdcModeState;

    /** Started flag. */
    private volatile boolean started;

    /** Stopped flag. */
    private volatile boolean stopped;

    /** Already processed segments. */
    private final Set<Path> processedSegments = new HashSet<>();
/**
* @param cfg Ignite configuration.
* @param ctx Spring resource context.
* @param cdcCfg Change Data Capture configuration.
*/
public CdcMain(
IgniteConfiguration cfg,
GridSpringResourceContext ctx,
CdcConfiguration cdcCfg
) {
igniteCfg = new IgniteConfiguration(cfg);
this.ctx = ctx;
this.cdcCfg = cdcCfg;
try {
U.initWorkDir(igniteCfg);
log = U.initLogger(igniteCfg, "ignite-cdc");
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
consumer = new WalRecordsConsumer<>(cdcCfg.getConsumer(), log);
}
/** Runs Change Data Capture. */
@Override public void run() {
synchronized (this) {
if (stopped)
return;
}
try {
runX();
}
catch (Throwable e) {
log.error("Cdc error", e);
throw new IgniteException(e);
}
}
    /** Runs Change Data Capture application with possible exception. */
    public void runX() throws Exception {
        ackAsciiLogo();

        // CDC requires persistence with CDC enabled in the node configuration.
        if (!CU.isCdcEnabled(igniteCfg)) {
            log.error(ERR_MSG);

            throw new IllegalArgumentException(ERR_MSG);
        }

        // Hold the lock for the whole application lifetime to guarantee exclusive consumption.
        try (CdcFileLockHolder lock = lockPds()) {
            // Last path component of the CDC dir is the node consistent-id folder name.
            String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 1).toString();

            Files.createDirectories(cdcDir.resolve(STATE_DIR));

            binaryMeta = CacheObjectBinaryProcessorImpl.binaryWorkDir(igniteCfg.getWorkDirectory(), consIdDir);

            marshaller = MarshallerContextImpl.mappingFileStoreWorkDir(igniteCfg.getWorkDirectory());

            if (log.isInfoEnabled()) {
                log.info("Change Data Capture [dir=" + cdcDir + ']');
                log.info("Ignite node Binary meta [dir=" + binaryMeta + ']');
                log.info("Ignite node Marshaller [dir=" + marshaller + ']');
            }

            kctx = startStandaloneKernal();

            initMetrics();

            try {
                // Inject Spring resources (if any) into the user-provided consumer.
                kctx.resource().injectGeneric(consumer.consumer());

                // Restore persisted consumption state (if present) to continue from the last committed position.
                state = createState(cdcDir.resolve(STATE_DIR));

                walState = state.loadWalState();
                typesState = state.loadTypesState();
                mappingsState = state.loadMappingsState();
                cachesState = state.loadCaches();
                cdcModeState = state.loadCdcMode();

                if (walState != null) {
                    committedSegmentIdx.value(walState.get1().index());
                    committedSegmentOffset.value(walState.get1().fileOffset());
                }

                consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));

                started = true;

                try {
                    consumeWalSegmentsUntilStopped();
                }
                finally {
                    stop();
                }
            }
            finally {
                closeAllComponents(kctx);

                if (log.isInfoEnabled())
                    log.info("Ignite Change Data Capture Application stopped.");
            }
        }
    }
/** Creates consumer state. */
protected CdcConsumerState createState(Path stateDir) {
return new CdcConsumerState(log, stateDir);
}
    /**
     * Starts a standalone kernal context used by the application to read binary metadata, marshaller mappings
     * and to export metrics.
     *
     * @return Kernal instance.
     * @throws IgniteCheckedException If failed.
     */
    private StandaloneGridKernalContext startStandaloneKernal() throws IgniteCheckedException {
        StandaloneGridKernalContext kctx = new StandaloneGridKernalContext(log, binaryMeta, marshaller) {
            @Override protected IgniteConfiguration prepareIgniteConfiguration() {
                IgniteConfiguration cfg = super.prepareIgniteConfiguration();

                // Derive the CDC instance name from the Ignite node instance name.
                cfg.setIgniteInstanceName(cdcInstanceName(igniteCfg.getIgniteInstanceName()));
                cfg.setWorkDirectory(igniteCfg.getWorkDirectory());

                if (!F.isEmpty(cdcCfg.getMetricExporterSpi()))
                    cfg.setMetricExporterSpi(cdcCfg.getMetricExporterSpi());
                else {
                    // Default exporter is JMX, unless MBeans are globally disabled.
                    cfg.setMetricExporterSpi(U.IGNITE_MBEANS_DISABLED
                        ? new NoopMetricExporterSpi()
                        : new JmxMetricExporterSpi());
                }

                initializeDefaultMBeanServer(cfg);

                return cfg;
            }

            /** {@inheritDoc} */
            @Override public String igniteInstanceName() {
                return config().getIgniteInstanceName();
            }
        };

        kctx.resource().setSpringContext(ctx);

        startAllComponents(kctx);

        // Notify exporters that the context is ready, so they start publishing metrics.
        for (IgniteSpi metricSpi : kctx.config().getMetricExporterSpi()) {
            metricSpi.onContextInitialized(new StandaloneSpiContext());
        }

        mreg = kctx.metric().registry("cdc");

        return kctx;
    }
    /** Initialize metrics. Must be called after {@link #startStandaloneKernal()} created the registry. */
    private void initMetrics() {
        // Static directory-info metrics.
        mreg.objectMetric(BINARY_META_DIR, String.class, "Binary meta directory").value(binaryMeta.getAbsolutePath());
        mreg.objectMetric(MARSHALLER_DIR, String.class, "Marshaller directory").value(marshaller.getAbsolutePath());
        mreg.objectMetric(CDC_DIR, String.class, "CDC directory").value(cdcDir.toFile().getAbsolutePath());

        curSegmentIdx = mreg.longMetric(CUR_SEG_IDX, "Current segment index");
        committedSegmentIdx = mreg.longMetric(COMMITTED_SEG_IDX, "Committed segment index");
        committedSegmentOffset = mreg.longMetric(COMMITTED_SEG_OFFSET, "Committed segment offset");

        lastSegmentConsumptionTs =
            mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of consumption of WAL segment");

        // Histogram bucket bounds are in milliseconds.
        metaUpdate = mreg.histogram(META_UPDATE, new long[] {100, 500, 1000}, "Metadata update time");

        evtCaptureTime = mreg.histogram(
            EVT_CAPTURE_TIME,
            new long[] {5_000, 10_000, 15_000, 30_000, 60_000},
            "Time between creating an event on Ignite node and capturing it by CdcConsumer");

        // Gauge that reports the current CDC mode by name.
        mreg.register(CDC_MODE, () -> cdcModeState.name(), String.class, "CDC mode");
    }
/**
* @return CDC lock holder for specifi folder.
* @throws IgniteCheckedException If failed.
*/
private CdcFileLockHolder lockPds() throws IgniteCheckedException {
PdsFolderSettings<CdcFileLockHolder> settings =
new PdsFolderResolver<>(igniteCfg, log, igniteCfg.getConsistentId(), this::tryLock).resolve();
if (settings == null) {
throw new IgniteException("Can't find the folder to read WAL segments from! " +
"[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" + igniteCfg.getConsistentId() + ']');
}
CdcFileLockHolder lock = settings.getLockedFileLockHolder();
if (lock == null) {
File consIdDir = settings.persistentStoreNodePath();
lock = tryLock(consIdDir);
if (lock == null) {
throw new IgniteException(
"Can't acquire lock for Change Data Capture folder [dir=" + consIdDir.getAbsolutePath() + ']'
);
}
}
return lock;
}
    /** Waits and consumes new WAL segments until stopped. */
    public void consumeWalSegmentsUntilStopped() {
        try {
            // Segments already observed (consumed or skipped) in previous polling rounds.
            Set<Path> seen = new HashSet<>();

            // Index of the last segment handed over for consumption; -1 means "none yet".
            AtomicLong lastSgmnt = new AtomicLong(-1);

            while (!stopped) {
                if (!consumer.alive()) {
                    log.warning("Consumer is not alive. Ignite Change Data Capture Application will be stopped.");

                    return;
                }

                try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
                    Set<Path> exists = new HashSet<>();

                    Iterator<Path> segments = cdcFiles
                        .peek(exists::add) // Store files that exists in cdc dir.
                        // Need unseen WAL segments only.
                        .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
                        .peek(seen::add) // Adds to seen.
                        .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)) // Sort by segment index.
                        .peek(p -> {
                            long nextSgmnt = segmentIndex(p);

                            // A gap in segment indexes means events were lost - unsafe to continue.
                            if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) {
                                throw new IgniteException("Found missed segments. Some events are missed. Exiting! " +
                                    "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']');
                            }

                            lastSgmnt.set(nextSgmnt);
                        }).iterator();

                    while (segments.hasNext()) {
                        Path segment = segments.next();

                        // After restart, skip (and delete) segments already committed per the saved state.
                        if (walState != null && removeProcessedOnFailover(segment))
                            continue;

                        if (consumeSegment(segment)) {
                            // CDC mode switched. Reset partitions info to handle them again actively.
                            seen.clear();
                            lastSgmnt.set(-1);
                            walState = state.loadWalState();

                            break;
                        }

                        walState = null;
                    }

                    seen.removeIf(p -> !exists.contains(p)); // Clean up seen set.

                    if (lastSgmnt.get() == -1) //Forcefully updating metadata if no new segments found.
                        updateMetadata();
                }

                if (!stopped)
                    U.sleep(cdcCfg.getCheckFrequency());
            }
        }
        catch (IOException | IgniteInterruptedCheckedException e) {
            throw new IgniteException(e);
        }
    }
    /**
     * Reads all available records from segment.
     *
     * @param segment WAL segment file to process.
     * @return {@code true} if mode switched.
     */
    private boolean consumeSegment(Path segment) {
        // Push metadata changes to the consumer before the data events that may depend on them.
        updateMetadata();

        if (log.isInfoEnabled())
            log.info("Processing WAL segment [segment=" + segment + ']');

        IgniteWalIteratorFactory.IteratorParametersBuilder builder =
            new IgniteWalIteratorFactory.IteratorParametersBuilder()
                .log(log)
                .binaryMetadataFileStoreDir(binaryMeta)
                .marshallerMappingFileStoreDir(marshaller)
                .igniteConfigurationModifier((cfg) -> cfg.setPluginProviders(igniteCfg.getPluginProviders()))
                .keepBinary(cdcCfg.isKeepBinary())
                .filesOrDirs(segment.toFile());

        if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
            builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());

        // Resume from the last committed pointer after restart.
        if (walState != null)
            builder.from(walState.get1());

        long segmentIdx = segmentIndex(segment);

        lastSegmentConsumptionTs.value(System.currentTimeMillis());

        curSegmentIdx.value(segmentIdx);

        if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
            // `true` means the stop record was found and the mode has switched.
            if (consumeSegmentPassively(builder))
                return true;
        }
        else
            consumeSegmentActively(builder);

        // Remember the segment so it can be deleted once the committed state moves past it.
        processedSegments.add(segment);

        return false;
    }
    /**
     * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode: data records are read from the WAL segment
     * and handed to the consumer in batches.
     *
     * @param builder WAL iterator parameters pointing at the segment to read.
     */
    private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
        try (DataEntryIterator iter = new DataEntryIterator(
            new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)),
            evtCaptureTime)
        ) {
            // Skip entries before the saved per-record entry index of the committed pointer.
            if (walState != null)
                iter.init(walState.get2());

            boolean interrupted;

            do {
                boolean commit = consumer.onRecords(iter, WalRecordsConsumer.CDC_EVENT_TRANSFORMER, null);

                // Persist the state (and delete fully processed segments) only when the consumer commits.
                if (commit)
                    saveStateAndRemoveProcessed(iter.state());

                interrupted = Thread.interrupted();
            } while (iter.hasNext() && !interrupted);

            if (interrupted)
                throw new IgniteException("Change Data Capture Application interrupted");
        }
        catch (IgniteCheckedException | IOException e) {
            throw new IgniteException(e);
        }
    }
    /**
     * Consumes CDC events in {@link CdcMode#IGNITE_NODE_ACTIVE} mode. Only {@link CdcManagerRecord}s (committed WAL
     * state) and the stop record are expected here, since the iterator is filtered with {@link #PASSIVE_RECS}.
     *
     * @param builder WAL iterator parameters pointing at the segment to read.
     * @return {@code true} if mode switched.
     */
    private boolean consumeSegmentPassively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
        try (WALIterator iter = new IgniteWalIteratorFactory(log).iterator(builder.addFilter(PASSIVE_RECS))) {
            boolean interrupted = false;

            while (iter.hasNext() && !interrupted) {
                IgniteBiTuple<WALPointer, WALRecord> next = iter.next();

                WALRecord walRecord = next.get2();

                switch (walRecord.type()) {
                    case CDC_MANAGER_RECORD:
                        // Consumption state committed by the node: persist it and drop processed segments.
                        saveStateAndRemoveProcessed(((CdcManagerRecord)walRecord).walState());

                        break;

                    case CDC_MANAGER_STOP_RECORD:
                        // Switch to active consumption by this application and persist the new mode.
                        state.saveCdcMode((cdcModeState = CdcMode.CDC_UTILITY_ACTIVE));

                        if (log.isInfoEnabled())
                            log.info("CDC mode switched [mode=" + cdcModeState + ']');

                        return true;

                    default:
                        throw new IgniteException("Unexpected record [type=" + walRecord.type() + ']');
                }

                interrupted = Thread.interrupted();
            }

            if (interrupted)
                throw new IgniteException("Change Data Capture Application interrupted");

            return false;
        }
        catch (IgniteCheckedException | IOException e) {
            throw new IgniteException(e);
        }
    }
/** Metadata update. */
private void updateMetadata() {
long start = System.currentTimeMillis();
updateMappings();
updateTypes();
updateCaches();
metaUpdate.value(System.currentTimeMillis() - start);
}
    /** Search for new or changed {@link BinaryType} and notifies the consumer. */
    private void updateTypes() {
        try {
            File[] files = binaryMeta.listFiles();

            // Directory may not exist yet.
            if (files == null)
                return;

            Iterator<BinaryType> changedTypes = Arrays.stream(files)
                .filter(p -> p.toString().endsWith(METADATA_FILE_SUFFIX))
                .map(f -> {
                    int typeId = BinaryUtils.typeId(f.getName());
                    long lastModified = f.lastModified();

                    // Filter out files already in `typesState` with the same last modify date.
                    if (typesState.containsKey(typeId) && lastModified == typesState.get(typeId))
                        return null;

                    // NOTE: in-memory state is updated eagerly here, but persisted only after the consumer
                    // has handled all changed types (see `saveTypes` below).
                    typesState.put(typeId, lastModified);

                    try {
                        // Refresh the local metadata cache from the file before reading it.
                        kctx.cacheObjects().cacheMetadataLocally(binaryMeta, typeId);
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException(e);
                    }

                    return kctx.cacheObjects().metadata(typeId);
                })
                .filter(Objects::nonNull)
                .iterator();

            if (!changedTypes.hasNext())
                return;

            consumer.onTypes(changedTypes);

            // The consumer must drain the iterator fully, otherwise state would be saved for unhandled types.
            if (changedTypes.hasNext())
                throw new IllegalStateException("Consumer should handle all changed types");

            state.saveTypes(typesState);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }
    /** Search for new or changed {@link TypeMapping} and notifies the consumer. */
    private void updateMappings() {
        try {
            File[] files = marshaller.listFiles(BinaryUtils::notTmpFile);

            // Directory may not exist yet.
            if (files == null)
                return;

            // `Set#add` doubles as the "new mapping" filter: it returns false for already-known mappings.
            // The iterator is lazy, so these side effects happen while the consumer drains it.
            Iterator<TypeMapping> changedMappings = typeMappingIterator(
                files,
                tm -> mappingsState.add(new T2<>(tm.typeId(), (byte)tm.platformType().ordinal()))
            );

            if (!changedMappings.hasNext())
                return;

            consumer.onMappings(changedMappings);

            // The consumer must drain the iterator fully, otherwise state would be saved for unhandled mappings.
            if (changedMappings.hasNext())
                throw new IllegalStateException("Consumer should handle all changed mappings");

            state.saveMappings(mappingsState);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }
/** Search for new or changed {@link CdcCacheEvent} and notifies the consumer. */
private void updateCaches() {
try {
if (!dbDir.exists())
return;
Set<Integer> destroyed = new HashSet<>(cachesState.keySet());
Iterator<CdcCacheEvent> cacheEvts = GridLocalConfigManager
.readCachesData(
dbDir,
MarshallerUtils.jdkMarshaller(kctx.igniteInstanceName()),
igniteCfg)
.entrySet().stream()
.map(data -> {
int cacheId = data.getValue().cacheId();
long lastModified = data.getKey().lastModified();
destroyed.remove(cacheId);
Long lastModified0 = cachesState.get(cacheId);
if (lastModified0 != null && lastModified0 == lastModified)
return null;
cachesState.put(cacheId, lastModified);
return (CdcCacheEvent)data.getValue();
})
.filter(Objects::nonNull)
.iterator();
consumer.onCacheEvents(cacheEvts);
if (cacheEvts.hasNext())
throw new IllegalStateException("Consumer should handle all cache change events");
if (!destroyed.isEmpty()) {
Iterator<Integer> destroyedIter = destroyed.iterator();
consumer.onCacheDestroyEvents(destroyedIter);
if (destroyedIter.hasNext())
throw new IllegalStateException("Consumer should handle all cache destroy events");
}
state.saveCaches(cachesState);
}
catch (IOException e) {
throw new IgniteException(e);
}
}
/**
* Remove segment file if it already processed. {@link #walState} points to the last committed offset so all files
* before this offset can be removed.
*
* @param segment Segment to check.
* @return {@code True} if segment file was deleted, {@code false} otherwise.
*/
private boolean removeProcessedOnFailover(Path segment) {
long segmentIdx = segmentIndex(segment);
if (segmentIdx > walState.get1().index()) {
throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
"[state=" + walState + ", segment=" + segmentIdx + ']');
}
if (segmentIdx < walState.get1().index()) {
if (log.isInfoEnabled()) {
log.info("Already processed segment found. Skipping and deleting the file [segment=" +
segmentIdx + ", state=" + walState.get1().index() + ']');
}
// WAL segment is a hard link to a segment file in the special Change Data Capture folder.
// So, we can safely delete it after processing.
try {
Files.delete(segment);
return true;
}
catch (IOException e) {
throw new IgniteException(e);
}
}
return false;
}
    /**
     * Saves WAL consumption state and delete segments that no longer required.
     *
     * @param curState Committed WAL pointer with entry index, or {@code null} to skip saving.
     * @throws IOException If state save or segment deletion failed.
     */
    private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throws IOException {
        if (curState == null)
            return;

        if (log.isDebugEnabled())
            log.debug("Saving state [curState=" + curState + ']');

        state.saveWal(curState);

        committedSegmentIdx.value(curState.get1().index());
        committedSegmentOffset.value(curState.get1().fileOffset());

        Iterator<Path> rmvIter = processedSegments.iterator();

        while (rmvIter.hasNext()) {
            Path processedSegment = rmvIter.next();

            // Can't delete current segment, because state points to it.
            if (segmentIndex(processedSegment) >= curState.get1().index())
                continue;

            // WAL segment is a hard link to a segment file in a special Change Data Capture folder.
            // So we can safely delete it after success processing.
            Files.delete(processedSegment);

            rmvIter.remove();
        }
    }
    /**
     * Try locks Change Data Capture directory.
     *
     * @param dbStoreDirWithSubdirectory Root PDS directory.
     * @return Lock or null if lock failed.
     */
    private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
        if (!dbStoreDirWithSubdirectory.exists()) {
            log.warning("DB store directory not exists. Should be created by Ignite Node " +
                " [dir=" + dbStoreDirWithSubdirectory + ']');

            return null;
        }

        File cdcRoot = new File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());

        // A relative CDC path is resolved against the node work directory.
        if (!cdcRoot.isAbsolute()) {
            cdcRoot = new File(
                igniteCfg.getWorkDirectory(),
                igniteCfg.getDataStorageConfiguration().getCdcWalPath()
            );
        }

        if (!cdcRoot.exists()) {
            log.warning("CDC root directory not exists. Should be created by Ignite Node. " +
                "Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcRoot + ']');

            return null;
        }

        Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), dbStoreDirWithSubdirectory.getName());

        if (!Files.exists(cdcDir)) {
            log.warning("CDC directory not exists. Should be created by Ignite Node. " +
                "Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcDir + ']');

            return null;
        }

        // Remember the resolved directories only after all existence checks passed.
        this.cdcDir = cdcDir;
        this.dbDir = dbStoreDirWithSubdirectory;

        CdcFileLockHolder lock = new CdcFileLockHolder(cdcDir.toString(), "cdc.lock", log);

        try {
            lock.tryLock(cdcCfg.getLockTimeout());

            return lock;
        }
        catch (IgniteCheckedException e) {
            // Another CDC instance probably holds the lock; release our holder quietly and report failure.
            U.closeQuiet(lock);

            if (log.isInfoEnabled()) {
                log.info("Unable to acquire lock to lock CDC folder [dir=" + cdcRoot + "]" + NL +
                    "Reason: " + e.getMessage());
            }

            return null;
        }
    }
/** Stops the application. */
public void stop() {
synchronized (this) {
if (stopped || !started)
return;
if (log.isInfoEnabled())
log.info("Stopping Change Data Capture service instance");
stopped = true;
consumer.stop();
}
}
    /** Prints the CDC application banner (ASCII logo, version, consumer and consistent id) to the log. */
    private void ackAsciiLogo() {
        String ver = "ver. " + ACK_VER_STR;

        if (log.isInfoEnabled()) {
            log.info(NL + NL +
                ">>> __________ ________________ ________ _____" + NL +
                ">>> / _/ ___/ |/ / _/_ __/ __/ / ___/ _ \\/ ___/" + NL +
                ">>> _/ // (7 7 // / / / / _/ / /__/ // / /__ " + NL +
                ">>> /___/\\___/_/|_/___/ /_/ /___/ \\___/____/\\___/ " + NL +
                ">>> " + NL +
                ">>> " + ver + NL +
                ">>> " + COPYRIGHT + NL +
                ">>> " + NL +
                ">>> Ignite documentation: " + "http://" + SITE + NL +
                ">>> Consumer: " + U.toStringSafe(consumer.consumer()) + NL +
                ">>> ConsistentId: " + igniteCfg.getConsistentId() + NL
            );
        }

        // In quiet mode the banner goes to the console instead of the log.
        if (log.isQuiet()) {
            U.quiet(false,
                " __________ ________________ ________ _____",
                " / _/ ___/ |/ / _/_ __/ __/ / ___/ _ \\/ ___/",
                " _/ // (7 7 // / / / / _/ / /__/ // / /__ ",
                "/___/\\___/_/|_/___/ /_/ /___/ \\___/____/\\___/ ",
                "",
                ver,
                COPYRIGHT,
                "",
                "Ignite documentation: " + "http://" + SITE,
                "Consumer: " + U.toStringSafe(consumer.consumer()),
                "ConsistentId: " + igniteCfg.getConsistentId(),
                "",
                "Quiet mode.");

            String fileName = log.fileName();

            if (fileName != null)
                U.quiet(false, " ^-- Logging to file '" + fileName + '\'');

            if (log instanceof GridLoggerProxy)
                U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');

            U.quiet(false,
                " ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to ignite-cdc.{sh|bat}",
                "");
        }
    }
/** */
public static String cdcInstanceName(String igniteInstanceName) {
return "cdc-" + igniteInstanceName;
}
/**
* @param files Mapping files.
* @param filter Filter.
* @return Type mapping iterator.
*/
public static Iterator<TypeMapping> typeMappingIterator(File[] files, Predicate<TypeMapping> filter) {
return Arrays.stream(files)
.map(f -> {
String fileName = f.getName();
int typeId = BinaryUtils.mappedTypeId(fileName);
byte platformId = BinaryUtils.mappedFilePlatformId(fileName);
return (TypeMapping)new TypeMappingImpl(
typeId,
BinaryUtils.readMapping(f),
platformId == 0 ? PlatformType.JAVA : PlatformType.DOTNET);
})
.filter(filter)
.filter(Objects::nonNull)
.iterator();
}
}