blob: d238855557fa57db1c96620e30044b15c42de2db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.task.IdentityStreamTask;
import org.junit.Before;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Unit test base class to set up commonly used test application and configuration.
*/
class ExecutionPlannerTestBase {
protected StreamApplicationDescriptorImpl mockStreamAppDesc;
protected Config mockConfig;
protected JobNode mockJobNode;
protected KVSerde<String, Object> defaultSerde;
protected GenericSystemDescriptor inputSystemDescriptor;
protected GenericSystemDescriptor outputSystemDescriptor;
protected GenericSystemDescriptor intermediateSystemDescriptor;
protected GenericInputDescriptor<KV<String, Object>> input1Descriptor;
protected GenericInputDescriptor<KV<String, Object>> input2Descriptor;
protected GenericInputDescriptor<KV<String, Object>> intermediateInputDescriptor;
protected GenericInputDescriptor<KV<String, Object>> broadcastInputDesriptor;
protected GenericOutputDescriptor<KV<String, Object>> outputDescriptor;
protected GenericOutputDescriptor<KV<String, Object>> intermediateOutputDescriptor;
@Before
public void setUp() {
defaultSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
inputSystemDescriptor = new GenericSystemDescriptor("input-system", "mockSystemFactoryClassName");
outputSystemDescriptor = new GenericSystemDescriptor("output-system", "mockSystemFactoryClassName");
intermediateSystemDescriptor = new GenericSystemDescriptor("intermediate-system", "mockSystemFactoryClassName");
input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde);
input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde);
outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", defaultSerde);
intermediateInputDescriptor = intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-partition_by-p1", defaultSerde)
.withPhysicalName("jobName-jobId-partition_by-p1");
intermediateOutputDescriptor = intermediateSystemDescriptor.getOutputDescriptor("jobName-jobId-partition_by-p1", defaultSerde)
.withPhysicalName("jobName-jobId-partition_by-p1");
broadcastInputDesriptor = intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-broadcast-b1", defaultSerde)
.withPhysicalName("jobName-jobId-broadcast-b1");
Map<String, String> configs = new HashMap<>();
configs.put(JobConfig.JOB_NAME, "jobName");
configs.put(JobConfig.JOB_ID, "jobId");
configs.putAll(input1Descriptor.toConfig());
configs.putAll(input2Descriptor.toConfig());
configs.putAll(outputDescriptor.toConfig());
configs.putAll(inputSystemDescriptor.toConfig());
configs.putAll(outputSystemDescriptor.toConfig());
configs.putAll(intermediateSystemDescriptor.toConfig());
configs.put(JobConfig.JOB_DEFAULT_SYSTEM, intermediateSystemDescriptor.getSystemName());
mockConfig = spy(new MapConfig(configs));
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
}
String getJobNameAndId() {
return "jobName-jobId";
}
void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc) {
JobGraph jobGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class))
.createJobGraph(mockStreamAppDesc);
mockJobNode = spy(jobGraph.getJobNodes().get(0));
}
StreamApplication getRepartitionOnlyStreamApplication() {
return appDesc -> {
MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor);
input1.partitionBy(KV::getKey, KV::getValue, mock(KVSerde.class), "p1");
};
}
StreamApplication getRepartitionJoinStreamApplication() {
return appDesc -> {
MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor);
MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(input2Descriptor);
OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor);
JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
input1
.partitionBy(KV::getKey, KV::getValue, defaultSerde, "p1")
.map(kv -> kv.value)
.join(input2.map(kv -> kv.value), mockJoinFn,
new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
Duration.ofHours(1), "j1")
.sendTo(output);
};
}
TaskApplication getTaskApplication() {
return appDesc -> {
appDesc.withInputStream(input1Descriptor)
.withInputStream(input2Descriptor)
.withInputStream(intermediateInputDescriptor)
.withOutputStream(intermediateOutputDescriptor)
.withOutputStream(outputDescriptor)
.withTaskFactory(() -> new IdentityStreamTask());
};
}
TaskApplication getLegacyTaskApplication() {
return new LegacyTaskApplication(IdentityStreamTask.class.getName());
}
StreamApplication getBroadcastOnlyStreamApplication(Serde serde) {
return appDesc -> {
MessageStream<KV<String, Object>> input = appDesc.getInputStream(input1Descriptor);
input.broadcast(serde, "b1");
};
}
}