blob: 88fbbe0a6cc4ac0456b1d7f27c6aa429e283575c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.junit.Test;
import static org.apache.samza.config.StorageConfig.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestStorageConfig {
private static final String STORE_NAME0 = "store0";
private static final String STORE_NAME1 = "store1";
@Test
public void testGetStoreNames() {
// empty config, so no stores
assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getStoreNames());
Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
// has stores
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class")));
List<String> actual = storageConfig.getStoreNames();
// ordering shouldn't matter
assertEquals(2, actual.size());
assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
//has side input stores
StorageConfig config = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.factory.class")));
actual = storageConfig.getStoreNames();
assertEquals(2, actual.size());
assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
}
@Test
public void testGetChangelogStream() {
// empty config, so no changelog stream
assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getChangelogStream(STORE_NAME0));
// store has empty string for changelog stream
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "")));
assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
// store has full changelog system-stream defined
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
"changelog-system.changelog-stream0")));
assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
// store has changelog stream defined, but system comes from job.changelog.system
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0",
StorageConfig.CHANGELOG_SYSTEM, "changelog-system")));
assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
// batch mode: create unique stream name
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
"changelog-system.changelog-stream0", ApplicationConfig.APP_MODE,
ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(), ApplicationConfig.APP_RUN_ID, "run-id")));
assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"),
storageConfig.getChangelogStream(STORE_NAME0));
// job has no changelog stream defined
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
"should-not-be-used")));
assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
// job.changelog.system takes precedence over job.default.system when changelog is specified as just streamName
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
"should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
// job.changelog.system takes precedence over job.default.system when changelog is specified as {systemName}.{streamName}
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
"should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "changelog-system.streamName")));
assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
// systemName specified using stores.{storeName}.changelog = {systemName}.{streamName} should take precedence even
// when job.changelog.system and job.default.system are specified
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "default-changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
String.format(CHANGELOG_STREAM, STORE_NAME0), "nondefault-changelog-system.streamName")));
assertEquals("nondefault-changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
// fall back to job.default.system if job.changelog.system is not specified
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
assertEquals("default-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
}
@Test(expected = SamzaException.class)
public void testGetChangelogStreamMissingSystem() {
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0")));
storageConfig.getChangelogStream(STORE_NAME0);
}
@Test
public void testGetAccessLogEnabled() {
// empty config, access log disabled
assertFalse(new StorageConfig(new MapConfig()).getAccessLogEnabled(STORE_NAME0));
assertFalse(new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_ENABLED, STORE_NAME0), "false"))).getAccessLogEnabled(
STORE_NAME0));
assertTrue(new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_ENABLED, STORE_NAME0), "true"))).getAccessLogEnabled(
STORE_NAME0));
}
@Test
public void testGetAccessLogStream() {
String changelogStream = "changelog-stream";
assertEquals(changelogStream + "-" + StorageConfig.ACCESSLOG_STREAM_SUFFIX,
new StorageConfig(new MapConfig()).getAccessLogStream(changelogStream));
}
@Test
public void testGetAccessLogSamplingRatio() {
// empty config, return default sampling ratio
assertEquals(StorageConfig.DEFAULT_ACCESSLOG_SAMPLING_RATIO,
new StorageConfig(new MapConfig()).getAccessLogSamplingRatio(STORE_NAME0));
assertEquals(40, new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_SAMPLING_RATIO, STORE_NAME0),
"40"))).getAccessLogSamplingRatio(STORE_NAME0));
}
@Test
public void testGetStorageFactoryClassName() {
// empty config, so no factory
assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageFactoryClassName(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "my.factory.class")));
assertEquals(Optional.of("my.factory.class"), storageConfig.getStorageFactoryClassName(STORE_NAME0));
}
@Test
public void testGetStorageKeySerde() {
// empty config, so no key serde
assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageKeySerde(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.KEY_SERDE, STORE_NAME0), "my.key.serde.class")));
assertEquals(Optional.of("my.key.serde.class"), storageConfig.getStorageKeySerde(STORE_NAME0));
}
@Test
public void testGetStorageMsgSerde() {
// empty config, so no msg serde
assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageMsgSerde(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.MSG_SERDE, STORE_NAME0), "my.msg.serde.class")));
assertEquals(Optional.of("my.msg.serde.class"), storageConfig.getStorageMsgSerde(STORE_NAME0));
}
@Test
public void testGetSideInputs() {
// empty config, so no system
assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getSideInputs(STORE_NAME0));
// single side input
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), "side-input")));
assertEquals(Collections.singletonList("side-input"), storageConfig.getSideInputs(STORE_NAME0));
// multiple side inputs
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), "side-input0,side-input1")));
assertEquals(ImmutableList.of("side-input0", "side-input1"), storageConfig.getSideInputs(STORE_NAME0));
// ignore whitespace
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), ", side-input0 ,,side-input1,")));
assertEquals(ImmutableList.of("side-input0", "side-input1"), storageConfig.getSideInputs(STORE_NAME0));
}
@Test
public void testGetSideInputsProcessorFactory() {
// empty config, so no factory
assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getSideInputsProcessorFactory(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME0),
"my.side.inputs.factory.class")));
assertEquals(Optional.of("my.side.inputs.factory.class"), storageConfig.getSideInputsProcessorFactory(STORE_NAME0));
}
@Test
public void testGetSideInputsProcessorSerializedInstance() {
// empty config, so no factory
assertEquals(Optional.empty(),
new StorageConfig(new MapConfig()).getSideInputsProcessorSerializedInstance(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, STORE_NAME0),
"serialized_instance")));
assertEquals(Optional.of("serialized_instance"),
storageConfig.getSideInputsProcessorSerializedInstance(STORE_NAME0));
}
@Test
public void testGetChangeLogDeleteRetentionInMs() {
// empty config, return default sampling ratio
assertEquals(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS,
new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionInMs(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_DELETE_RETENTION_MS, STORE_NAME0),
Long.toString(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS * 2))));
assertEquals(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS * 2,
storageConfig.getChangeLogDeleteRetentionInMs(STORE_NAME0));
}
@Test
public void testIsChangelogSystem() {
StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of(
// store0 has a changelog stream
String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream",
// store1 does not have a changelog stream
String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
assertTrue(storageConfig.isChangelogSystem("system0"));
assertFalse(storageConfig.isChangelogSystem("other-system"));
}
@Test
public void testHasDurableStores() {
// no changelog, which means no durable stores
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class")));
assertFalse(storageConfig.hasDurableStores());
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream")));
assertTrue(storageConfig.hasDurableStores());
}
@Test
public void testGetChangelogMaxMsgSizeBytes() {
// empty config, return default size
assertEquals(StorageConfig.DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES, new StorageConfig(new MapConfig()).getChangelogMaxMsgSizeBytes(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_MAX_MSG_SIZE_BYTES, STORE_NAME0), "10")));
assertEquals(10, storageConfig.getChangelogMaxMsgSizeBytes(STORE_NAME0));
}
@Test
public void testGetDisallowLargeMessages() {
// empty config, return default size
assertEquals(StorageConfig.DEFAULT_DISALLOW_LARGE_MESSAGES, new StorageConfig(new MapConfig()).getDisallowLargeMessages(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.DISALLOW_LARGE_MESSAGES, STORE_NAME0), "true")));
assertEquals(true, storageConfig.getDisallowLargeMessages(STORE_NAME0));
}
@Test
public void testGetDropLargeMessages() {
// empty config, return default size
assertEquals(StorageConfig.DEFAULT_DROP_LARGE_MESSAGES, new StorageConfig(new MapConfig()).getDropLargeMessages(STORE_NAME0));
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
}
@Test
public void testGetChangelogMinCompactionLagMs() {
// empty config, return default lag ms
Map<String, String> configMap = new HashMap<>();
assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
// override with configured default
long defaultLagOverride = TimeUnit.HOURS.toMillis(8);
configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride));
assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
// override for specific store
long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6);
configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
}
}