blob: 38c29383996473f5b06fc15e97ee19c898902234 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cdc;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
/**
* Transform {@link DataEntry} to {@link CdcEvent} and sends it to {@link CdcConsumer}.
*
* @see CdcMain
* @see CdcConsumer
*/
public class WalRecordsConsumer<K, V> {
/** Ignite logger. */
private final IgniteLogger log;
/** Data change events consumer. */
private final CdcConsumer consumer;
/** Operations types we interested in. */
private static final EnumSet<GridCacheOperation> OPERATIONS_TYPES = EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM);
/** Operations filter. */
private static final IgnitePredicate<? super DataEntry> OPERATIONS_FILTER = e -> {
if (!(e instanceof UnwrappedDataEntry))
throw new IllegalStateException("Unexpected data entry [type=" + e.getClass().getName() + ']');
if ((e.flags() & DataEntry.PRELOAD_FLAG) != 0 ||
(e.flags() & DataEntry.FROM_STORE_FLAG) != 0)
return false;
return OPERATIONS_TYPES.contains(e.op());
};
/**
* @param consumer User provided CDC consumer.
* @param log Logger.
*/
public WalRecordsConsumer(CdcConsumer consumer, IgniteLogger log) {
this.consumer = consumer;
this.log = log;
}
/**
* Handles record from the WAL.
* If this method return {@code true} then current offset in WAL will be stored and WAL iteration will be
* started from it on CDC application fail/restart.
*
* @param recs WAL records iterator.
* @return {@code True} if current offset in WAL should be commited.
*/
public boolean onRecords(Iterator<DataRecord> recs) {
Iterator<CdcEvent> evts = new Iterator<CdcEvent>() {
/** */
private Iterator<CdcEvent> entries;
@Override public boolean hasNext() {
advance();
return hasCurrent();
}
@Override public CdcEvent next() {
advance();
if (!hasCurrent())
throw new NoSuchElementException();
return entries.next();
}
private void advance() {
if (hasCurrent())
return;
while (recs.hasNext()) {
entries =
F.iterator(recs.next().writeEntries().iterator(), this::transform, true, OPERATIONS_FILTER);
if (entries.hasNext())
break;
entries = null;
}
}
private boolean hasCurrent() {
return entries != null && entries.hasNext();
}
/** */
private CdcEvent transform(DataEntry e) {
UnwrappedDataEntry ue = (UnwrappedDataEntry)e;
return new CdcEventImpl(
ue.unwrappedKey(),
ue.unwrappedValue(),
(e.flags() & DataEntry.PRIMARY_FLAG) != 0,
e.partitionId(),
e.writeVersion(),
e.cacheId()
);
}
};
return consumer.onEvents(evts);
}
/**
* Starts the consumer.
*
* @throws IgniteCheckedException If failed.
*/
public void start() throws IgniteCheckedException {
consumer.start();
if (log.isDebugEnabled())
log.debug("WalRecordsConsumer started [consumer=" + consumer.getClass() + ']');
}
/**
* Stops the consumer.
* This methods can be invoked only after {@link #start()}.
*/
public void stop() {
consumer.stop();
if (log.isInfoEnabled())
log.info("WalRecordsConsumer stopped [consumer=" + consumer.getClass() + ']');
}
/** @return Change Data Capture Consumer. */
public CdcConsumer consumer() {
return consumer;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(WalRecordsConsumer.class, this);
}
}