blob: f8888f6ae3a648195298a845dbbfca8596f79c89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class TestSystemConfig {
private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
private static final String MOCK_SYSTEM_FACTORY_NAME1 =
String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
private static final String MOCK_SYSTEM_FACTORY_NAME2 =
String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
private static final String SAMZA_OFFSET_DEFAULT = "samza.offset.default";
/**
* Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
* any methods of this mock.
*/
private static final SystemAdmin SYSTEM_ADMIN1 = mock(SystemAdmin.class);
/**
* Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
* any methods of this mock.
*/
private static final SystemAdmin SYSTEM_ADMIN2 = mock(SystemAdmin.class);
@Test
public void testGetSystemFactory() {
Map<String, String> map = new HashMap<>();
map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).get());
}
@Test
public void testGetSystemFactoryEmptyClassName() {
Map<String, String> map = new HashMap<>();
map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).isPresent());
assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2).isPresent());
}
@Test
public void testGetSystemNames() {
Map<String, String> map = new HashMap<>();
map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
assertEquals(2, systemConfig.getSystemNames().size());
assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
}
@Test
public void testGetSystemAdmins() {
Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
Map<String, SystemAdmin> expected = ImmutableMap.of(MOCK_SYSTEM_NAME1, SYSTEM_ADMIN1);
assertEquals(expected, systemConfig.getSystemAdmins());
}
@Test
public void testGetSystemAdmin() {
Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
assertEquals(SYSTEM_ADMIN1, systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME1));
assertNull(systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME2));
}
@Test
public void testGetSystemFactories() {
Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
Map<String, SystemFactory> actual = systemConfig.getSystemFactories();
assertEquals(actual.size(), 1);
assertTrue(actual.get(MOCK_SYSTEM_NAME1) instanceof MockSystemFactory);
}
@Test
public void testGetDefaultStreamProperties() {
String defaultStreamPrefix = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
String system1ConfigKey = "config1-key";
String system1ConfigValue = "config1-value";
String system2ConfigKey = "config2-key";
String system2ConfigValue = "config2-value";
Config config = new MapConfig(ImmutableMap.of(defaultStreamPrefix + system1ConfigKey, system1ConfigValue,
defaultStreamPrefix + system2ConfigKey, system2ConfigValue));
Config expected =
new MapConfig(ImmutableMap.of(system1ConfigKey, system1ConfigValue, system2ConfigKey, system2ConfigValue));
SystemConfig systemConfig = new SystemConfig(config);
assertEquals(expected, systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME1));
assertEquals(new MapConfig(), systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME2));
}
@Test
public void testGetSystemOffsetDefault() {
String system1OffsetDefault = "offset-system-1";
String system2OffsetDefault = "offset-system-2";
Config config = new MapConfig(ImmutableMap.of(
// default.stream.samza.offset.default set
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT,
system1OffsetDefault,
// should not use this value since default.stream.samza.offset.default is set
String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT, "wrong-value",
// only samza.offset.default set
String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME2) + SAMZA_OFFSET_DEFAULT, system2OffsetDefault));
SystemConfig systemConfig = new SystemConfig(config);
assertEquals(system1OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME1));
assertEquals(system2OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME2));
assertEquals(SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING, systemConfig.getSystemOffsetDefault("other-system"));
}
@Test
public void testGetSystemKeySerde() {
String system1KeySerde = "system1-key-serde";
String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
// value specified explicitly
Config config =
new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE, system1KeySerde));
SystemConfig systemConfig = new SystemConfig(config);
assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
// default stream property is unspecified, try fall back config key
config = new MapConfig(
ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE,
system1KeySerde));
systemConfig = new SystemConfig(config);
assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
// default stream property is empty string, try fall back config key
config = new MapConfig(ImmutableMap.of(
// default stream property is empty
defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE, "",
// fall back entry
String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE, system1KeySerde));
systemConfig = new SystemConfig(config);
assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
// default stream property is unspecified, fall back is also empty
config = new MapConfig(
ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE,
""));
systemConfig = new SystemConfig(config);
assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
// default stream property is unspecified, fall back is also unspecified
config = new MapConfig();
systemConfig = new SystemConfig(config);
assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
}
@Test
public void testGetSystemMsgSerde() {
String system1MsgSerde = "system1-msg-serde";
String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
// value specified explicitly
Config config =
new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE, system1MsgSerde));
SystemConfig systemConfig = new SystemConfig(config);
assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
// default stream property is unspecified, try fall back config msg
config = new MapConfig(
ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE,
system1MsgSerde));
systemConfig = new SystemConfig(config);
assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
// default stream property is empty string, try fall back config msg
config = new MapConfig(ImmutableMap.of(
// default stream property is empty
defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE, "",
// fall back entry
String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE, system1MsgSerde));
systemConfig = new SystemConfig(config);
assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
// default stream property is unspecified, fall back is also empty
config = new MapConfig(
ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE,
""));
systemConfig = new SystemConfig(config);
assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
// default stream property is unspecified, fall back is also unspecified
config = new MapConfig();
systemConfig = new SystemConfig(config);
assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
}
@Test
public void testDeleteCommittedMessages() {
Config config = new MapConfig(ImmutableMap.of(
// value is "true"
String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME1), "true",
// value is explicitly "false"
String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME2), "false"));
SystemConfig systemConfig = new SystemConfig(config);
assertTrue(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME1));
assertFalse(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME2));
assertFalse(systemConfig.deleteCommittedMessages("other-system")); // value is not specified
}
public static class MockSystemFactory implements SystemFactory {
@Override
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
throw new UnsupportedOperationException("Unnecessary for test");
}
@Override
public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
throw new UnsupportedOperationException("Unnecessary for test");
}
@Override
public SystemAdmin getAdmin(String systemName, Config config) {
switch (systemName) {
case MOCK_SYSTEM_NAME1:
return SYSTEM_ADMIN1;
case MOCK_SYSTEM_NAME2:
return SYSTEM_ADMIN2;
default:
throw new UnsupportedOperationException("System name unsupported: " + systemName);
}
}
}
private static String buildDefaultStreamPropertiesPrefix(String systemName) {
return String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName);
}
}