blob: 184f309bd4919b4df551454ab2053cd742b1672a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.kafka.connect;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Task to consume sequences of SinkRecords and write data to grid.
 * <p>
 * Configuration, the entry transformer and the grid/streamer instances are all kept in static state,
 * so every instance of this task loaded by the same class loader shares them.
 */
public class IgniteSinkTask extends SinkTask {
    /** Logger. */
    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);

    /** Flag for stopped state. */
    private static volatile boolean stopped = true;

    /** Ignite grid configuration file. */
    private static String igniteConfigFile;

    /** Cache name. Also serves as the "already configured" marker checked by {@link #start(Map)}. */
    private static String cacheName;

    /** Entry transformer. When {@code null}, the record's own key and value are streamed. */
    private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;

    /** {@inheritDoc} */
    @Override public String version() {
        return new IgniteSinkConnector().version();
    }

    /**
     * Initializes grid client from configPath.
     * <p>
     * The configuration is applied only while {@code cacheName} is unset; once it has been set, further
     * calls return immediately. Everything that can fail on bad input (numeric settings, transformer
     * instantiation) is evaluated before any shared state is modified, so a rejected configuration does
     * not leave the task marked as configured.
     *
     * @param props Task properties.
     * @throws ConnectException If the configured transformer class cannot be loaded or instantiated.
     * @throws NumberFormatException If a numeric streamer setting is present but is not a valid integer.
     */
    @Override public void start(Map<String, String> props) {
        // Each task has the same parameters -- avoid setting more than once.
        if (cacheName != null)
            return;

        // Validate first: nothing below this group is expected to fail because of malformed properties.
        Integer perNodeBufSize = intProperty(props, IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE);
        Integer perNodeParOps = intProperty(props, IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS);
        StreamSingleTupleExtractor<SinkRecord, Object, Object> newExtractor =
            createExtractor(props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS));

        // The streamer holder reads both fields when it is first accessed, so set them before using it.
        igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
        cacheName = props.get(IgniteSinkConstants.CACHE_NAME);

        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
            StreamerContext.getStreamer().allowOverwrite(
                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));

        if (perNodeBufSize != null)
            StreamerContext.getStreamer().perNodeBufferSize(perNodeBufSize);

        if (perNodeParOps != null)
            StreamerContext.getStreamer().perNodeParallelOperations(perNodeParOps);

        // Leave a previously installed transformer alone when none is configured.
        if (newExtractor != null)
            extractor = newExtractor;

        stopped = false;
    }

    /**
     * Reads an optional integer setting.
     *
     * @param props Task properties.
     * @param key Property name.
     * @return Parsed value, or {@code null} if the property is not present.
     * @throws NumberFormatException If the property is present but is not a valid integer.
     */
    private static Integer intProperty(Map<String, String> props, String key) {
        return props.containsKey(key) ? Integer.valueOf(Integer.parseInt(props.get(key))) : null;
    }

    /**
     * Instantiates the entry transformer through its no-argument constructor.
     *
     * @param clsName Fully qualified transformer class name; may be {@code null} or empty.
     * @return New transformer instance, or {@code null} if no class name was given.
     * @throws ConnectException If the class cannot be loaded, is not a {@link StreamSingleTupleExtractor},
     *      or cannot be instantiated.
     */
    @SuppressWarnings("unchecked")
    private static StreamSingleTupleExtractor<SinkRecord, Object, Object> createExtractor(String clsName) {
        if (clsName == null || clsName.isEmpty())
            return null;

        try {
            // asSubclass() rejects unrelated classes before an instance is created; the remaining cast
            // only restores type arguments that are erased at runtime.
            return (StreamSingleTupleExtractor<SinkRecord, Object, Object>)Class.forName(clsName)
                .asSubclass(StreamSingleTupleExtractor.class)
                .getDeclaredConstructor()
                .newInstance();
        }
        catch (Exception e) {
            throw new ConnectException("Failed to instantiate the provided transformer!", e);
        }
    }

    /**
     * Buffers records.
     * <p>
     * Records for which no key can be determined (null record key, null extracted entry, or extracted
     * entry with a null key) are logged and skipped.
     *
     * @param records Records to inject into grid.
     * @throws ConnectException Rethrown unchanged after logging, so that subclasses keep their meaning
     *      for the caller.
     */
    @SuppressWarnings("unchecked")
    @Override public void put(Collection<SinkRecord> records) {
        StreamSingleTupleExtractor<SinkRecord, Object, Object> transformer = extractor;

        try {
            for (SinkRecord record : records) {
                Object key;
                Object val;

                if (transformer != null) {
                    Map.Entry<Object, Object> entry = transformer.extract(record);

                    if (entry == null) {
                        log.error("Failed to stream a record: transformer returned no entry!");

                        continue;
                    }

                    key = entry.getKey();
                    val = entry.getValue();
                }
                else {
                    key = record.key();
                    val = record.value();
                }

                if (key == null) {
                    log.error("Failed to stream a record with null key!");

                    continue;
                }

                // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
                StreamerContext.getStreamer().addData(key, val);
            }
        }
        catch (ConnectException e) {
            log.error("Failed adding record", e);

            // Rethrow as is: wrapping would hide the concrete exception type from the caller.
            throw e;
        }
    }

    /**
     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
     * Does nothing while the task is in the stopped state.
     *
     * @param offsets Offset information.
     */
    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (stopped)
            return;

        StreamerContext.getStreamer().flush();
    }

    /**
     * Stops the grid client. Does nothing if the task is already in the stopped state.
     */
    @Override public void stop() {
        if (stopped)
            return;

        stopped = true;

        StreamerContext.getIgnite().close();
    }

    /**
     * Used by unit tests to set the <code>stopped</code> flag directly, without restarting the node.
     * Also clears the entry transformer.
     *
     * @param stopped Stopped flag.
     */
    protected static void setStopped(boolean stopped) {
        IgniteSinkTask.stopped = stopped;

        extractor = null;
    }

    /**
     * Streamer context initializing grid and data streamer instances on demand.
     */
    public static class StreamerContext {
        /** Constructor. */
        private StreamerContext() {
        }

        /**
         * Instance holder. Its fields are initialized when the holder class is first accessed, using the
         * configuration file and cache name stored in the enclosing task at that moment.
         */
        private static class Holder {
            /** Grid instance started from the configured file. */
            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);

            /** Data streamer for the configured cache. */
            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
        }

        /**
         * Obtains grid instance.
         *
         * @return Grid instance.
         */
        public static Ignite getIgnite() {
            return Holder.IGNITE;
        }

        /**
         * Obtains data streamer instance.
         *
         * @return Data streamer instance.
         */
        public static IgniteDataStreamer getStreamer() {
            return Holder.STREAMER;
        }
    }
}