blob: 421016273e36532de348c85f17b221a24610f77e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.source;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_BATCH_SIZE;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_CQ_PREFIX;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_ID;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_LOAD_ENTIRE_REGION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_QUEUE_SIZE;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_REGION_TO_TOPIC_BINDING;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS_DISPLAY_NAME;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS_DOCUMENTATION;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.SOURCE_GROUP;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
private final String durableClientId;
private final String durableClientTimeout;
private final String cqPrefix;
private final boolean loadEntireRegion;
private final int batchSize;
private final int queueSize;
private final Map<String, List<String>> regionToTopics;
private final Collection<String> cqsToRegister;
public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
super(SOURCE_CONFIG_DEF, connectorProperties);
cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
durableClientId = durableClientIdPrefix + taskId;
} else {
durableClientId = "";
}
durableClientTimeout = getString(DURABLE_CLIENT_TIME_OUT);
cqPrefix = getString(CQ_PREFIX);
loadEntireRegion = getBoolean(LOAD_ENTIRE_REGION);
batchSize = getInt(BATCH_SIZE);
queueSize = getInt(QUEUE_SIZE);
}
protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
configDef.define(
CQS_TO_REGISTER,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
CQS_TO_REGISTER_DOCUMENTATION,
SOURCE_GROUP,
1,
ConfigDef.Width.LONG,
CQS_TO_REGISTER_DISPLAY_NAME);
configDef.define(
REGION_TO_TOPIC_BINDINGS,
ConfigDef.Type.STRING,
DEFAULT_REGION_TO_TOPIC_BINDING,
ConfigDef.Importance.HIGH,
REGION_TO_TOPIC_BINDINGS_DOCUMENTATION,
SOURCE_GROUP,
2,
ConfigDef.Width.LONG,
REGION_TO_TOPIC_BINDINGS_DISPLAY_NAME);
configDef.define(
DURABLE_CLIENT_ID_PREFIX,
ConfigDef.Type.STRING,
DEFAULT_DURABLE_CLIENT_ID,
ConfigDef.Importance.LOW,
DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION,
SOURCE_GROUP,
3,
ConfigDef.Width.MEDIUM,
DURABLE_CLIENT_ID_PREFIX_DISPLAY_NAME);
configDef.define(DURABLE_CLIENT_TIME_OUT,
ConfigDef.Type.STRING,
DEFAULT_DURABLE_CLIENT_TIMEOUT,
ConfigDef.Importance.LOW,
DURABLE_CLIENT_TIME_OUT_DOCUMENTATION,
SOURCE_GROUP,
4,
ConfigDef.Width.MEDIUM,
DURABLE_CLIENT_TIME_OUT_DISPLAY_NAME);
configDef.define(CQ_PREFIX,
ConfigDef.Type.STRING,
DEFAULT_CQ_PREFIX,
ConfigDef.Importance.LOW,
CQ_PREFIX_DOCUMENTATION,
SOURCE_GROUP,
5,
ConfigDef.Width.MEDIUM,
CQ_PREFIX_DISPLAY_NAME);
configDef.define(
BATCH_SIZE,
ConfigDef.Type.INT,
DEFAULT_BATCH_SIZE,
ConfigDef.Importance.MEDIUM,
BATCH_SIZE_DOCUMENTATION,
SOURCE_GROUP,
6,
ConfigDef.Width.MEDIUM,
BATCH_SIZE_DISPLAY_NAME);
configDef.define(
QUEUE_SIZE,
ConfigDef.Type.INT,
DEFAULT_QUEUE_SIZE,
ConfigDef.Importance.MEDIUM,
QUEUE_SIZE_DOCUMENTATION,
SOURCE_GROUP,
7,
ConfigDef.Width.MEDIUM,
QUEUE_SIZE_DISPLAY_NAME);
configDef.define(LOAD_ENTIRE_REGION,
ConfigDef.Type.BOOLEAN,
DEFAULT_LOAD_ENTIRE_REGION,
ConfigDef.Importance.MEDIUM,
LOAD_ENTIRE_REGION_DOCUMENTATION,
SOURCE_GROUP,
8,
ConfigDef.Width.MEDIUM,
LOAD_ENTIRE_REGION_DISPLAY_NAME);
return configDef;
}
public boolean isDurable() {
return isDurable(durableClientId);
}
/**
* @param durableClientId or prefix can be passed in. Either both will be "" or both will have a
* value
*/
boolean isDurable(String durableClientId) {
return !durableClientId.equals("");
}
public int getTaskId() {
return taskId;
}
public String getDurableClientId() {
return durableClientId;
}
public String getDurableClientTimeout() {
return durableClientTimeout;
}
public String getCqPrefix() {
return cqPrefix;
}
public boolean getLoadEntireRegion() {
return loadEntireRegion;
}
public Map<String, List<String>> getRegionToTopics() {
return regionToTopics;
}
public Collection<String> getCqsToRegister() {
return cqsToRegister;
}
public int getBatchSize() {
return batchSize;
}
public int getQueueSize() {
return queueSize;
}
}