/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigRewriter;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockLocalTableDescriptor;
import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockTableProviderFactory;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.impl.store.TimestampedValueSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

/**
* Unit test for {@link JobNodeConfigurationGenerator}
*/
public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase {
@Test
public void testConfigureSerdesWithRepartitionJoinApplication() {
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
configureJobNode(mockStreamAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
// Verify the results
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
    // additionally, check the computed window.ms for the join
assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS));
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 5);
validateStreamConfigures(jobConfig, deserializedSerdes);
validateJoinStoreConfigures(jobConfig, deserializedSerdes);
}
@Test
public void testConfigureSerdesForRepartitionWithNoDefaultSystem() {
// set the application to RepartitionOnlyStreamApplication
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), mockConfig);
configureJobNode(mockStreamAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
// Verify the results
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2);
validateStreamConfigures(jobConfig, null);
String partitionByKeySerde = jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
String partitionByMsgSerde = jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
assertTrue("Serialized serdes should not contain intermediate stream key serde",
!deserializedSerdes.containsKey(partitionByKeySerde));
assertTrue("Serialized serdes should not contain intermediate stream msg serde",
!deserializedSerdes.containsKey(partitionByMsgSerde));
}
@Test
public void testGenerateJobConfigWithTaskApplication() {
    // set the application to TaskApplication, which still wires up all input/output/intermediate streams
TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig);
configureJobNode(taskAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
// Verify the results
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2);
validateStreamConfigures(jobConfig, deserializedSerdes);
}
@Test
public void testGenerateJobConfigWithLegacyTaskApplication() {
TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig);
configureJobNode(taskAppDesc);
Map<String, String> originConfig = new HashMap<>(mockConfig);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "");
    // jobConfig should be exactly the same as the original config
Map<String, String> generatedConfig = new HashMap<>(jobConfig);
assertEquals(originConfig, generatedConfig);
}
@Test
public void testBroadcastStreamApplication() {
// set the application to BroadcastStreamApplication
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde), mockConfig);
configureJobNode(mockStreamAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2);
validateStreamSerdeConfigure(broadcastInputDesriptor.getStreamId(), jobConfig, deserializedSerdes);
validateIntermediateStreamConfigure(broadcastInputDesriptor.getStreamId(), broadcastInputDesriptor.getPhysicalName().get(), jobConfig);
}
@Test
public void testStreamApplicationWithTableAndSideInput() {
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
// add table to the RepartitionJoinStreamApplication
GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
.withSideInputs(Arrays.asList(sideInput1.getStreamId()))
.withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
.withConfig("mock.table.provider.config", "mock.config.value");
    // add the side input and terminate at the table in the application
mockStreamAppDesc.getInputStream(sideInput1).sendTo(mockStreamAppDesc.getTable(mockTableDescriptor));
StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
inputSystemDescriptor.getSystemName()), false, false, mockConfig);
    // need to put the sideInput-related stream configuration into the original config
    // TODO: this is confusing since part of the system- and stream-related configuration is generated outside the JobNodeConfigurationGenerator.
    // It would be nice if all system- and stream-related configuration were generated in one place and only the intermediate stream
    // configuration were generated by the JobNodeConfigurationGenerator.
Map<String, String> configs = new HashMap<>(mockConfig);
configs.putAll(sideInputEdge.generateConfig());
mockConfig = spy(new MapConfig(configs));
configureJobNode(mockStreamAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 5);
validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor);
}
@Test
public void testTaskApplicationWithTableAndSideInput() {
    // create a table descriptor with a side input; it will be added to the TaskApplication below
GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
.withSideInputs(Arrays.asList(sideInput1.getStreamId()))
.withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
.withConfig("mock.table.provider.config", "mock.config.value");
StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
inputSystemDescriptor.getSystemName()), false, false, mockConfig);
    // need to put the sideInput-related stream configuration into the original config
    // TODO: this is confusing since part of the system- and stream-related configuration is generated outside the JobNodeConfigurationGenerator.
    // It would be nice if all system- and stream-related configuration were generated in one place and only the intermediate stream
    // configuration were generated by the JobNodeConfigurationGenerator.
Map<String, String> configs = new HashMap<>(mockConfig);
configs.putAll(sideInputEdge.generateConfig());
mockConfig = spy(new MapConfig(configs));
    // set the application to TaskApplication, which still wires up all input/output/intermediate streams
TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig);
// add table to the task application
taskAppDesc.withTable(mockTableDescriptor);
taskAppDesc.withInputStream(inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde));
configureJobNode(taskAppDesc);
    // create the JobNodeConfigurationGenerator and generate the jobConfig for the jobNode
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
// Verify the results
Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedJobConfig, jobConfig);
Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2);
validateStreamConfigures(jobConfig, deserializedSerdes);
validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor);
}
@Test
public void testConfigRewriter() {
Map<String, String> configs = new HashMap<>(mockConfig);
String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId());
configs.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, "mock"), MockConfigRewriter.class.getName());
configs.put(JobConfig.CONFIG_REWRITERS, "mock");
configs.put(String.format("job.config.rewriter.mock.%s", streamCfgToOverride), "rewritten-system");
mockConfig = spy(new MapConfig(configs));
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
configureJobNode(mockStreamAppDesc);
JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
validateJobConfig(expectedConfig, jobConfig);
assertEquals("rewritten-system", jobConfig.get(streamCfgToOverride));
}
@Test(expected = SamzaException.class)
public void testJobNameConfigValidation() {
ImmutableMap<String, String> userConfigs =
ImmutableMap.of("job.name", "samza-job", "job.id", "1", "app.name", "samza-app");
ImmutableMap<String, String> generatedConfigs =
ImmutableMap.of("job.name", "samza-app", "job.id", "1", "app.name", "samza-app");
JobNodeConfigurationGenerator.validateJobConfigs(userConfigs, generatedConfigs);
}
@Test(expected = SamzaException.class)
public void testJobIdConfigValidation() {
ImmutableMap<String, String> userConfigs =
ImmutableMap.of("job.id", "1", "app.id", "this-should-take-precedence", "app.name", "samza-app");
ImmutableMap<String, String> generatedConfigs =
ImmutableMap.of("job.name", "samza-app", "job.id", "this-should-take-precedence", "app.name", "samza-app");
JobNodeConfigurationGenerator.validateJobConfigs(userConfigs, generatedConfigs);
}
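  /**
   * Verifies the table provider factory, the custom table provider config, and the serde
   * registration for the table's backing store.
   */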
private void validateTableConfigure(JobConfig jobConfig, Map<String, Serde> deserializedSerdes,
TableDescriptor tableDescriptor) {
Config tableConfig = jobConfig.subset(String.format("tables.%s.", tableDescriptor.getTableId()));
assertEquals(MockTableProviderFactory.class.getName(), tableConfig.get("provider.factory"));
assertEquals("mock.config.value", jobConfig.get("mock.table.provider.config"));
validateTableSerdeConfigure(tableDescriptor.getTableId(), jobConfig, deserializedSerdes);
}
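  /**
   * Builds the expected job config by augmenting the original config with task.inputs and
   * task.broadcast.inputs derived from the input {@link StreamEdge}s.
   */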
private Config getExpectedJobConfig(Config originConfig, Map<String, StreamEdge> inputEdges) {
Map<String, String> configMap = new HashMap<>(originConfig);
Set<String> inputs = new HashSet<>();
Set<String> broadcasts = new HashSet<>();
for (StreamEdge inputEdge : inputEdges.values()) {
if (inputEdge.isBroadcast()) {
broadcasts.add(inputEdge.getName() + "#0");
} else {
inputs.add(inputEdge.getName());
}
}
if (!inputs.isEmpty()) {
configMap.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
}
if (!broadcasts.isEmpty()) {
configMap.put(TaskConfig.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcasts));
}
return new MapConfig(configMap);
}
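  /**
   * Asserts that exactly {@code numSerdes} serdes are registered, then Base64-decodes and
   * deserializes each registered serde instance, keyed by its registered serde name.
   */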
private Map<String, Serde> validateAndGetDeserializedSerdes(Config jobConfig, int numSerdes) {
Config serializers = jobConfig.subset("serializers.registry.", true);
// make sure that the serializers deserialize correctly
SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
assertEquals(numSerdes, serializers.size());
return serializers.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX, ""),
e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
));
}
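  /**
   * Verifies the job name, job id, serialized execution plan, and the input/broadcast stream
   * configuration against the expected config.
   */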
private void validateJobConfig(Config expectedConfig, JobConfig jobConfig) {
assertEquals(expectedConfig.get(JobConfig.JOB_NAME), jobConfig.getName().get());
assertEquals(expectedConfig.get(JobConfig.JOB_ID), jobConfig.getJobId());
assertEquals("testJobGraphJson", jobConfig.get(JobNodeConfigurationGenerator.CONFIG_INTERNAL_EXECUTION_PLAN));
assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS), jobConfig.get(TaskConfig.INPUT_STREAMS));
assertEquals(expectedConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS), jobConfig.get(TaskConfig.BROADCAST_INPUT_STREAMS));
}
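  /**
   * Verifies that the stream's key serde is a registered {@link StringSerde} and its msg serde
   * is a registered {@link JsonSerdeV2}.
   */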
private void validateStreamSerdeConfigure(String streamId, Config config, Map<String, Serde> deserializedSerdes) {
Config streamConfig = config.subset(String.format("streams.%s.samza.", streamId));
String keySerdeName = streamConfig.get("key.serde");
String valueSerdeName = streamConfig.get("msg.serde");
assertTrue(String.format("Serialized serdes should contain %s key serde", streamId), deserializedSerdes.containsKey(keySerdeName));
assertTrue(String.format("Serialized %s key serde should be a StringSerde", streamId), keySerdeName.startsWith(StringSerde.class.getSimpleName()));
assertTrue(String.format("Serialized serdes should contain %s msg serde", streamId), deserializedSerdes.containsKey(valueSerdeName));
assertTrue(String.format("Serialized %s msg serde should be a JsonSerdeV2", streamId), valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName()));
}
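  /**
   * Verifies that the table store's key serde is a registered {@link StringSerde} and its msg
   * serde is a registered {@link JsonSerdeV2}.
   */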
private void validateTableSerdeConfigure(String tableId, Config config, Map<String, Serde> deserializedSerdes) {
Config streamConfig = config.subset(String.format("stores.%s.", tableId));
String keySerdeName = streamConfig.get("key.serde");
String valueSerdeName = streamConfig.get("msg.serde");
assertTrue(String.format("Serialized serdes should contain %s key serde", tableId), deserializedSerdes.containsKey(keySerdeName));
assertTrue(String.format("Serialized %s key serde should be a StringSerde", tableId), keySerdeName.startsWith(StringSerde.class.getSimpleName()));
assertTrue(String.format("Serialized serdes should contain %s value serde", tableId), deserializedSerdes.containsKey(valueSerdeName));
assertTrue(String.format("Serialized %s msg serde should be a JsonSerdeV2", tableId), valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName()));
}
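  /**
   * Verifies the generated intermediate stream properties: system, priority, deletion of
   * committed messages, physical name, and the intermediate flag.
   */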
private void validateIntermediateStreamConfigure(String streamId, String physicalName, Config config) {
Config intStreamConfig = config.subset(String.format("streams.%s.", streamId), true);
assertEquals("intermediate-system", intStreamConfig.get("samza.system"));
assertEquals(String.valueOf(Integer.MAX_VALUE), intStreamConfig.get("samza.priority"));
assertEquals("true", intStreamConfig.get("samza.delete.committed.messages"));
assertEquals(physicalName, intStreamConfig.get("samza.physical.name"));
assertEquals("true", intStreamConfig.get("samza.intermediate"));
}
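  /**
   * Verifies serde configuration for the input, output, and intermediate streams (when serdes
   * are provided), as well as the generated intermediate stream configuration.
   */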
private void validateStreamConfigures(Config config, Map<String, Serde> deserializedSerdes) {
if (deserializedSerdes != null) {
validateStreamSerdeConfigure(input1Descriptor.getStreamId(), config, deserializedSerdes);
validateStreamSerdeConfigure(input2Descriptor.getStreamId(), config, deserializedSerdes);
validateStreamSerdeConfigure(outputDescriptor.getStreamId(), config, deserializedSerdes);
validateStreamSerdeConfigure(intermediateInputDescriptor.getStreamId(), config, deserializedSerdes);
}
// generated stream config for intermediate stream
String physicalName = intermediateInputDescriptor.getPhysicalName().isPresent() ?
intermediateInputDescriptor.getPhysicalName().get() : null;
validateIntermediateStreamConfigure(intermediateInputDescriptor.getStreamId(), physicalName, config);
}
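  /**
   * Verifies that both join store key serdes are registered {@link StringSerde}s, both msg serdes
   * are registered {@link TimestampedValueSerde}s, and that each store's config is valid.
   */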
private void validateJoinStoreConfigures(MapConfig mapConfig, Map<String, Serde> deserializedSerdes) {
String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde");
assertTrue("Serialized serdes should contain left join store key serde",
deserializedSerdes.containsKey(leftJoinStoreKeySerde));
assertTrue("Serialized left join store key serde should be a StringSerde",
leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
assertTrue("Serialized serdes should contain left join store msg serde",
deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde",
leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde");
String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde");
assertTrue("Serialized serdes should contain right join store key serde",
deserializedSerdes.containsKey(rightJoinStoreKeySerde));
assertTrue("Serialized right join store key serde should be a StringSerde",
rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
assertTrue("Serialized serdes should contain right join store msg serde",
deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde",
rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
Config leftJoinStoreConfig = mapConfig.subset("stores.jobName-jobId-join-j1-L.", true);
validateJoinStoreConfigure(leftJoinStoreConfig, "jobName-jobId-join-j1-L");
Config rightJoinStoreConfig = mapConfig.subset("stores.jobName-jobId-join-j1-R.", true);
validateJoinStoreConfigure(rightJoinStoreConfig, "jobName-jobId-join-j1-R");
}
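  /**
   * Verifies the join store's RocksDB factory, changelog topic, cleanup policy, retention, and TTL.
   */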
private void validateJoinStoreConfigure(Config joinStoreConfig, String changelogName) {
assertEquals("org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory", joinStoreConfig.get("factory"));
assertEquals(changelogName, joinStoreConfig.get("changelog"));
assertEquals("delete", joinStoreConfig.get("changelog.kafka.cleanup.policy"));
assertEquals("3600000", joinStoreConfig.get("changelog.kafka.retention.ms"));
assertEquals("3600000", joinStoreConfig.get("rocksdb.ttl.ms"));
}
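  /**
   * A {@link ConfigRewriter} that overlays any config scoped under the
   * "job.config.rewriter.&lt;name&gt;." prefix onto the job config.
   */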
public static class MockConfigRewriter implements ConfigRewriter {
@Override
public Config rewrite(String name, Config config) {
Map<String, String> configMap = new HashMap<>(config);
configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", name)));
return new MapConfig(configMap);
}
}
}