blob: 9972f9dba15418a459cb701616606908eccab5f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.util.ReflectionUtil;
/**
* Config helper methods related to systems.
*/
public class SystemConfig extends MapConfig {
private static final String SYSTEMS_PREFIX = "systems.";
public static final String SYSTEM_ID_PREFIX = SYSTEMS_PREFIX + "%s.";
private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
public static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX + "default.stream.";
// If true, automatically delete committed messages from streams whose committed messages can be deleted.
// A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
// set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
@VisibleForTesting
static final String DELETE_COMMITTED_MESSAGES = SYSTEM_ID_PREFIX + "samza.delete.committed.messages";
private static final String EMPTY = "";
static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
private Map<String, SystemAdmin> systemAdminMap = new HashMap<>();
public SystemConfig(Config config) {
super(config);
}
public Optional<String> getSystemFactory(String systemName) {
if (systemName == null) {
return Optional.empty();
}
String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, systemName);
String value = get(systemFactory, null);
return (StringUtils.isBlank(value)) ? Optional.empty() : Optional.of(value);
}
/**
* Get a list of system names.
*
* @return A list system names
*/
public List<String> getSystemNames() {
Config subConf = subset(SYSTEMS_PREFIX, true);
ArrayList<String> systemNames = new ArrayList<>();
for (Map.Entry<String, String> entry : subConf.entrySet()) {
String key = entry.getKey();
if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
}
}
return systemNames;
}
/**
* Get {@link SystemAdmin} instances for all the systems defined in this config.
*
* @return map of system name to {@link SystemAdmin}
*/
public Map<String, SystemAdmin> getSystemAdmins(String adminLabel) {
if (systemAdminMap.isEmpty()) {
systemAdminMap = getSystemFactories().entrySet()
.stream()
.collect(Collectors.toMap(Entry::getKey,
systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
.getAdmin(systemNameToFactoryEntry.getKey(), this, adminLabel)));
}
return systemAdminMap;
}
public Map<String, SystemAdmin> getSystemAdmins() {
return getSystemAdmins("");
}
/**
* Get {@link SystemAdmin} instance for given system name.
*
* @param systemName System name
* @return SystemAdmin of the system if it exists, otherwise null.
*/
public SystemAdmin getSystemAdmin(String systemName, String adminLabel) {
if (systemAdminMap.containsKey(systemName) && systemAdminMap.get(systemName).isStopped()) {
systemAdminMap.clear();
}
return getSystemAdmins(adminLabel).get(systemName);
}
public SystemAdmin getSystemAdmin(String systemName) {
return getSystemAdmin(systemName, "");
}
/**
* Get {@link SystemFactory} instances for all the systems defined in this config.
*
* @return a map from system name to {@link SystemFactory}
*/
public Map<String, SystemFactory> getSystemFactories() {
Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
systemName -> systemName,
systemName -> {
String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException(
String.format("A stream uses system %s, which is missing from the configuration.", systemName)));
return ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class);
}));
return systemFactories;
}
/**
* Gets the system-wide defaults for streams.
*
* @param systemName the name of the system for which the defaults will be returned.
* @return a subset of the config with the system prefix removed.
*/
public Config getDefaultStreamProperties(String systemName) {
return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true);
}
/**
* Get system offset default value.
* systems.'system'.default.stream.samza.offset.default is the config.
* systems.'system'.samza.offset.default is the deprecated setting, but needs to be checked for backward compatibility.
* @param systemName get config value for this system.
* @return value of system reset or default ("upcoming") if none set.
*/
public String getSystemOffsetDefault(String systemName) {
// first check stream system default
String systemOffsetDefault = get(String.format("systems.%s.default.stream.samza.offset.default", systemName));
// if not set, check the deprecated setting
if (StringUtils.isBlank(systemOffsetDefault)) {
systemOffsetDefault = get(String.format("systems.%s.samza.offset.default", systemName));
if (StringUtils.isBlank(systemOffsetDefault)) {
return SAMZA_SYSTEM_OFFSET_UPCOMING;
}
}
return systemOffsetDefault;
}
/**
* @param systemName name of the system
* @return the key serde for the {@code systemName}, or empty if it was not found
*/
public Optional<String> getSystemKeySerde(String systemName) {
return getSystemDefaultStreamProperty(systemName, StreamConfig.KEY_SERDE);
}
/**
* @param systemName name of the system
* @return the message serde for the {@code systemName}, or empty if it was not found
*/
public Optional<String> getSystemMsgSerde(String systemName) {
return getSystemDefaultStreamProperty(systemName, StreamConfig.MSG_SERDE);
}
/**
* @param systemName name of the system
* @return if messages committed to this system should automatically be deleted
*/
public boolean deleteCommittedMessages(String systemName) {
return getBoolean(String.format(DELETE_COMMITTED_MESSAGES, systemName), false);
}
/**
* Gets the system-wide default for the {@code propertyName} for the {@code systemName}.
* This will check in a couple of different config locations for the value.
*/
private Optional<String> getSystemDefaultStreamProperty(String systemName, String propertyName) {
Map<String, String> defaultStreamProperties = getDefaultStreamProperties(systemName);
String defaultStreamProperty = defaultStreamProperties.get(propertyName);
if (StringUtils.isNotEmpty(defaultStreamProperty)) {
return Optional.of(defaultStreamProperty);
} else {
String fallbackStreamProperty = get(String.format(SYSTEM_ID_PREFIX, systemName) + propertyName);
if (StringUtils.isNotEmpty(fallbackStreamProperty)) {
return Optional.of(fallbackStreamProperty);
} else {
return Optional.empty();
}
}
}
}