blob: 6e3d43ba659be6606f4be3a9cf255f046afef814 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.kafka.connect;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Task to consume remote cluster cache events from the grid and inject them into Kafka.
* <p>
* Note that a task will create a bounded queue in the grid for more reliable data transfer.
* Queue size can be changed by {@link IgniteSourceConstants#INTL_BUF_SIZE}.
*/
public class IgniteSourceTask extends SourceTask {
/** Logger. */
private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
/** Tasks static monitor. */
private static final Object lock = new Object();
/** Event buffer size. */
private static int evtBufSize = 100000;
/** Event buffer. */
private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(evtBufSize);
/** Max number of events taken from the buffer at once. */
private static int evtBatchSize = 100;
/** Flag for stopped state. */
private static volatile boolean stopped = true;
/** Ignite grid configuration file. */
private static String igniteCfgFile;
/** Cache name. */
private static String cacheName;
/** Remote Listener id. */
private static UUID rmtLsnrId;
/** Local listener. */
private static TaskLocalListener locLsnr = new TaskLocalListener();
/** User-defined filter. */
private static IgnitePredicate<CacheEvent> filter;
/** Topic. */
private static String topics[];
/** Offset. */
private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
/** Partition. */
private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);
/** {@inheritDoc} */
@Override public String version() {
return new IgniteSinkConnector().version();
}
/**
* Filtering is done remotely. Local listener buffers data for injection into Kafka.
*
* @param props Task properties.
*/
@Override public void start(Map<String, String> props) {
synchronized (lock) {
// Each task has the same parameters -- avoid setting more than once.
// Nothing to do if the task has been already started.
if (!stopped)
return;
cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
igniteCfgFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
if (filterCls != null && !filterCls.isEmpty()) {
try {
Class<? extends IgnitePredicate<CacheEvent>> clazz =
(Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
filter = clazz.newInstance();
}
catch (Exception e) {
log.error("Failed to instantiate the provided filter! " +
"User-enabled filtering is ignored!", e);
}
}
}
TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName);
try {
int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
.remoteListen(locLsnr, rmtLsnr, evts);
}
catch (Exception e) {
log.error("Failed to register event listener!", e);
throw new ConnectException(e);
}
finally {
stopped = false;
}
}
}
/** {@inheritDoc} */
@Override public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(evtBatchSize);
ArrayList<CacheEvent> evts = new ArrayList<>(evtBatchSize);
if (stopped)
return records;
try {
if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
for (CacheEvent evt : evts) {
// schema and keys are ignored.
for (String topic : topics)
records.add(new SourceRecord(srcPartition, offset, topic, null, evt));
}
return records;
}
}
catch (IgniteException e) {
log.error("Error when polling event queue!", e);
}
// for shutdown.
return null;
}
/**
* Converts comma-delimited cache events strings to Ignite internal representation.
*
* @param evtPropsStr Comma-delimited cache event names.
* @return Ignite internal representation of cache events to be registered with the remote listener.
* @throws Exception If error.
*/
private int[] cacheEvents(String evtPropsStr) throws Exception {
String[] evtStr = evtPropsStr.split("\\s*,\\s*");
if (evtStr.length == 0)
return EventType.EVTS_CACHE;
int[] evts = new int[evtStr.length];
try {
for (int i = 0; i < evtStr.length; i++)
evts[i] = CacheEvt.valueOf(evtStr[i].toUpperCase()).getId();
}
catch (Exception e) {
log.error("Failed to recognize the provided cache event!", e);
throw new Exception(e);
}
return evts;
}
/**
* Stops the grid client.
*/
@Override public synchronized void stop() {
if (stopped)
return;
stopped = true;
stopRemoteListen();
IgniteGrid.getIgnite().close();
}
/**
* Stops the remote listener.
*/
protected void stopRemoteListen() {
if (rmtLsnrId != null)
IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
.stopRemoteListen(rmtLsnrId);
rmtLsnrId = null;
}
/**
* Used by unit test to avoid restart node and valid state of the <code>stopped</code> flag.
*
* @param stopped Stopped flag.
*/
protected static void setStopped(boolean stopped) {
IgniteSourceTask.stopped = stopped;
}
/**
* Local listener buffering cache events to be further sent to Kafka.
*/
private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
/** {@inheritDoc} */
@Override public boolean apply(UUID id, CacheEvent evt) {
try {
if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
log.error("Failed to buffer event {}", evt.name());
}
catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
/**
* Remote filter.
*/
private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
/** */
@IgniteInstanceResource
Ignite ignite;
/** Cache name. */
private final String cacheName;
/**
* @param cacheName Cache name.
*/
TaskRemoteFilter(String cacheName) {
this.cacheName = cacheName;
}
/** {@inheritDoc} */
@Override public boolean apply(CacheEvent evt) {
Affinity<Object> affinity = ignite.affinity(cacheName);
if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) {
// Process this event. Ignored on backups.
if (filter != null && filter.apply(evt))
return false;
return true;
}
return false;
}
}
/**
* Grid instance initialized on demand.
*/
private static class IgniteGrid {
/** Constructor. */
private IgniteGrid() {
// No-op.
}
/** Instance holder. */
private static class Holder {
/** */
private static final Ignite IGNITE = Ignition.start(igniteCfgFile);
}
/**
* Obtains grid instance.
*
* @return Grid instance.
*/
private static Ignite getIgnite() {
return Holder.IGNITE;
}
}
/** Cache events available for listening. */
private enum CacheEvt {
/** */
CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
/** */
DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
/** */
PUT(EventType.EVT_CACHE_OBJECT_PUT),
/** */
READ(EventType.EVT_CACHE_OBJECT_READ),
/** */
REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
/** */
LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
/** */
UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
/** */
EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
/** Internal Ignite event id. */
private final int id;
/**
* Constructor.
*
* @param id Internal Ignite event id.
*/
CacheEvt(int id) {
this.id = id;
}
/**
* Gets Ignite event id.
*
* @return Ignite event id.
*/
int getId() {
return id;
}
}
}