blob: b1c289fc86cd66d94aa743c1295eec518c253204 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.geode.kafka.source;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.Struct;
import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
/**
 * Kafka Connect {@link SourceTask} that turns Apache Geode region events into Kafka
 * {@link SourceRecord}s.
 *
 * <p>{@link #start(Map)} connects a Geode client and registers one continuous query (CQ) per
 * region reported by the connector configuration; each CQ gets a {@link GeodeKafkaSourceListener}
 * constructed over the task's event buffer. {@link #poll()} drains that buffer and emits one
 * record per topic mapped to the event's region.
 */
public class GeodeKafkaSourceTask extends SourceTask {

  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);

  // Building blocks of the CQ name, see generateCqName(...)
  private static final String TASK_PREFIX = "TASK";
  private static final String DOT = ".";

  // Single constant source offset attached to every record this task emits
  private static final Map<String, Long> OFFSET_DEFAULT = createOffset();

  private GeodeContext geodeContext;
  private GeodeSourceConnectorConfig geodeConnectorConfig;
  private EventBufferSupplier eventBufferSupplier;
  // region name -> topics that receive the region's events
  private Map<String, List<String>> regionToTopics;
  // region name -> source partition map handed to each SourceRecord
  private Map<String, Map<String, String>> sourcePartitions;
  // maximum number of events drained from the buffer per poll()
  private int batchSize;

  /**
   * Builds the constant source offset ({@code "OFFSET" -> 0}).
   *
   * @return an unmodifiable offset map; it is shared by every emitted record, so it must not be
   *         mutable
   */
  private static Map<String, Long> createOffset() {
    Map<String, Long> offset = new HashMap<>();
    offset.put("OFFSET", 0L);
    // Fully qualified to avoid touching the file's import block.
    return java.util.Collections.unmodifiableMap(offset);
  }

  /**
   * {@inheritDoc}
   *
   * <p>NOTE(review): currently returns {@code null}; Kafka Connect tasks normally report the
   * connector's version string here -- confirm whether a real version should be returned.
   */
  @Override
  public String version() {
    return null;
  }

  /**
   * Parses the task configuration, connects the Geode client and registers the CQs.
   *
   * @param props task properties handed over by Kafka Connect
   * @throws RuntimeException whatever unchecked exception configuration parsing, the client
   *         connection or CQ registration raised; it is logged and rethrown unchanged
   */
  @Override
  public void start(Map<String, String> props) {
    try {
      geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
      logger.debug("GeodeKafkaSourceTask id:{} starting", geodeConnectorConfig.getTaskId());

      geodeContext = new GeodeContext();
      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
          geodeConnectorConfig.getDurableClientId(),
          geodeConnectorConfig.getDurableClientTimeout(),
          geodeConnectorConfig.getSecurityClientAuthInit(),
          geodeConnectorConfig.getSecurityUserName(),
          geodeConnectorConfig.getSecurityPassword(),
          geodeConnectorConfig.usesSecurity());

      batchSize = geodeConnectorConfig.getBatchSize();
      eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
      regionToTopics = geodeConnectorConfig.getRegionToTopics();
      // NOTE(review): the result is discarded here (installOnGeode reads it again). Kept in case
      // the getter has side effects -- confirm and remove if it is a plain accessor.
      geodeConnectorConfig.getCqsToRegister();
      sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());

      String cqPrefix = geodeConnectorConfig.getCqPrefix();
      boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
      installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix,
          loadEntireRegion);
    } catch (Exception e) {
      // The logger already records the full stack trace; no printStackTrace() needed.
      logger.error("Unable to start source task", e);
      throw e;
    }
  }

  /**
   * Drains up to {@code batchSize} buffered events and converts them to source records, one record
   * per topic mapped to the event's region.
   *
   * @return the converted records, or {@code null} when the buffer held no events (the original
   *         behaviour, preserved here)
   * @throws InterruptedException declared by the {@link SourceTask} contract
   */
  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    List<GeodeEvent> events = new ArrayList<>(batchSize);
    if (eventBufferSupplier.get().drainTo(events, batchSize) <= 0) {
      return null;
    }

    List<SourceRecord> records = new ArrayList<>(batchSize);
    for (GeodeEvent event : events) {
      String regionName = event.getRegionName();
      List<String> topics = regionToTopics.get(regionName);
      if (topics == null) {
        // Without this guard an event from an unmapped region would abort the whole batch with
        // a NullPointerException after the events were already removed from the buffer.
        logger.warn("No topics are mapped to region {}; skipping event", regionName);
        continue;
      }
      Map<String, String> sourcePartition = sourcePartitions.get(regionName);
      for (String topic : topics) {
        records.add(new SourceRecord(sourcePartition, OFFSET_DEFAULT, topic,
            null, event.getKey(), null, event.getValue()));
      }
    }
    return records;
  }

  /**
   * Closes the Geode client cache, passing {@code true} to {@code close}.
   *
   * <p>Safe to call when {@link #start(Map)} failed before the client was created.
   */
  @Override
  public void stop() {
    if (geodeContext == null || geodeContext.getClientCache() == null) {
      return;
    }
    // presumably `true` is Geode's keep-alive flag for durable clients -- TODO confirm
    geodeContext.getClientCache().close(true);
  }

  /**
   * Registers a CQ with listener for every region in
   * {@link GeodeSourceConnectorConfig#getCqsToRegister()} and, for durable clients, calls
   * {@code readyForEvents()} once all CQs are registered.
   *
   * @param geodeConnectorConfig source of the task id, durability flag and regions to register
   * @param geodeContext context used to create the CQs
   * @param eventBuffer buffer supplier handed to every listener
   * @param cqPrefix prefix used when naming the CQs
   * @param loadEntireRegion whether each CQ is created with initial results
   */
  void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
      EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
    boolean isDurable = geodeConnectorConfig.isDurable();
    int taskId = geodeConnectorConfig.getTaskId();
    for (String region : geodeConnectorConfig.getCqsToRegister()) {
      installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix,
          loadEntireRegion, isDurable);
    }
    if (isDurable) {
      geodeContext.getClientCache().readyForEvents();
    }
  }

  /**
   * Creates the CQ {@code select * from /<regionName>} for one region and attaches a new
   * {@link GeodeKafkaSourceListener} to it.
   *
   * <p>When {@code loadEntireRegion} is set, the CQ is created with initial results and each
   * result (read as a {@link Struct} with {@code "key"} and {@code "value"} fields) is added to
   * the event buffer as a {@link GeodeEvent}. The listener is always told that initial results
   * are loaded, even when CQ creation throws.
   *
   * @param geodeContext context used to create the CQ
   * @param taskId id of this task, part of the CQ name
   * @param eventBuffer buffer supplier given to the listener and filled with initial results
   * @param regionName region the CQ selects from
   * @param cqPrefix prefix of the CQ name
   * @param loadEntireRegion whether to request and buffer the CQ's initial results
   * @param isDurable durability flag forwarded to CQ creation
   * @return the listener that was attached to the CQ
   */
  GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
      EventBufferSupplier eventBuffer, String regionName, String cqPrefix,
      boolean loadEntireRegion, boolean isDurable) {
    CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
    GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
    cqAttributesFactory.addCqListener(listener);
    CqAttributes cqAttributes = cqAttributesFactory.create();
    try {
      if (loadEntireRegion) {
        CqResults<?> initialResults =
            geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
                "select * from /" + regionName, cqAttributes, isDurable);
        // Convert with a typed loop instead of a raw stream plus unchecked cast.
        List<GeodeEvent> initialEvents = new ArrayList<>(initialResults.size());
        for (Object result : initialResults) {
          Struct entry = (Struct) result;
          initialEvents.add(new GeodeEvent(regionName, entry.get("key"), entry.get("value")));
        }
        eventBuffer.get().addAll(initialEvents);
      } else {
        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
            "select * from /" + regionName, cqAttributes, isDurable);
      }
    } finally {
      listener.signalInitialResultsLoaded();
    }
    return listener;
  }

  /**
   * Converts a collection of region names into a map of source partitions.
   *
   * @param regionNames region names to create partitions for
   * @return a {@code Map<String, Map<String, String>>} keyed by region name; each value maps
   *         {@link GeodeSourceConnectorConfig#REGION_PARTITION} to that region name
   */
  Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
    return regionNames.stream().collect(Collectors.toMap(regionName -> regionName, regionName -> {
      Map<String, String> sourcePartition = new HashMap<>();
      sourcePartition.put(GeodeSourceConnectorConfig.REGION_PARTITION, regionName);
      return sourcePartition;
    }));
  }

  /**
   * Builds the CQ name {@code <cqPrefix>.TASK<taskId>.<regionName>}.
   *
   * @param taskId id of the task registering the CQ
   * @param cqPrefix configured CQ name prefix
   * @param regionName region the CQ is registered on
   * @return the composed CQ name
   */
  String generateCqName(int taskId, String cqPrefix, String regionName) {
    return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
  }
}