/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.StoreDescriptor;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.table.TableConfigGenerator;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.MathUtil;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides methods to generate configuration for a {@link JobNode}
*/
/* package private */ class JobNodeConfigurationGenerator {
private static final Logger LOG = LoggerFactory.getLogger(JobNodeConfigurationGenerator.class);
static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
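/**
* Merges the planner-generated configuration into the original user-provided configuration, after validating that
* the two agree on job.name and job.id. Original values take precedence over generated ones, and the merged result
* is passed through {@link ConfigUtil#rewriteConfig} before being returned.
*
* @param originalConfig the user-provided configuration
* @param generatedConfig the configuration generated by the planner for this job node
* @return the merged and rewritten configuration
*/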
static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
validateJobConfigs(originalConfig, generatedConfig);
Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
originalConfig.forEach((k, v) -> {
if (generatedConfig.containsKey(k) && !Objects.equals(generatedConfig.get(k), v)) {
LOG.info("Replacing generated config for key: {} value: {} with original config value: {}", k, generatedConfig.get(k), v);
}
mergedConfig.put(k, v);
});
return ConfigUtil.rewriteConfig(new MapConfig(mergedConfig));
}
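/**
* Validates that any user-configured job.name and job.id match the values generated from app.name and app.id,
* throwing a {@link SamzaException} if they conflict.
*
* @param originalConfig the user-provided configuration
* @param generatedConfig the configuration generated by the planner
*/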
static void validateJobConfigs(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
String userConfiguredJobId = originalConfig.get(JobConfig.JOB_ID);
String userConfiguredJobName = originalConfig.get(JobConfig.JOB_NAME);
String generatedJobId = generatedConfig.get(JobConfig.JOB_ID);
String generatedJobName = generatedConfig.get(JobConfig.JOB_NAME);
if (generatedJobName != null && userConfiguredJobName != null && !StringUtils.equals(generatedJobName,
userConfiguredJobName)) {
throw new SamzaException(String.format(
"Generated job.name = %s from app.name = %s does not match user configured job.name = %s, please configure job.name same as app.name",
generatedJobName, originalConfig.get(ApplicationConfig.APP_NAME), userConfiguredJobName));
}
if (generatedJobId != null && userConfiguredJobId != null && !StringUtils.equals(generatedJobId,
userConfiguredJobId)) {
throw new SamzaException(String.format(
"Generated job.id = %s from app.id = %s does not match user configured job.id = %s, please configure job.id same as app.id",
generatedJobId, originalConfig.get(ApplicationConfig.APP_ID), userConfiguredJobId));
}
}
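/**
* Generates the full {@link JobConfig} for the given {@link JobNode}: input and broadcast streams, window/join
* trigger intervals, store, serde, table, and intermediate stream configuration, merged with the original
* user-provided configuration. For legacy task applications, the node's existing configuration is returned as-is.
*
* @param jobNode the job node to generate configuration for
* @param executionPlanJson the JSON representation of the execution plan for this application
* @return the generated {@link JobConfig}
*/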
JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
if (jobNode.isLegacyTaskApplication()) {
return new JobConfig(jobNode.getConfig());
}
Map<String, String> generatedConfig = new HashMap<>();
// set up job name and job ID
generatedConfig.put(JobConfig.JOB_NAME, jobNode.getJobName());
generatedConfig.put(JobConfig.JOB_ID, jobNode.getJobId());
Map<String, StreamEdge> inEdges = jobNode.getInEdges();
Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
Map<String, TableDescriptor> reachableTables = getReachableTables(reachableOperators, jobNode);
// config passed by the JobPlanner. user-provided + system-stream descriptor config + misc. other config
Config originalConfig = jobNode.getConfig();
// check all inputs to the node for broadcast and input streams
final Set<String> inputs = new HashSet<>();
final Set<String> broadcastInputs = new HashSet<>();
for (StreamEdge inEdge : inEdges.values()) {
String formattedSystemStream = inEdge.getName();
if (inEdge.isBroadcast()) {
if (inEdge.getPartitionCount() > 1) {
broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
} else {
broadcastInputs.add(formattedSystemStream + "#0");
}
} else {
inputs.add(formattedSystemStream);
}
}
configureBroadcastInputs(generatedConfig, originalConfig, broadcastInputs);
// compute window and join operator intervals in this node
configureWindowInterval(generatedConfig, originalConfig, reachableOperators);
// set store configuration for stateful operators.
stores.forEach(sd -> generatedConfig.putAll(sd.getStorageConfigs()));
// set the execution plan in json
generatedConfig.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
// write intermediate input/output streams to configs
inEdges.values().stream().filter(StreamEdge::isIntermediate)
.forEach(intermediateEdge -> generatedConfig.putAll(intermediateEdge.generateConfig()));
// write serialized serde instances and stream, store, and table serdes to configs
// serde configuration generation has to happen before table configuration, since the serde configuration
// is required when generating configuration for some TableProviders (e.g., local-store-backed tables)
configureSerdes(generatedConfig, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
// generate table configuration and potential side input configuration
configureTables(generatedConfig, originalConfig, reachableTables, inputs);
// generate the task.inputs configuration
generatedConfig.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));
LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), generatedConfig);
return new JobConfig(mergeConfig(originalConfig, generatedConfig));
}
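/**
* Returns the tables reachable from the given operators; currently returns all tables known to the
* {@link JobNode} (see SAMZA-1893).
*/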
private Map<String, TableDescriptor> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
// TODO: Fix this in SAMZA-1893. For now, returning all tables for single-job execution plan
return jobNode.getTables();
}
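/**
* Adds the broadcast streams of this node to {@link TaskConfig#BROADCAST_INPUT_STREAMS}, preserving any broadcast
* inputs already present in the original configuration.
*/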
private void configureBroadcastInputs(Map<String, String> configs, Config config, Set<String> broadcastStreams) {
// TODO: SAMZA-1841: remove this once we support defining broadcast input streams in the high-level API;
// task.broadcast.input should be generated by the planner in the future.
if (broadcastStreams.isEmpty()) {
return;
}
String broadcastInputs = config.get(TaskConfig.BROADCAST_INPUT_STREAMS);
if (StringUtils.isNotBlank(broadcastInputs)) {
broadcastStreams.add(broadcastInputs);
}
configs.put(TaskConfig.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams));
}
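/**
* Sets {@link TaskConfig#WINDOW_MS} to the computed trigger interval if the node contains any window or join
* operators; otherwise leaves the configuration unchanged.
*/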
private void configureWindowInterval(Map<String, String> configs, Config config,
Collection<OperatorSpec> reachableOperators) {
if (reachableOperators.stream().noneMatch(op -> op.getOpCode() == OperatorSpec.OpCode.WINDOW
|| op.getOpCode() == OperatorSpec.OpCode.JOIN)) {
return;
}
// set triggering interval if a window or join is defined. Only applies to high-level applications
long triggerInterval = computeTriggerInterval(reachableOperators);
LOG.info("Using triggering interval: {}", triggerInterval);
configs.put(TaskConfig.WINDOW_MS, String.valueOf(triggerInterval));
}
/**
* Computes the triggering interval to use during the execution of this {@link JobNode}. The interval is the GCD
* of the default trigger intervals of all window operators and the TTLs of all join operators in the node.
*
* @return the triggering interval in milliseconds, or -1 if the node contains no window or join operators
*/
private long computeTriggerInterval(Collection<OperatorSpec> reachableOperators) {
List<Long> windowTimerIntervals = reachableOperators.stream()
.filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW)
.map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs())
.collect(Collectors.toList());
// Filter out the join operators, and obtain a list of their ttl values
List<Long> joinTtlIntervals = reachableOperators.stream()
.filter(spec -> spec instanceof JoinOperatorSpec)
.map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
.collect(Collectors.toList());
// Combine both the above lists
List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
candidateTimerIntervals.addAll(windowTimerIntervals);
if (candidateTimerIntervals.isEmpty()) {
return -1;
}
// Compute the gcd of the resultant list
return MathUtil.gcd(candidateTimerIntervals);
}
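/**
* Collects the {@link StoreDescriptor}s of all stateful operators reachable in this node.
*/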
private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) {
return reachableOperators.stream().filter(operatorSpec -> operatorSpec instanceof StatefulOperatorSpec)
.map(operatorSpec -> ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors()).flatMap(Collection::stream)
.collect(Collectors.toList());
}
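/**
* Generates table configuration via {@link TableConfigGenerator} and, for local tables with side inputs, adds the
* side-input streams to the task inputs and marks them as bootstrap streams.
*/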
private void configureTables(Map<String, String> generatedConfig, Config originalConfig,
Map<String, TableDescriptor> tables, Set<String> inputs) {
generatedConfig.putAll(
TableConfigGenerator.generate(
new MapConfig(generatedConfig), new ArrayList<>(tables.values())));
// Add side inputs to the task inputs and mark those streams as bootstrap
tables.values().forEach(tableDescriptor -> {
if (tableDescriptor instanceof LocalTableDescriptor) {
LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
List<String> sideInputs = localTableDescriptor.getSideInputs();
if (sideInputs != null && !sideInputs.isEmpty()) {
sideInputs.stream()
.map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
.forEach(systemStream -> {
inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX + StreamConfig.BOOTSTRAP,
systemStream.getSystem(), systemStream.getStream()), "true");
});
}
}
});
}
/**
* Serializes the {@link Serde} instances for operators, adds them to the provided config, and
* sets the serde configuration for the input/output/intermediate streams appropriately.
*
* We try to preserve the number of Serde instances before and after serialization. However, we don't
* guarantee that references shared between these serde instances (e.g., a Jackson ObjectMapper shared
* between two JSON serdes) are still shared after deserialization.
*
* Ideally, all the user-defined objects in the application should be serialized and deserialized in one pass
* to/from the same output/input stream so that reference-sharing relationships can be maintained.
*
* @param configs the configs to add the serialized serde instances and stream serde configs to
* @param inEdges the input {@link StreamEdge}s for this node, keyed by stream id
* @param outEdges the output {@link StreamEdge}s for this node, keyed by stream id
* @param stores the {@link StoreDescriptor}s for the stateful operators in this node
* @param tables the ids of the tables reachable from this node
* @param jobNode the {@link JobNode} whose serdes are being configured
*/
private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges,
List<StoreDescriptor> stores, Collection<String> tables, JobNode jobNode) {
// collect all key and msg serde instances for streams
Map<String, Serde> streamKeySerdes = new HashMap<>();
Map<String, Serde> streamMsgSerdes = new HashMap<>();
inEdges.keySet().forEach(streamId ->
addSerdes(jobNode.getInputSerdes(streamId), streamId, streamKeySerdes, streamMsgSerdes));
outEdges.keySet().forEach(streamId ->
addSerdes(jobNode.getOutputSerde(streamId), streamId, streamKeySerdes, streamMsgSerdes));
Map<String, Serde> storeKeySerdes = new HashMap<>();
Map<String, Serde> storeMsgSerdes = new HashMap<>();
stores.forEach(storeDescriptor -> {
storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
});
Map<String, Serde> tableKeySerdes = new HashMap<>();
Map<String, Serde> tableMsgSerdes = new HashMap<>();
tables.forEach(tableId -> {
addSerdes(jobNode.getTableSerdes(tableId), tableId, tableKeySerdes, tableMsgSerdes);
});
// for each unique stream, store, or table serde instance, generate a unique name and serialize it to config
HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
serdes.addAll(streamMsgSerdes.values());
serdes.addAll(storeKeySerdes.values());
serdes.addAll(storeMsgSerdes.values());
serdes.addAll(tableKeySerdes.values());
serdes.addAll(tableMsgSerdes.values());
SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
Base64.Encoder base64Encoder = Base64.getEncoder();
Map<Serde, String> serdeUUIDs = new HashMap<>();
serdes.forEach(serde -> {
String serdeName = serdeUUIDs.computeIfAbsent(serde,
s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE, serdeName),
base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
});
// set key and msg serdes for streams to the serde names generated above
streamKeySerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX, streamId);
String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE;
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});
streamMsgSerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX, streamId);
String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE;
configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
});
// set key and msg serdes for stores to the serde names generated above
storeKeySerdes.forEach((storeName, serde) -> {
String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE, storeName);
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});
storeMsgSerdes.forEach((storeName, serde) -> {
String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE, storeName);
configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
});
// set key and msg serdes for tables to the serde names generated above
tableKeySerdes.forEach((tableId, serde) -> {
String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, tableId);
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});
tableMsgSerdes.forEach((tableId, serde) -> {
String valueSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, tableId);
configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
});
}
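/**
* Records the key and message serdes for the given stream or table id in the provided maps, skipping null and
* {@link NoOpSerde} serdes.
*/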
private void addSerdes(KV<Serde, Serde> serdes, String streamId, Map<String, Serde> keySerdeMap,
Map<String, Serde> msgSerdeMap) {
if (serdes != null) {
if (serdes.getKey() != null && !(serdes.getKey() instanceof NoOpSerde)) {
keySerdeMap.put(streamId, serdes.getKey());
}
if (serdes.getValue() != null && !(serdes.getValue() instanceof NoOpSerde)) {
msgSerdeMap.put(streamId, serdes.getValue());
}
}
}
}