blob: 01294c907a2d73aa433f80ad365195f4e72fa705 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.source;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.geode.kafka.GeodeConnectorConfig;
/**
 * Configuration for the Geode source connector: the settings a source task reads in addition to
 * whatever {@link GeodeConnectorConfig} already defines.
 *
 * <p>
 * All values are read once in the constructor from the supplied connector properties, using the
 * definitions built by {@link #configurables()}, and are then exposed through getters.
 */
public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {

  // Geode Configuration

  /**
   * Config key: prefix that the task id is appended to when registering as a durable client. An
   * empty value means the task does not register as a durable client.
   */
  public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
  /** Default for {@link #DURABLE_CLIENT_ID_PREFIX}: empty, i.e. not durable. */
  public static final String DEFAULT_DURABLE_CLIENT_ID = "";

  /**
   * Config key: how long, in milliseconds, values are kept in Geode's durable queue before the
   * queue is invalidated.
   */
  public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
  /** Default for {@link #DURABLE_CLIENT_TIME_OUT}. */
  public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";

  /** Config key: prefix used to identify the connector's cqs on a Geode server. */
  public static final String CQ_PREFIX = "cq-prefix";
  /** Default for {@link #CQ_PREFIX}. */
  public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";

  /**
   * Used as a key for source partitions
   */
  public static final String REGION_PARTITION = "regionPartition";

  /** Config key: the "one region to many topics" mappings, each surrounded by brackets. */
  public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
  /** Default for {@link #REGION_TO_TOPIC_BINDINGS}. */
  public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";

  /** Config key used internally so that only 1 task will register a cq. */
  public static final String CQS_TO_REGISTER = "cqsToRegister";

  /** Config key: maximum number of records to return on each poll. */
  public static final String BATCH_SIZE = "geode-connector-batch-size";
  /** Default for {@link #BATCH_SIZE}. */
  public static final String DEFAULT_BATCH_SIZE = "100";

  /** Config key: maximum number of entries in the connector queue. */
  public static final String QUEUE_SIZE = "geode-connector-queue-size";
  /** Default for {@link #QUEUE_SIZE}. */
  public static final String DEFAULT_QUEUE_SIZE = "10000";

  /** Config key: whether entries that currently exist in a region should be queued up as well. */
  public static final String LOAD_ENTIRE_REGION = "load-entire-region";
  /** Default for {@link #LOAD_ENTIRE_REGION}. */
  public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";

  /**
   * Definition of every setting understood by the source connector. Declared after the key and
   * default constants it is built from so that static initialization order is never a concern.
   */
  public static final ConfigDef SOURCE_CONFIG_DEF = configurables();

  private final String durableClientId;
  private final String durableClientTimeout;
  private final String cqPrefix;
  private final boolean loadEntireRegion;
  private final int batchSize;
  private final int queueSize;
  private final Map<String, List<String>> regionToTopics;
  private final Collection<String> cqsToRegister;

  /**
   * Reads the source connector settings out of the given properties.
   *
   * <p>
   * The durable client id is the configured prefix followed by the task id; when the prefix is
   * missing or empty the durable client id is the empty string and the config reports itself as
   * not durable.
   *
   * @param connectorProperties raw connector/task properties, validated against
   *        {@link #SOURCE_CONFIG_DEF}
   */
  public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
    super(SOURCE_CONFIG_DEF, connectorProperties);
    // The cq list uses the same bracketed syntax as the bindings; only its keys are of interest.
    cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
    regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));

    String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
    durableClientId = isDurable(durableClientIdPrefix) ? durableClientIdPrefix + taskId : "";

    durableClientTimeout = getString(DURABLE_CLIENT_TIME_OUT);
    cqPrefix = getString(CQ_PREFIX);
    loadEntireRegion = getBoolean(LOAD_ENTIRE_REGION);
    batchSize = getInt(BATCH_SIZE);
    queueSize = getInt(QUEUE_SIZE);
  }

  /**
   * Builds the config definition for the source connector: the definition returned by
   * {@link GeodeConnectorConfig#configurables()} with the source specific settings added to it.
   *
   * @return the config definition including all source connector settings
   */
  protected static ConfigDef configurables() {
    ConfigDef configDef = GeodeConnectorConfig.configurables();
    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
        "Internally created and used parameter, for signalling a task to register cqs");
    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
        DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
        "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], [anotherRegion: topicName, anotherTopic]\"");
    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
        ConfigDef.Importance.LOW,
        "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client");
    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING,
        DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW,
        "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX,
        ConfigDef.Importance.LOW,
        "Prefix string to identify Connector cq's on a Geode server");
    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
        ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
        ConfigDef.Importance.MEDIUM,
        "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
        ConfigDef.Importance.MEDIUM,
        "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq");
    return configDef;
  }

  /**
   * @return {@code true} when this config carries a non-empty durable client id
   */
  public boolean isDurable() {
    return isDurable(durableClientId);
  }

  /**
   * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a
   *        value
   * @return {@code true} when the given value is neither {@code null} nor empty
   */
  boolean isDurable(String durableClientId) {
    // A missing (null) value is treated like the empty string instead of failing with an NPE.
    return durableClientId != null && !durableClientId.isEmpty();
  }

  /**
   * @return the task id held by the parent configuration
   */
  public int getTaskId() {
    return taskId;
  }

  /**
   * @return the durable client id (prefix followed by task id), or "" when not durable
   */
  public String getDurableClientId() {
    return durableClientId;
  }

  /**
   * @return the configured durable client timeout, as the string it was configured with
   */
  public String getDurableClientTimeout() {
    return durableClientTimeout;
  }

  /**
   * @return the configured cq prefix
   */
  public String getCqPrefix() {
    return cqPrefix;
  }

  /**
   * @return the configured value of {@link #LOAD_ENTIRE_REGION}
   */
  public boolean getLoadEntireRegion() {
    return loadEntireRegion;
  }

  /**
   * @return the parsed region to topics bindings, keyed by region name
   */
  public Map<String, List<String>> getRegionToTopics() {
    return regionToTopics;
  }

  /**
   * @return the keys parsed out of the {@link #CQS_TO_REGISTER} setting
   */
  public Collection<String> getCqsToRegister() {
    return cqsToRegister;
  }

  /**
   * @return the configured value of {@link #BATCH_SIZE}
   */
  public int getBatchSize() {
    return batchSize;
  }

  /**
   * @return the configured value of {@link #QUEUE_SIZE}
   */
  public int getQueueSize() {
    return queueSize;
  }
}