blob: 2dd670afb814da1b7e0600e075e5a3866e0e79da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.source.flink;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Apache Flink Ignite source implemented as a RichParallelSourceFunction.
*/
public class IgniteSource extends RichParallelSourceFunction<CacheEvent> {
/** Serial version uid. */
private static final long serialVersionUID = 1L;
/** Logger. */
private static final Logger log = LoggerFactory.getLogger(IgniteSource.class);
/** Default max number of events taken from the buffer at once. */
private static final int DFLT_EVT_BATCH_SIZE = 1;
/** Default number of milliseconds timeout for event buffer queue operation. */
private static final int DFLT_EVT_BUFFER_TIMEOUT = 10;
/** Event buffer. */
private BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>();
/** Remote Listener id. */
private UUID rmtLsnrId;
/** Flag for isRunning state. */
private volatile boolean isRunning;
/** Max number of events taken from the buffer at once. */
private int evtBatchSize = DFLT_EVT_BATCH_SIZE;
/** Number of milliseconds timeout for event buffer queue operation. */
private int evtBufTimeout = DFLT_EVT_BUFFER_TIMEOUT;
/** Local listener. */
private final TaskLocalListener locLsnr = new TaskLocalListener();
/** Ignite instance. */
@IgniteInstanceResource
private transient Ignite ignite;
/** Cache name. */
private final String cacheName;
/**
* Sets Ignite instance.
*
* @param ignite Ignite instance.
*/
public void setIgnite(Ignite ignite) {
this.ignite = ignite;
}
/**
* Sets Event Batch Size.
*
* @param evtBatchSize Event Batch Size.
*/
public void setEvtBatchSize(int evtBatchSize) {
this.evtBatchSize = evtBatchSize;
}
/**
* Sets Event Buffer timeout.
*
* @param evtBufTimeout Event Buffer timeout.
*/
public void setEvtBufTimeout(int evtBufTimeout) {
this.evtBufTimeout = evtBufTimeout;
}
/**
* @return Local Task Listener
*/
TaskLocalListener getLocLsnr() {
return locLsnr;
}
/**
* Default IgniteSource constructor.
*
* @param cacheName Cache name.
*/
public IgniteSource(String cacheName) {
this.cacheName = cacheName;
}
/**
* Starts Ignite source.
*
* @param filter User defined filter.
* @param cacheEvts Converts comma-delimited cache events strings to Ignite internal representation.
*/
@SuppressWarnings("unchecked")
public void start(IgnitePredicate<CacheEvent> filter, int... cacheEvts) {
A.notNull(cacheName, "Cache name");
TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName, filter);
try {
synchronized (this) {
if (isRunning)
return;
isRunning = true;
rmtLsnrId = ignite.events(ignite.cluster().forCacheNodes(cacheName))
.remoteListen(locLsnr, rmtLsnr, cacheEvts);
}
}
catch (IgniteException e) {
log.error("Failed to register event listener!", e);
throw e;
}
}
/**
* Transfers data from grid.
*
* @param ctx SourceContext.
*/
@Override public void run(SourceContext<CacheEvent> ctx) {
List<CacheEvent> evts = new ArrayList<>(evtBatchSize);
try {
while (isRunning) {
// block here for some time if there is no events from source
CacheEvent firstEvt = evtBuf.poll(1, TimeUnit.SECONDS);
if (firstEvt != null)
evts.add(firstEvt);
if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
synchronized (ctx.getCheckpointLock()) {
for (CacheEvent evt : evts)
ctx.collect(evt);
evts.clear();
}
}
}
}
catch (Exception e) {
if (X.hasCause(e, InterruptedException.class))
return; // Executing thread can be interrupted see cancel() javadoc.
log.error("Error while processing cache event of " + cacheName, e);
}
}
/** {@inheritDoc} */
@Override public void cancel() {
synchronized (this) {
if (!isRunning)
return;
isRunning = false;
if (rmtLsnrId != null && ignite != null) {
ignite.events(ignite.cluster().forCacheNodes(cacheName))
.stopRemoteListen(rmtLsnrId);
rmtLsnrId = null;
}
}
}
/**
* Local listener buffering cache events to be further sent to Flink.
*/
private class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
/** {@inheritDoc} */
@Override public boolean apply(UUID id, CacheEvent evt) {
try {
if (!evtBuf.offer(evt, evtBufTimeout, TimeUnit.MILLISECONDS))
log.error("Failed to buffer event {}", evt.name());
}
catch (InterruptedException ignored) {
log.error("Failed to buffer event using local task listener {}", evt.name());
Thread.currentThread().interrupt(); // Restore interrupt flag.
}
return true;
}
}
}