/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cdc;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.resource.GridResourceIoc;
import org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.SpringApplicationContextResource;
import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
/**
* Change Data Capture (CDC) application.
* The application runs independently of the Ignite node process and allows a {@link CdcConsumer}
* to consume events ({@link CdcEvent}) from WAL segments.
* The user should provide a {@link CdcConsumer} implementation with custom consumption logic.
*
* The Ignite node must be explicitly configured to use {@link CdcMain} (see the configuration sketch after the list):
* <ol>
* <li>Set {@link DataStorageConfiguration#setCdcEnabled(boolean)} to true.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setCdcWalPath(String)} to the path of the directory
* that stores WAL segments for CDC.</li>
* <li>Optional: Set {@link DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure a timeout for
* forced WAL rollover, so that new events become available for consumption within a predictable time.</li>
* </ol>
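*
* A minimal configuration sketch (illustrative only: the persistence flag is required for this application to start;
* the CDC path and timeout values below are example values, not defaults mandated by this class):
* <pre>{@code
* IgniteConfiguration cfg = new IgniteConfiguration();
*
* DataStorageConfiguration storageCfg = new DataStorageConfiguration();
*
* // Persistence must be enabled, otherwise CdcMain refuses to start.
* storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
*
* storageCfg.setCdcEnabled(true);               // Mandatory: enable CDC.
* storageCfg.setCdcWalPath("db/wal/cdc");       // Optional: directory for CDC WAL segments (example value).
* storageCfg.setWalForceArchiveTimeout(60_000); // Optional: force segment rollover every minute (example value).
*
* cfg.setDataStorageConfiguration(storageCfg);
* }</pre>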
*
* When CDC is enabled (see {@link DataStorageConfiguration#setCdcEnabled(boolean)}), the Ignite node on each
* WAL segment rollover creates a hard link to the archived WAL segment in the
* {@link DataStorageConfiguration#getCdcWalPath()} directory. The {@link CdcMain} application takes a
* segment file and consumes events from it.
* After successful consumption (see {@link CdcConsumer#onEvents(Iterator)}) the WAL segment is deleted
* from that directory.
*
* Several Ignite nodes can be started on the same host.
* If your deployment uses a custom consistent ID then you should specify it via
* {@link IgniteConfiguration#setConsistentId(Serializable)} in the provided {@link IgniteConfiguration},
* as in the sketch below.
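*
* A sketch of matching the node's consistent ID (the value {@code "node-1"} is purely illustrative):
* <pre>{@code
* // Must be the same value the corresponding Ignite node was started with.
* cfg.setConsistentId("node-1");
* }</pre>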
*
* The application works as follows (a launch sketch follows the list):
* <ol>
* <li>Searches for the node work directory based on the provided {@link IgniteConfiguration}.</li>
* <li>Waits for the CDC directory to be created if it does not exist yet.</li>
* <li>Acquires a file lock to ensure exclusive consumption.</li>
* <li>Loads the consumption state if it exists.</li>
* <li>Waits indefinitely for new available segments and processes them.</li>
* </ol>
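*
* A minimal programmatic launch sketch (assuming the configuration sketched above; {@code MyCdcConsumer} is a
* hypothetical {@link CdcConsumer} implementation):
* <pre>{@code
* CdcConfiguration cdcCfg = new CdcConfiguration();
* cdcCfg.setConsumer(new MyCdcConsumer());
*
* // Runs in the current thread until stop() is called from another thread.
* new CdcMain(cfg, null, cdcCfg).run();
* }</pre>
* Alternatively, the application can be started from the command line via {@link CdcCommandLineStartup}.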
*
* @see DataStorageConfiguration#setCdcEnabled(boolean)
* @see DataStorageConfiguration#setCdcWalPath(String)
* @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
* @see CdcCommandLineStartup
* @see CdcConsumer
* @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
*/
public class CdcMain implements Runnable {
/** */
public static final String ERR_MSG = "Persistence disabled. Change Data Capture can't run!";
/** State dir. */
public static final String STATE_DIR = "state";
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
/** Spring resource context. */
private final GridSpringResourceContext ctx;
/** Change Data Capture configuration. */
private final CdcConfiguration cdcCfg;
/** WAL iterator factory. */
private final IgniteWalIteratorFactory factory;
/** Events consumer. */
private final WalRecordsConsumer<?, ?> consumer;
/** Logger. */
private final IgniteLogger log;
/** Change Data Capture directory. */
private Path cdcDir;
/** Binary meta directory. */
private File binaryMeta;
/** Marshaller directory. */
private File marshaller;
/** Change Data Capture state. */
private CdcConsumerState state;
/** Saved state to start from. */
private WALPointer initState;
/** Stopped flag. */
private volatile boolean stopped;
/** Already processed segments. */
private final Set<Path> processedSegments = new HashSet<>();
/**
* @param cfg Ignite configuration.
* @param ctx Spring resource context.
* @param cdcCfg Change Data Capture configuration.
*/
public CdcMain(
IgniteConfiguration cfg,
GridSpringResourceContext ctx,
CdcConfiguration cdcCfg) {
igniteCfg = new IgniteConfiguration(cfg);
this.ctx = ctx;
this.cdcCfg = cdcCfg;
try {
U.initWorkDir(igniteCfg);
log = U.initLogger(igniteCfg, "ignite-cdc");
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
consumer = new WalRecordsConsumer<>(cdcCfg.getConsumer(), log);
factory = new IgniteWalIteratorFactory(log);
}
/** Runs Change Data Capture. */
@Override public void run() {
synchronized (this) {
if (stopped)
return;
}
try {
runX();
}
catch (Throwable e) {
log.error("Cdc error", e);
throw new IgniteException(e);
}
}
/** Runs Change Data Capture application with possible exception. */
public void runX() throws Exception {
ackAsciiLogo();
if (!CU.isPersistenceEnabled(igniteCfg)) {
log.error(ERR_MSG);
throw new IllegalArgumentException(ERR_MSG);
}
PdsFolderSettings<CdcFileLockHolder> settings =
new PdsFolderResolver<>(igniteCfg, log, null, this::tryLock).resolve();
if (settings == null) {
throw new IgniteException("Can't find folder to read WAL segments from based on provided configuration! " +
"[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" + igniteCfg.getConsistentId() + ']');
}
CdcFileLockHolder lock = settings.getLockedFileLockHolder();
if (lock == null) {
File consIdDir = settings.persistentStoreNodePath();
lock = tryLock(consIdDir);
if (lock == null) {
throw new IgniteException(
"Can't acquire lock for Change Data Capture folder [dir=" + consIdDir.getAbsolutePath() + ']'
);
}
}
try {
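// The last element of the CDC directory path is the node folder name (derived from the consistent ID).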
String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 1).toString();
Files.createDirectories(cdcDir.resolve(STATE_DIR));
binaryMeta = CacheObjectBinaryProcessorImpl.binaryWorkDir(igniteCfg.getWorkDirectory(), consIdDir);
marshaller = MarshallerContextImpl.mappingFileStoreWorkDir(igniteCfg.getWorkDirectory());
if (log.isInfoEnabled()) {
log.info("Change Data Capture [dir=" + cdcDir + ']');
log.info("Ignite node Binary meta [dir=" + binaryMeta + ']');
log.info("Ignite node Marshaller [dir=" + marshaller + ']');
}
injectResources(consumer.consumer());
state = new CdcConsumerState(cdcDir.resolve(STATE_DIR));
initState = state.load();
if (initState != null && log.isInfoEnabled())
log.info("Initial state loaded [state=" + initState + ']');
consumer.start();
try {
consumeWalSegmentsUntilStopped();
}
finally {
consumer.stop();
if (log.isInfoEnabled())
log.info("Ignite Change Data Capture Application stopped.");
}
}
finally {
U.closeQuiet(lock);
}
}
/** Waits and consumes new WAL segments until stopped. */
public void consumeWalSegmentsUntilStopped() {
try {
Set<Path> seen = new HashSet<>();
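// Index of the last segment seen; used to assert that the segment sequence has no gaps.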
AtomicLong lastSgmnt = new AtomicLong(-1);
while (!stopped) {
try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
Set<Path> exists = new HashSet<>();
cdcFiles
.peek(exists::add) // Store files that exist in the CDC dir.
// Need unseen WAL segments only.
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
.peek(seen::add) // Adds to seen.
.sorted(Comparator.comparingLong(this::segmentIndex)) // Sort by segment index.
.peek(p -> {
long nextSgmnt = segmentIndex(p);
assert lastSgmnt.get() == -1 || nextSgmnt - lastSgmnt.get() == 1;
lastSgmnt.set(nextSgmnt);
})
.forEach(this::consumeSegment); // Consuming segments.
seen.removeIf(p -> !exists.contains(p)); // Clean up seen set.
}
if (!stopped)
U.sleep(cdcCfg.getCheckFrequency());
}
}
catch (IOException | IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
}
}
/** Reads all available records from segment. */
private void consumeSegment(Path segment) {
if (log.isInfoEnabled())
log.info("Processing WAL segment [segment=" + segment + ']');
IgniteWalIteratorFactory.IteratorParametersBuilder builder =
new IgniteWalIteratorFactory.IteratorParametersBuilder()
.log(log)
.binaryMetadataFileStoreDir(binaryMeta)
.marshallerMappingFileStoreDir(marshaller)
.keepBinary(cdcCfg.isKeepBinary())
.filesOrDirs(segment.toFile())
.addFilter((type, ptr) -> type == DATA_RECORD_V2);
if (initState != null) {
long segmentIdx = segmentIndex(segment);
if (segmentIdx > initState.index()) {
throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
"[state=" + initState + ", segment=" + segmentIdx + ']');
}
if (segmentIdx < initState.index()) {
if (log.isInfoEnabled()) {
log.info("Already processed segment found. Skipping and deleting the file [segment=" +
segmentIdx + ", state=" + initState.index() + ']');
}
// WAL segment is a hard link to a segment file in the special Change Data Capture folder.
// So, we can safely delete it after processing.
try {
Files.delete(segment);
return;
}
catch (IOException e) {
throw new IgniteException(e);
}
}
builder.from(initState);
initState = null;
}
try (WALIterator it = factory.iterator(builder)) {
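// Thread.interrupted() clears and returns the interrupt status; consumption stops between batches if it is set.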
boolean interrupted = Thread.interrupted();
while (it.hasNext() && !interrupted) {
Iterator<DataRecord> iter = F.iterator(it.iterator(), t -> (DataRecord)t.get2(), true);
boolean commit = consumer.onRecords(iter);
if (commit) {
assert it.lastRead().isPresent();
state.save(it.lastRead().get());
// Processed segments can be deleted only after the new state has been saved.
if (!processedSegments.isEmpty()) {
// WAL segment is a hard link to a segment file in the special Change Data Capture folder.
// So we can safely delete it after successful processing.
for (Path processedSegment : processedSegments) {
// Can't delete current segment, because state points to it.
if (processedSegment.equals(segment))
continue;
Files.delete(processedSegment);
}
processedSegments.clear();
}
}
interrupted = Thread.interrupted();
}
if (interrupted)
throw new IgniteException("Change Data Capture Application interrupted");
processedSegments.add(segment);
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
}
}
/**
* Tries to lock the Change Data Capture directory.
*
* @param dbStoreDirWithSubdirectory Root PDS directory.
* @return Lock or null if lock failed.
*/
private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
if (!dbStoreDirWithSubdirectory.exists()) {
log.warning("DB store directory not exists [dir=" + dbStoreDirWithSubdirectory + ']');
return null;
}
File cdcRoot = new File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());
if (!cdcRoot.isAbsolute()) {
cdcRoot = new File(
igniteCfg.getWorkDirectory(),
igniteCfg.getDataStorageConfiguration().getCdcWalPath()
);
}
if (!cdcRoot.exists()) {
log.warning("CDC root directory not exists. Should be created by Ignite Node. " +
"Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcRoot + ']');
return null;
}
Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), dbStoreDirWithSubdirectory.getName());
if (!Files.exists(cdcDir)) {
log.warning("CDC directory not exists. Should be created by Ignite Node. " +
"Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcDir + ']');
return null;
}
this.cdcDir = cdcDir;
CdcFileLockHolder lock = new CdcFileLockHolder(cdcDir.toString(), "cdc.lock", log);
try {
lock.tryLock(cdcCfg.getLockTimeout());
return lock;
}
catch (IgniteCheckedException e) {
U.closeQuiet(lock);
if (log.isInfoEnabled()) {
log.info("Unable to acquire lock to lock CDC folder [dir=" + cdcRoot + "]" + NL +
"Reason: " + e.getMessage());
}
return null;
}
}
/**
* @param segment WAL segment file.
* @return Segment index.
*/
public long segmentIndex(Path segment) {
String fn = segment.getFileName().toString();
return Long.parseLong(fn.substring(0, fn.indexOf('.')));
}
/** Stops the application. */
public void stop() {
synchronized (this) {
if (log.isInfoEnabled())
log.info("Stopping Change Data Capture service instance");
stopped = true;
}
}
/** */
private void injectResources(CdcConsumer dataConsumer) throws IgniteCheckedException {
GridResourceIoc ioc = new GridResourceIoc();
ioc.inject(
dataConsumer,
LoggerResource.class,
new GridResourceLoggerInjector(log),
null,
null
);
if (ctx != null) {
ioc.inject(
dataConsumer,
SpringResource.class,
ctx.springBeanInjector(),
null,
null
);
ioc.inject(
dataConsumer,
SpringApplicationContextResource.class,
ctx.springContextInjector(),
null,
null
);
}
}
/** */
private void ackAsciiLogo() {
String ver = "ver. " + ACK_VER_STR;
if (log.isInfoEnabled()) {
log.info(NL + NL +
">>> __________ ________________ ________ _____" + NL +
">>> / _/ ___/ |/ / _/_ __/ __/ / ___/ _ \\/ ___/" + NL +
">>> _/ // (7 7 // / / / / _/ / /__/ // / /__ " + NL +
">>> /___/\\___/_/|_/___/ /_/ /___/ \\___/____/\\___/ " + NL +
">>> " + NL +
">>> " + ver + NL +
">>> " + COPYRIGHT + NL +
">>> " + NL +
">>> Ignite documentation: " + "http://" + SITE + NL +
">>> Consumer: " + U.toStringSafe(consumer.consumer()) + NL +
">>> ConsistentId: " + igniteCfg.getConsistentId() + NL
);
}
if (log.isQuiet()) {
U.quiet(false,
" __________ ________________ ________ _____",
" / _/ ___/ |/ / _/_ __/ __/ / ___/ _ \\/ ___/",
" _/ // (7 7 // / / / / _/ / /__/ // / /__ ",
"/___/\\___/_/|_/___/ /_/ /___/ \\___/____/\\___/ ",
"",
ver,
COPYRIGHT,
"",
"Ignite documentation: " + "http://" + SITE,
"Consumer: " + U.toStringSafe(consumer.consumer()),
"ConsistentId: " + igniteCfg.getConsistentId(),
"",
"Quiet mode.");
String fileName = log.fileName();
if (fileName != null)
U.quiet(false, " ^-- Logging to file '" + fileName + '\'');
if (log instanceof GridLoggerProxy)
U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');
U.quiet(false,
" ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to ignite-cdc.{sh|bat}",
"");
}
}
}