| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.kafka.source; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.kafka.common.config.ConfigDef; |
| |
| import org.apache.geode.kafka.GeodeConnectorConfig; |
| |
| public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { |
| |
| public static final ConfigDef SOURCE_CONFIG_DEF = configurables(); |
| |
| // Geode Configuration |
| public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix"; |
| public static final String DEFAULT_DURABLE_CLIENT_ID = ""; |
| public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout"; |
| public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; |
| |
| public static final String CQ_PREFIX = "cq-prefix"; |
| public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; |
| |
| /** |
| * Used as a key for source partitions |
| */ |
| public static final String REGION_PARTITION = "regionPartition"; |
| public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics"; |
| public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]"; |
| public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1 |
| // task will register a cq |
| |
| public static final String BATCH_SIZE = "geode-connector-batch-size"; |
| public static final String DEFAULT_BATCH_SIZE = "100"; |
| |
| public static final String QUEUE_SIZE = "geode-connector-queue-size"; |
| public static final String DEFAULT_QUEUE_SIZE = "10000"; |
| |
| public static final String LOAD_ENTIRE_REGION = "load-entire-region"; |
| public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; |
| |
| private final String durableClientId; |
| private final String durableClientTimeout; |
| private final String cqPrefix; |
| private final boolean loadEntireRegion; |
| private final int batchSize; |
| private final int queueSize; |
| |
| private final Map<String, List<String>> regionToTopics; |
| private final Collection<String> cqsToRegister; |
| |
| public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) { |
| super(SOURCE_CONFIG_DEF, connectorProperties); |
| cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet(); |
| regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS)); |
| String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX); |
| if (isDurable(durableClientIdPrefix)) { |
| durableClientId = durableClientIdPrefix + taskId; |
| } else { |
| durableClientId = ""; |
| } |
| durableClientTimeout = getString(DURABLE_CLIENT_TIME_OUT); |
| cqPrefix = getString(CQ_PREFIX); |
| loadEntireRegion = getBoolean(LOAD_ENTIRE_REGION); |
| batchSize = getInt(BATCH_SIZE); |
| queueSize = getInt(QUEUE_SIZE); |
| } |
| |
| protected static ConfigDef configurables() { |
| ConfigDef configDef = GeodeConnectorConfig.configurables(); |
| configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, |
| "Internally created and used parameter, for signalling a task to register cqs"); |
| configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, |
| DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, |
| "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\""); |
| configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, |
| ConfigDef.Importance.LOW, |
| "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client"); |
| configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, |
| ConfigDef.Importance.LOW, |
| "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated"); |
| configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, |
| "Prefix string to identify Connector cq's on a Geode server"); |
| configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, |
| ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll"); |
| configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, |
| ConfigDef.Importance.MEDIUM, |
| "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue "); |
| configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, |
| ConfigDef.Importance.MEDIUM, |
| "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq"); |
| return configDef; |
| } |
| |
| public boolean isDurable() { |
| return isDurable(durableClientId); |
| } |
| |
| /** |
| * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a |
| * value |
| */ |
| boolean isDurable(String durableClientId) { |
| return !durableClientId.equals(""); |
| } |
| |
| public int getTaskId() { |
| return taskId; |
| } |
| |
| public String getDurableClientId() { |
| return durableClientId; |
| } |
| |
| public String getDurableClientTimeout() { |
| return durableClientTimeout; |
| } |
| |
| public String getCqPrefix() { |
| return cqPrefix; |
| } |
| |
| public boolean getLoadEntireRegion() { |
| return loadEntireRegion; |
| } |
| |
| public Map<String, List<String>> getRegionToTopics() { |
| return regionToTopics; |
| } |
| |
| public Collection<String> getCqsToRegister() { |
| return cqsToRegister; |
| } |
| |
| public int getBatchSize() { |
| return batchSize; |
| } |
| |
| public int getQueueSize() { |
| return queueSize; |
| } |
| } |