Merge master
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
index b415cf8..23e8625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
@@ -27,6 +27,8 @@
*/
@InterfaceStability.Unstable
public interface StreamGraphBuilder {
+ static final String BUILDER_CLASS_CONFIG = "job.graph.builder.class";
+
/**
* Users are required to implement this abstract method to initialize the processing logic of the application, in terms
* of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..f81266b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -27,12 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import java.util.concurrent.ExecutorService;
import static org.apache.samza.util.Util.asScalaClock;
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
/**
* Factory class to create runloop for a Samza task, based on the type
@@ -109,18 +109,4 @@
}
}
- /**
- * Returns a default value object for scala option.getOrDefault() to use
- * @param value default value
- * @param <T> value type
- * @return object containing default value
- */
- public static <T> AbstractFunction0<T> defaultValue(final T value) {
- return new AbstractFunction0<T>() {
- @Override
- public T apply() {
- return value;
- }
- };
- }
}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 830e4a5..77cb3fc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -163,10 +163,12 @@
@Override
public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
- MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+ int opId = graph.getNextOpId();
+ String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), opId);
+ MessageStreamImpl<M> intStream = this.graph.createIntStream(streamId, parKeyExtractor);
OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
- this.graph, outputStream));
+ this.graph, outputStream, opId));
return intStream;
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 8ca8157..f3fd176 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,20 +18,20 @@
*/
package org.apache.samza.operators;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Function;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
* create system input/output/intermediate streams.
@@ -129,9 +129,14 @@
*/
private final Map<String, MessageStream> inStreams = new HashMap<>();
private final Map<String, OutputStream> outStreams = new HashMap<>();
+ private final ExecutionEnvironment executionEnvironment;
private ContextManager contextManager = new ContextManager() { };
+ public StreamGraphImpl(ExecutionEnvironment executionEnvironment) {
+ this.executionEnvironment = executionEnvironment;
+ }
+
@Override
public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
if (!this.inStreams.containsKey(streamSpec.getId())) {
@@ -182,7 +187,12 @@
@Override public Map<StreamSpec, OutputStream> getOutStreams() {
Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
- this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+ this.outStreams.forEach((ss, entry) -> {
+ StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ?
+ ((IntermediateStreamImpl) entry).getSpec() :
+ ((OutputStreamImpl) entry).getSpec();
+ outStreamMap.put(streamSpec, entry);
+ });
return Collections.unmodifiableMap(outStreamMap);
}
@@ -208,8 +218,8 @@
*/
public MessageStreamImpl getInputStream(SystemStream sstream) {
for (MessageStream entry: this.inStreams.values()) {
- if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
- ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+ if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
+ ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
return (MessageStreamImpl) entry;
}
}
@@ -231,9 +241,9 @@
* @param <M> the type of input message
* @return the {@link OutputStream} object for the re-partitioned stream
*/
- <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+ <PK, M> MessageStreamImpl<M> createIntStream(String streamId, Function<M, PK> parKeyFn) {
// TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
- StreamSpec streamSpec = this.createIntStreamSpec();
+ StreamSpec streamSpec = executionEnvironment.streamFromConfig(streamId);
if (!this.inStreams.containsKey(streamSpec.getId())) {
this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
@@ -244,10 +254,4 @@
}
return intStream;
}
-
- private StreamSpec createIntStreamSpec() {
- // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
- return null;
- }
-
}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index d626852..7b14b9c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -152,8 +152,8 @@
* @param <M> type of input message
* @return the {@link SinkOperatorSpec}
*/
- public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+ public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream, int opId) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
new file mode 100644
index 0000000..757034e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
+ * the intermediate topics needed for the execution.
+ */
+public class ExecutionPlanner {
+ private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
+
+ private final Config config;
+
+ /**
+ * Creates a planner backed by the given configuration.
+ *
+ * @param config the configuration kept by this planner and read by its planning steps
+ */
+ public ExecutionPlanner(Config config) {
+ this.config = config;
+ }
+
+  /**
+   * Builds the {@link ProcessorGraph} for the given {@link StreamGraph}. When the resulting graph
+   * contains internal streams, their partition counts are calculated and the streams are created
+   * before the graph is returned.
+   *
+   * @param streamGraph the stream graph to plan
+   * @return the processor graph built from the stream graph
+   * @throws Exception if planning fails
+   */
+  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
+    final Map<String, SystemAdmin> admins = getSystemAdmins(config);
+
+    // physical processors are derived from the stream graph first
+    final ProcessorGraph graph = createProcessorGraph(streamGraph);
+
+    if (graph.getInternalStreams().isEmpty()) {
+      // no internal streams: nothing to partition or create
+      return graph;
+    }
+
+    calculatePartitions(streamGraph, graph, admins);
+    createStreams(graph, admins);
+    return graph;
+  }
+
+  /**
+   * Creates the physical graph from the StreamGraph. A single processor, identified by the job
+   * name, is used for the whole DAG. Streams that appear both as input and as output of the
+   * stream graph become internal edges (from the processor back to itself); the remaining
+   * inputs become sources and the remaining outputs become sinks. The graph is validated
+   * before it is returned.
+   * Package private for testing.
+   *
+   * @param streamGraph the stream graph to convert
+   * @return the validated processor graph
+   */
+  ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
+    // only one processor in this phase, so the job name is used as its id
+    final String jobName = config.get(JobConfig.JOB_NAME());
+    final ProcessorGraph graph = new ProcessorGraph(config);
+
+    final Set<StreamSpec> inputs = new HashSet<>(streamGraph.getInStreams().keySet());
+    final Set<StreamSpec> outputs = new HashSet<>(streamGraph.getOutStreams().keySet());
+
+    // a stream that is both consumed and produced is an intermediate stream
+    final Set<StreamSpec> intermediates = new HashSet<>(inputs);
+    intermediates.retainAll(outputs);
+    inputs.removeAll(intermediates);
+    outputs.removeAll(intermediates);
+
+    for (StreamSpec source : inputs) {
+      graph.addSource(source, jobName);
+    }
+    for (StreamSpec sink : outputs) {
+      graph.addSink(sink, jobName);
+    }
+    for (StreamSpec intermediate : intermediates) {
+      graph.addEdge(intermediate, jobName, jobName);
+    }
+
+    graph.validate();
+    return graph;
+  }
+
+ /**
+ * Figures out the number of partitions of the intermediate streams by running three steps in order:
+ * fetching the partition counts of the source and sink streams, deriving the counts for the inputs
+ * of join operators, and finally assigning a count to the internal streams that are still unset.
+ * Package private for testing.
+ *
+ * @param streamGraph the stream graph, used to locate the join operators
+ * @param processorGraph the processor graph whose stream edges receive the partition counts
+ * @param sysAdmins system admins passed to the step that fetches existing stream partitions
+ */
+ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+ // fetch the external streams partition info
+ fetchExistingStreamPartitions(processorGraph, sysAdmins);
+
+ // calculate the partitions for the input streams of join operators
+ calculateJoinInputPartitions(streamGraph, processorGraph);
+
+ // calculate the partitions for the rest of intermediate streams
+ calculateIntStreamPartitions(processorGraph, config);
+ }
+
+  /**
+   * Fetches the partition counts of the source and sink streams of the graph and stores them on
+   * the corresponding {@link StreamEdge}s. The edges are grouped by system name so that each
+   * system's admin is asked once for the metadata of all of its streams.
+   *
+   * @param processorGraph the graph whose source and sink edges are updated
+   * @param sysAdmins system admins keyed by system name
+   * @throws SamzaException if a stream refers to a system that has no entry in {@code sysAdmins}
+   */
+  static void fetchExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> existingStreams = new HashSet<>();
+    existingStreams.addAll(processorGraph.getSources());
+    existingStreams.addAll(processorGraph.getSinks());
+
+    // group the edges by the system they live on
+    Multimap<String, StreamEdge> existingStreamsMap = HashMultimap.create();
+    existingStreams.forEach(streamEdge -> {
+      SystemStream systemStream = streamEdge.getSystemStream();
+      existingStreamsMap.put(systemStream.getSystem(), streamEdge);
+    });
+
+    for (Map.Entry<String, Collection<StreamEdge>> entry : existingStreamsMap.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      if (systemAdmin == null) {
+        // fail with a descriptive error instead of a bare NullPointerException below
+        throw new SamzaException(
+            String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+
+      // index the edges by stream name to match them against the returned metadata
+      Map<String, StreamEdge> streamToEdge = new HashMap<>();
+      entry.getValue().forEach(streamEdge -> streamToEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
+
+      Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
+      metadata.forEach((stream, data) -> {
+        int partitions = data.getSystemStreamPartitionMetadata().size();
+        streamToEdge.get(stream).setPartitions(partitions);
+        log.debug("Partition count is {} for stream {}", partitions, stream);
+      });
+    }
+  }
+
+ /**
+ * Calculates the partitions for the input streams of join operators.
+ * The joins are first collected by walking every input stream of the stream graph with
+ * {@code getJoins}. Joins that already have an input with a known partition count are then
+ * processed from a queue: the known count is copied to the join's inputs that have none, and any
+ * other join fed by a newly assigned edge is queued in turn.
+ * Package private for testing.
+ *
+ * @param streamGraph the stream graph whose input streams are traversed
+ * @param processorGraph the processor graph providing the stream edges to update
+ * @throws SamzaException if two inputs of the same join have different known partition counts
+ */
+ static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+ // get join operators with input streams
+ Multimap<OperatorSpec, StreamEdge> joinToStreamMap = HashMultimap.create();
+ Multimap<StreamEdge, OperatorSpec> streamToJoinMap = HashMultimap.create();
+ Map<MessageStream, OperatorSpec> outputToJoinMap = new HashMap<>();
+ Queue<OperatorSpec> joinQ = new LinkedList<>(); // a queue of joins with known input partitions
+ Set<OperatorSpec> visited = new HashSet<>();
+ // populate the maps and seed the queue by traversing from every input stream
+ streamGraph.getInStreams().entrySet().forEach(entry -> {
+ StreamEdge streamEdge = processorGraph.getEdge(entry.getKey());
+ getJoins(entry.getValue(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+ });
+ // calculate join input partition count
+ while (!joinQ.isEmpty()) {
+ OperatorSpec join = joinQ.poll();
+ int partitions = -1;
+ // loop through the input streams to the join and find the partition count
+ for (StreamEdge edge : joinToStreamMap.get(join)) {
+ int edgePartitions = edge.getPartitions();
+ // -1 is treated as "not known yet" here
+ if (edgePartitions != -1) {
+ if (partitions == -1) {
+ //if the partition is not assigned
+ partitions = edgePartitions;
+ } else if (partitions != edgePartitions) {
+ // two inputs of the same join disagree on their partition count
+ throw new SamzaException(String.format("Unable to resolve input partitions of stream %s for join",
+ edge.getSystemStream().toString()));
+ }
+ }
+ }
+ // assign the partition count
+ for (StreamEdge edge : joinToStreamMap.get(join)) {
+ // NOTE(review): "unset" is tested as <= 0 here but as != -1 in the loop above - confirm this is intended
+ if (edge.getPartitions() <= 0) {
+ edge.setPartitions(partitions);
+
+ // find other joins can be inferred by setting this edge
+ for (OperatorSpec op : streamToJoinMap.get(edge)) {
+ if (!visited.contains(op)) {
+ joinQ.add(op);
+ visited.add(op);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Recursively walks the operator specs registered on a message stream and on every stream reached
+ * through {@code getNextStream()}, recording each {@link PartialJoinOperatorSpec} that is found.
+ * The same stream edge is passed unchanged through the whole traversal, so every join found is
+ * associated with that edge.
+ *
+ * @param messageStream the stream whose registered operator specs are inspected; it is cast to
+ * {@link MessageStreamImpl}
+ * @param streamEdge the edge associated with every join found during this traversal
+ * @param joinToStreamMap updated with a mapping from each join found to {@code streamEdge}
+ * @param streamToJoinMap updated with a mapping from {@code streamEdge} to each join found
+ * @param outputToJoinMap maps a join's output stream to the first partial join spec registered for it
+ * @param joinQ receives the joins not yet in {@code visited} when {@code streamEdge} has more than 0 partitions
+ * @param visited the joins that have already been added to {@code joinQ}
+ */
+ static void getJoins(MessageStream messageStream,
+ StreamEdge streamEdge,
+ Multimap<OperatorSpec, StreamEdge> joinToStreamMap,
+ Multimap<StreamEdge, OperatorSpec> streamToJoinMap,
+ Map<MessageStream, OperatorSpec> outputToJoinMap,
+ Queue<OperatorSpec> joinQ,
+ Set<OperatorSpec> visited) {
+ Collection<OperatorSpec> specs = ((MessageStreamImpl) messageStream).getRegisteredOperatorSpecs();
+ for (OperatorSpec spec : specs) {
+ if (spec instanceof PartialJoinOperatorSpec) {
+ // every join will have two partial join operators
+ // we will choose one of them in order to consolidate the inputs
+ // the first one who registered with the outputToJoinMap will win
+ MessageStream output = spec.getNextStream();
+ OperatorSpec joinSpec = outputToJoinMap.get(output);
+ if (joinSpec == null) {
+ joinSpec = spec;
+ outputToJoinMap.put(output, joinSpec);
+ }
+
+ // record the association in both directions
+ joinToStreamMap.put(joinSpec, streamEdge);
+ streamToJoinMap.put(streamEdge, joinSpec);
+
+ if (!visited.contains(joinSpec) && streamEdge.getPartitions() > 0) {
+ // put the joins with known input partitions into the queue
+ joinQ.add(joinSpec);
+ visited.add(joinSpec);
+ }
+ }
+
+ // continue the traversal downstream, still on behalf of the same edge
+ if (spec.getNextStream() != null) {
+ getJoins(spec.getNextStream(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+ }
+ }
+ }
+
+  /**
+   * Assigns a partition count to every internal stream that does not have one yet (count &lt;= 0).
+   * The count comes from the job's default-partitions config when it is non-negative; otherwise
+   * it is MAX(MAX(input stream partitions), MAX(output stream partitions)).
+   *
+   * @param processorGraph the graph whose internal streams are updated
+   * @param config the configuration holding the optional default partition count
+   */
+  static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+    final int configured = config.getInt(JobConfig.JOB_DEFAULT_PARTITIONS(), -1);
+    final int partitions;
+    if (configured >= 0) {
+      partitions = configured;
+    } else {
+      // no default configured: fall back to the largest source/sink partition count
+      partitions = Math.max(maxPartition(processorGraph.getSources()), maxPartition(processorGraph.getSinks()));
+    }
+
+    processorGraph.getInternalStreams().stream()
+        .filter(edge -> edge.getPartitions() <= 0)
+        .forEach(edge -> edge.setPartitions(partitions));
+  }
+
+  /**
+   * Creates the internal streams of the graph. A {@link StreamSpec} carrying the edge's partition
+   * count is built for every internal edge, the specs are grouped by system name, and each one is
+   * created through the admin of its system.
+   *
+   * @param graph the graph whose internal streams are created
+   * @param sysAdmins system admins keyed by system name
+   * @throws SamzaException if an internal stream refers to a system that has no entry in {@code sysAdmins}
+   */
+  private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
+    Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
+    graph.getInternalStreams().forEach(edge -> {
+      StreamSpec streamSpec = createStreamSpec(edge);
+      streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
+    });
+
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      if (systemAdmin == null) {
+        // fail with a descriptive error instead of a bare NullPointerException below
+        throw new SamzaException(
+            String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+
+      for (StreamSpec stream : entry.getValue()) {
+        log.info("Creating stream {} with partitions {} on system {}",
+            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
+        systemAdmin.createStream(stream);
+      }
+    }
+  }
+
+  /**
+   * Returns the largest partition count among the given edges.
+   *
+   * @param edges the edges to inspect; may be empty
+   * @return the maximum partition count, or -1 (the value a {@link StreamEdge} holds before its
+   *         partitions are set) when {@code edges} is empty
+   */
+  private static int maxPartition(Collection<StreamEdge> edges) {
+    // orElse instead of get(): an empty collection (e.g. a graph without sinks) must not throw
+    return edges.stream().map(StreamEdge::getPartitions).reduce(Integer::max).orElse(-1);
+  }
+
+  /**
+   * Builds the spec used to create the stream behind an edge: the edge's own spec, copied with
+   * the partition count currently set on the edge.
+   *
+   * @param edge the edge to build the spec for
+   * @return the result of {@code copyWithPartitionCount} on the edge's spec
+   */
+  private static StreamSpec createStreamSpec(StreamEdge edge) {
+    return edge.getStreamSpec().copyWithPartitionCount(edge.getPartitions());
+  }
+
+  /**
+   * Creates one {@link SystemAdmin} per system factory found for the configuration.
+   *
+   * @param config the configuration passed to the factories
+   * @return system admins keyed by system name
+   */
+  private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
+    final Map<String, SystemFactory> factories = getSystemFactories(config);
+    return factories.entrySet().stream().collect(Collectors.toMap(
+        Map.Entry::getKey,
+        factoryEntry -> factoryEntry.getValue().getAdmin(factoryEntry.getKey(), config)));
+  }
+
+  /**
+   * Instantiates the {@link SystemFactory} of every system name returned by {@code getSystemNames}.
+   *
+   * @param config the configuration to read the factory class names from
+   * @return system factories keyed by system name
+   * @throws SamzaException if a system has no factory class name in the configuration
+   */
+  private static Map<String, SystemFactory> getSystemFactories(Config config) {
+    return getSystemNames(config).stream().collect(Collectors.toMap(name -> name, name -> {
+      final String factoryClass = new JavaSystemConfig(config).getSystemFactory(name);
+      if (factoryClass != null) {
+        return Util.getObj(factoryClass);
+      }
+      throw new SamzaException(
+          String.format("A stream uses system %s, which is missing from the configuration.", name));
+    }));
+  }
+
+ /**
+ * Returns the system names reported by {@link JavaSystemConfig} for the given configuration.
+ *
+ * @param config the configuration to read
+ * @return the system names
+ */
+ private static Collection<String> getSystemNames(Config config) {
+ return new JavaSystemConfig(config).getSystemNames();
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
new file mode 100644
index 0000000..5ee4d29
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of execution processors connected by source/sink/intermediate streams.
+ * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
+ */
+public class ProcessorGraph {
+ private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
+
+ private final Map<String, ProcessorNode> nodes = new HashMap<>();
+ private final Map<String, StreamEdge> edges = new HashMap<>();
+ private final Set<StreamEdge> sources = new HashSet<>();
+ private final Set<StreamEdge> sinks = new HashSet<>();
+ private final Set<StreamEdge> internalStreams = new HashSet<>();
+ private final Config config;
+
+ /**
+ * Creates an empty graph.
+ *
+ * @param config the configuration handed to every node and edge created by this graph
+ */
+ ProcessorGraph(Config config) {
+ this.config = config;
+ }
+
+  /**
+   * Registers a source stream consumed by a processor. The node and the edge are looked up
+   * (and created on first use), linked to each other, and the edge is recorded as a source.
+   *
+   * @param input the spec of the source stream
+   * @param targetProcessorId the id of the processor consuming the stream
+   */
+  void addSource(StreamSpec input, String targetProcessorId) {
+    final ProcessorNode consumer = getNode(targetProcessorId);
+    final StreamEdge sourceEdge = getEdge(input);
+    sourceEdge.addTargetNode(consumer);
+    consumer.addInEdge(sourceEdge);
+    sources.add(sourceEdge);
+  }
+
+  /**
+   * Registers a sink stream produced by a processor. The node and the edge are looked up
+   * (and created on first use), linked to each other, and the edge is recorded as a sink.
+   *
+   * @param output the spec of the sink stream
+   * @param sourceProcessorId the id of the processor producing the stream
+   */
+  void addSink(StreamSpec output, String sourceProcessorId) {
+    final ProcessorNode producer = getNode(sourceProcessorId);
+    final StreamEdge sinkEdge = getEdge(output);
+    sinkEdge.addSourceNode(producer);
+    producer.addOutEdge(sinkEdge);
+    sinks.add(sinkEdge);
+  }
+
+  /**
+   * Registers an internal stream that is produced by one processor and consumed by another
+   * (possibly the same one). Both nodes and the edge are looked up (and created on first use),
+   * linked to each other, and the edge is recorded as an internal stream.
+   *
+   * @param streamSpec the spec of the internal stream
+   * @param sourceProcessorId the id of the producing processor
+   * @param targetProcessorId the id of the consuming processor
+   */
+  void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
+    final ProcessorNode producer = getNode(sourceProcessorId);
+    final ProcessorNode consumer = getNode(targetProcessorId);
+    final StreamEdge internalEdge = getEdge(streamSpec);
+    internalEdge.addSourceNode(producer);
+    internalEdge.addTargetNode(consumer);
+    producer.addOutEdge(internalEdge);
+    consumer.addInEdge(internalEdge);
+    internalStreams.add(internalEdge);
+  }
+
+  /**
+   * Returns the node registered under the given processor id, creating and registering it
+   * on first use.
+   *
+   * @param processorId the id of the processor
+   * @return the node for the processor
+   */
+  ProcessorNode getNode(String processorId) {
+    return nodes.computeIfAbsent(processorId, id -> new ProcessorNode(id, config));
+  }
+
+  /**
+   * Returns the edge registered under the id of the given stream spec, creating and registering
+   * it on first use.
+   *
+   * @param streamSpec the spec of the stream
+   * @return the edge for the stream
+   */
+  StreamEdge getEdge(StreamSpec streamSpec) {
+    return edges.computeIfAbsent(streamSpec.getId(), streamId -> new StreamEdge(streamSpec, config));
+  }
+
+  /**
+   * Returns the processors of this graph in the order produced by {@code topologicalSort()}.
+   *
+   * @return an unmodifiable list of the sorted {@link ProcessorNode}s
+   */
+  public List<ProcessorNode> getProcessors() {
+    return Collections.unmodifiableList(topologicalSort());
+  }
+
+ /**
+ * Returns the source stream edges of this graph.
+ * @return an unmodifiable view of the source edges
+ */
+ public Set<StreamEdge> getSources() {
+ return Collections.unmodifiableSet(sources);
+ }
+
+ /**
+ * Returns the sink stream edges of this graph.
+ * @return an unmodifiable view of the sink edges
+ */
+ public Set<StreamEdge> getSinks() {
+ return Collections.unmodifiableSet(sinks);
+ }
+
+ /**
+ * Returns the internal stream edges of this graph, i.e. the edges registered through addEdge.
+ * @return an unmodifiable view of the internal stream edges
+ */
+ public Set<StreamEdge> getInternalStreams() {
+ return Collections.unmodifiableSet(internalStreams);
+ }
+
+
+ /**
+ * Validates the graph by checking, in order, the sources, the sinks, the internal streams and
+ * the reachability of all nodes. Each check throws an IllegalArgumentException on the first
+ * violation it finds.
+ */
+ public void validate() {
+ validateSources();
+ validateSinks();
+ validateInternalStreams();
+ validateReachability();
+ }
+
+  /**
+   * Checks that every source stream has no producer and at least one consumer in the graph.
+   *
+   * @throws IllegalArgumentException if a source stream has a producer or lacks a consumer
+   */
+  private void validateSources() {
+    for (StreamEdge source : sources) {
+      if (!source.getSourceNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should not have producers.", source.getFormattedSystemStream()));
+      }
+      if (source.getTargetNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should have consumers.", source.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every sink stream has no consumer and at least one producer in the graph.
+   *
+   * @throws IllegalArgumentException if a sink stream has a consumer or lacks a producer
+   */
+  private void validateSinks() {
+    for (StreamEdge sink : sinks) {
+      if (!sink.getTargetNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should not have consumers", sink.getFormattedSystemStream()));
+      }
+      if (sink.getSourceNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should have producers", sink.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every edge that is neither a source nor a sink has at least one producer and at
+   * least one consumer.
+   *
+   * @throws IllegalArgumentException if such an edge lacks a producer or a consumer
+   */
+  private void validateInternalStreams() {
+    // everything that is not a source or a sink is treated as internal
+    final Set<StreamEdge> candidates = new HashSet<>(edges.values());
+    candidates.removeAll(sources);
+    candidates.removeAll(sinks);
+
+    for (StreamEdge edge : candidates) {
+      final boolean connected = !edge.getSourceNodes().isEmpty() && !edge.getTargetNodes().isEmpty();
+      if (!connected) {
+        throw new IllegalArgumentException(
+            String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every node of the graph can be reached from the sources.
+   *
+   * @throws IllegalArgumentException listing the ids of the nodes that cannot be reached
+   */
+  private void validateReachability() {
+    final Set<ProcessorNode> reachable = findReachable();
+    if (reachable.size() == nodes.size()) {
+      return;
+    }
+
+    // collect the nodes that the traversal never got to
+    final Set<ProcessorNode> missing = new HashSet<>(nodes.values());
+    missing.removeAll(reachable);
+    final String missingIds = missing.stream().map(ProcessorNode::getId).collect(Collectors.joining(", "));
+    throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.", missingIds));
+  }
+
+  /**
+   * Finds the nodes reachable from the sources with a breadth-first traversal: the consumers of
+   * the source streams are the starting points, and the traversal follows the out edges of each
+   * node to their target nodes.
+   * Package private for test.
+   *
+   * @return the set of reachable nodes
+   */
+  Set<ProcessorNode> findReachable() {
+    final Set<ProcessorNode> seen = new HashSet<>();
+    final Queue<ProcessorNode> pending = new ArrayDeque<>();
+
+    for (StreamEdge source : sources) {
+      final List<ProcessorNode> consumers = source.getTargetNodes();
+      pending.addAll(consumers);
+      seen.addAll(consumers);
+    }
+
+    while (!pending.isEmpty()) {
+      final ProcessorNode current = pending.poll();
+      for (StreamEdge out : current.getOutEdges()) {
+        for (ProcessorNode target : out.getTargetNodes()) {
+          // Set.add returns true only for nodes not seen before
+          if (seen.add(target)) {
+            pending.offer(target);
+          }
+        }
+      }
+    }
+
+    return seen;
+  }
+
+  /**
+   * A variation of Kahn's algorithm for topological sorting that also copes with loops in the graph.
+   * Nodes whose only inputs are source streams are emitted first. Whenever the queue runs dry
+   * while nodes remain, a loop is broken by continuing from the already reached node with the
+   * smallest remaining indegree or, when no such node exists, from an unvisited consumer of a
+   * source stream.
+   * Every node is marked as visited when it is queued, including the nodes picked to break a loop,
+   * so each node appears exactly once in the result (e.g. a processor whose internal stream loops
+   * back to itself is not emitted twice).
+   * Package private for test.
+   *
+   * @return topologically sorted ProcessorNode(s)
+   */
+  List<ProcessorNode> topologicalSort() {
+    Collection<ProcessorNode> pnodes = nodes.values();
+    Queue<ProcessorNode> q = new ArrayDeque<>();
+    Map<String, Long> indegree = new HashMap<>();
+    Set<ProcessorNode> visited = new HashSet<>();
+    pnodes.forEach(node -> {
+      String nid = node.getId();
+      // only count the degrees of intermediate streams since sources have degree 0
+      long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+      indegree.put(nid, degree);
+
+      if (degree == 0L) {
+        // start from the nodes that only consume from sources
+        q.add(node);
+        visited.add(node);
+      }
+    });
+
+    List<ProcessorNode> sortedNodes = new ArrayList<>();
+    Set<ProcessorNode> reachable = new HashSet<>();
+    while (sortedNodes.size() < pnodes.size()) {
+      while (!q.isEmpty()) {
+        ProcessorNode node = q.poll();
+        sortedNodes.add(node);
+        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+          String nid = n.getId();
+          long degree = indegree.get(nid) - 1;
+          indegree.put(nid, degree);
+          if (degree == 0L && !visited.contains(n)) {
+            q.add(n);
+            visited.add(n);
+          }
+          reachable.add(n);
+        });
+      }
+
+      if (sortedNodes.size() < pnodes.size()) {
+        // The remaining nodes are part of loops.
+        // Break the loops starting from the nodes reached by the previous traversal.
+        reachable.removeAll(sortedNodes);
+        if (!reachable.isEmpty()) {
+          // find the node with the minimal remaining indegree
+          long min = Long.MAX_VALUE;
+          ProcessorNode minNode = null;
+          for (ProcessorNode node : reachable) {
+            long degree = indegree.get(node.getId());
+            if (degree < min) {
+              min = degree;
+              minNode = node;
+            }
+          }
+          // start from the node with minimal input edge again;
+          // mark it visited so it is not queued a second time when its indegree drops to 0
+          q.add(minNode);
+          visited.add(minNode);
+        } else {
+          // all the remaining nodes should be reachable from sources
+          // start from sources again to find the next node that hasn't been visited
+          // NOTE(review): get() assumes such a node exists, which holds when validate() has passed - confirm for other callers
+          ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+              .filter(node -> !visited.contains(node))
+              .findAny().get();
+          // mark it visited so a loop back to this node does not queue it again
+          q.add(nextNode);
+          visited.add(nextNode);
+        }
+      }
+    }
+
+    return sortedNodes;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
new file mode 100644
index 0000000..08d94b4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.ConfigInheritence;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
+ * A ProcessorNode contains the input/output, and the configs for physical execution.
+ */
+public class ProcessorNode {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
+  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
+
+  private final String id;
+  private final Config config;
+  private final List<StreamEdge> inEdges = new ArrayList<>();
+  private final List<StreamEdge> outEdges = new ArrayList<>();
+
+  /**
+   * Creates a node without any edges.
+   *
+   * @param id the id of the processor
+   * @param config the configuration the generated processor config is derived from
+   */
+  ProcessorNode(String id, Config config) {
+    this.id = id;
+    this.config = config;
+  }
+
+  /**
+   * @return the id of this processor
+   */
+  public String getId() {
+    return this.id;
+  }
+
+  /**
+   * Appends an input edge to this node.
+   *
+   * @param in the edge this node consumes from
+   */
+  void addInEdge(StreamEdge in) {
+    this.inEdges.add(in);
+  }
+
+  /**
+   * Appends an output edge to this node.
+   *
+   * @param out the edge this node produces to
+   */
+  void addOutEdge(StreamEdge out) {
+    this.outEdges.add(out);
+  }
+
+  /**
+   * @return the live list of input edges of this node
+   */
+  List<StreamEdge> getInEdges() {
+    return this.inEdges;
+  }
+
+  /**
+   * @return the live list of output edges of this node
+   */
+  List<StreamEdge> getOutEdges() {
+    return this.outEdges;
+  }
+
+  /**
+   * Generates the configuration for this processor. The job name is set to the node id and the
+   * task inputs to the comma-separated formatted names of the input edges; these generated
+   * entries are then combined with the node's config under the "processors.&lt;id&gt;." scope
+   * and passed through {@code Util.rewriteConfig}.
+   *
+   * @return the config for this processor
+   */
+  public Config generateConfig() {
+    final List<String> inputNames =
+        inEdges.stream().map(StreamEdge::getFormattedSystemStream).collect(Collectors.toList());
+
+    final Map<String, String> generated = new HashMap<>();
+    generated.put(JobConfig.JOB_NAME(), id);
+    generated.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputNames));
+    log.info("Processor {} has generated configs {}", id, generated);
+
+    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    final String scope = String.format(CONFIG_PROCESSOR_PREFIX, id);
+    final Config scoped = ConfigInheritence.extractScopedConfig(config, new MapConfig(generated), scope);
+    return Util.rewriteConfig(scoped);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
new file mode 100644
index 0000000..664a458
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+
+
+/**
+ * A StreamEdge connects the source {@link ProcessorNode}s to the target {@link ProcessorNode}s with a stream.
+ * If it's a sink StreamEdge, the target ProcessorNode is empty.
+ * If it's a source StreamEdge, the source ProcessorNode is empty.
+ */
+public class StreamEdge {
+ private final StreamSpec streamSpec;
+ private final List<ProcessorNode> sourceNodes = new ArrayList<>();
+ private final List<ProcessorNode> targetNodes = new ArrayList<>();
+ private final Config config;
+
+ private String name = "";
+ private int partitions = -1;
+
+ StreamEdge(StreamSpec streamSpec, Config config) {
+ this.streamSpec = streamSpec;
+ this.config = config;
+ this.name = Util.getNameFromSystemStream(getSystemStream());
+ }
+
+ void addSourceNode(ProcessorNode sourceNode) {
+ sourceNodes.add(sourceNode);
+ }
+
+ void addTargetNode(ProcessorNode targetNode) {
+ targetNodes.add(targetNode);
+ }
+
+ StreamSpec getStreamSpec() {
+ return streamSpec;
+ }
+
+ SystemStream getSystemStream() {
+ return new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+ }
+
+ String getFormattedSystemStream() {
+ return Util.getNameFromSystemStream(getSystemStream());
+ }
+
+ List<ProcessorNode> getSourceNodes() {
+ return sourceNodes;
+ }
+
+ List<ProcessorNode> getTargetNodes() {
+ return targetNodes;
+ }
+
+ int getPartitions() {
+ return partitions;
+ }
+
+ void setPartitions(int partitions) {
+ this.partitions = partitions;
+ }
+
+ String getName() {
+ return name;
+ }
+
+ void setName(String name) {
+ this.name = name;
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index e592e66..3288c5c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -18,13 +18,24 @@
*/
package org.apache.samza.system;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.processorgraph.ExecutionPlanner;
+import org.apache.samza.processorgraph.ProcessorGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
*/
public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
+
+ private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class);
public RemoteExecutionEnvironment(Config config) {
super(config);
@@ -33,9 +44,26 @@
@Override public void run(StreamGraphBuilder app, Config config) {
// TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
// TODO: actually instantiate the tasks and run the job, i.e.
- // 1. create all input/output/intermediate topics
- // 2. create the single job configuration
- // 3. execute JobRunner to submit the single job for the whole graph
- }
+    try {
+      // 1. build stream graph: let the application populate a graph bound to this environment
+      StreamGraph streamGraph = new StreamGraphImpl(this);
+      app.init(streamGraph, config);
+
+      // 2. create the physical execution plan
+      ExecutionPlanner planner = new ExecutionPlanner(config);
+      ProcessorGraph processorGraph = planner.plan(streamGraph);
+
+      // 3. submit jobs for remote execution, one JobRunner per processor in the plan
+      processorGraph.getProcessors().forEach(processor -> {
+          Config processorConfig = processor.generateConfig();
+          String processorId = processor.getId();
+          // log the config the job is actually started with, not the application-wide config
+          log.info("Starting processor {} with config {}", processorId, processorConfig);
+
+          JobRunner runner = new JobRunner(processorConfig);
+          runner.run(true);
+        });
+    } catch (Exception e) {
+      // any failure while building, planning or submitting is rethrown with its cause preserved
+      throw new SamzaException("fail to run graph", e);
+    }
+  }
}
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index 71d60ef..b88e356 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -36,7 +36,7 @@
// TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
StreamGraph createGraph(StreamGraphBuilder app, Config config) {
- StreamGraphImpl graph = new StreamGraphImpl();
+ StreamGraphImpl graph = new StreamGraphImpl(this);
app.init(graph, config);
return graph;
}
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index b007e3c..6032f4d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -25,6 +25,7 @@
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
@@ -77,8 +78,11 @@
@Override
public final void init(Config config, TaskContext context) throws Exception {
+ // for now, we need to create the execution env again
+ // in the future, if we decide to serialize the dag, this can be cleaned up
+ ExecutionEnvironment executionEnvironment = ExecutionEnvironment.fromConfig(config);
// create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
- StreamGraphImpl streams = new StreamGraphImpl();
+ StreamGraphImpl streams = new StreamGraphImpl(executionEnvironment);
this.graphBuilder.init(streams, config);
// get the context manager of the {@link StreamGraph} and initialize the task-specific context
this.contextManager = streams.getContextManager();
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
new file mode 100644
index 0000000..0bfa8c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.TaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+
+public class TaskFactories {
+ private static final Logger log = LoggerFactory.getLogger(TaskFactories.class);
+
+ public static Object fromTaskClassConfig(Config config) throws ClassNotFoundException {
+
+ String taskClassName;
+
+ // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory
+ if (isStreamOperatorTask(config)) {
+ taskClassName = StreamOperatorTask.class.getName();
+ } else {
+ taskClassName = new TaskConfig(config).getTaskClass().getOrElse(defaultValue(null));
+ }
+
+ if (taskClassName == null) {
+ throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
+ }
+ log.info("Got task class name: {}", taskClassName);
+
+ boolean isAsyncTaskClass = AsyncStreamTask.class.isAssignableFrom(Class.forName(taskClassName));
+ if (isAsyncTaskClass) {
+ return new AsyncStreamTaskFactory() {
+ @Override
+ public AsyncStreamTask createInstance() {
+ try {
+ return (AsyncStreamTask) Class.forName(taskClassName).newInstance();
+ } catch (Exception e) {
+ log.error("Error loading AsyncStreamTask class: {}. error: {}", taskClassName, e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ return new StreamTaskFactory() {
+ @Override
+ public StreamTask createInstance() {
+ try {
+ return taskClassName == StreamOperatorTask.class.getName() ? createStreamOperatorTask(config) :
+ (StreamTask) Class.forName(taskClassName).newInstance();
+ } catch (Exception e) {
+ log.error("Error loading StreamTask class: {}. error: {}", taskClassName, e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static StreamTask createStreamOperatorTask(Config config) throws Exception {
+ StreamGraphBuilder graphBuilder = (StreamGraphBuilder) Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance();
+ return new StreamOperatorTask(graphBuilder);
+ }
+
+ private static boolean isStreamOperatorTask(Config config) {
+ try {
+ if (config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG) != null && config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG) != "") {
+ return StreamGraphBuilder.class.isAssignableFrom(Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)));
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Failed to validate StreamGraphBuilder class from the config. {}={}",
+ StreamGraphBuilder.BUILDER_CLASS_CONFIG, config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG));
+ return false;
+ }
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
new file mode 100644
index 0000000..e4fb32f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigInheritence {
+  private static final Logger log = LoggerFactory.getLogger(ConfigInheritence.class);
+  private static final boolean INHERIT_ROOT_CONFIGS = true;
+
+  /**
+   * Merges several config layers into one. The layers are applied in order, so a later layer overrides an
+   * earlier one for the same key: the full config first (only when root inheritance is enabled), then the
+   * generated config, then the subset of the full config selected by {@code configPrefix}.
+   * Entries whose value is null or empty are skipped and therefore never override an earlier layer.
+   *
+   * @param fullConfig the complete config
+   * @param generatedConfig configs generated for the scope
+   * @param configPrefix prefix passed to {@code fullConfig.subset(...)} to select the scoped configs
+   * @return a new config holding the merged entries
+   */
+  public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config prefixed = fullConfig.subset(configPrefix);
+    log.debug("Prefix '{}' has extracted config {}", configPrefix, prefixed);
+    log.debug("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+
+    // Lowest precedence first; each layer may overwrite what the previous ones contributed.
+    Config[] layers = INHERIT_ROOT_CONFIGS
+        ? new Config[] {fullConfig, generatedConfig, prefixed}
+        : new Config[] {generatedConfig, prefixed};
+
+    Map<String, String> merged = new HashMap<>();
+    for (Config layer : layers) {
+      for (Map.Entry<String, String> entry : layer.entrySet()) {
+        String value = entry.getValue();
+        if (value == null || value.isEmpty()) {
+          // an empty value must not mask a value coming from an earlier layer
+          continue;
+        }
+        merged.put(entry.getKey(), value);
+      }
+    }
+
+    Config result = new MapConfig(merged);
+    log.debug("Prefix '{}' has merged config {}", configPrefix, result);
+    return result;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
new file mode 100644
index 0000000..6c7fc2d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import scala.runtime.AbstractFunction0;
+
+/**
+ * Helper methods for using Scala objects from Java code.
+ */
+public class ScalaToJavaUtils {
+  /**
+   * Wraps a value in a zero-argument Scala function that always returns it, so that Java code can supply
+   * the default argument of a Scala {@code Option.getOrElse(...)} call.
+   * @param value default value
+   * @param <T> value type
+   * @return function object whose {@code apply()} returns {@code value}
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    final class ConstantFunction extends AbstractFunction0<T> {
+      @Override
+      public T apply() {
+        return value;
+      }
+    }
+
+    return new ConstantFunction();
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 9d6cbc2..5f22409 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -44,6 +44,7 @@
val SAMZA_FWK_VERSION = "samza.fwk.version"
val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
val JOB_DEFAULT_SYSTEM = "job.default.system"
+ val JOB_DEFAULT_PARTITIONS = "job.default.partitions"
val JOB_CONTAINER_COUNT = "job.container.count"
val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -79,6 +80,10 @@
// Processor Config Constants
val PROCESSOR_ID = "processor.id"
+ val EXECUTION_ENV = "job.execution.env"
+
+ val STREAM_GRAPH_BUILDER = "job.stream.graph.builder"
+
implicit def Config2Job(config: Config) = new JobConfig(config)
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..5476fb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -25,6 +25,8 @@
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.lang.Thread.UncaughtExceptionHandler
import java.net.{URL, UnknownHostException}
+import javax.servlet.SingleThreadModel
+
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
@@ -64,13 +66,7 @@
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MessageChooserFactory
import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.AsyncRunLoop
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.AsyncStreamTaskAdapter
-import org.apache.samza.task.AsyncStreamTaskFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.StreamTaskFactory
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task._
import org.apache.samza.util.HighResolutionClock
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.Logging
@@ -432,28 +428,6 @@
val singleThreadMode = config.getSingleThreadMode
info("Got single thread mode: " + singleThreadMode)
- val taskClassName = config.getTaskClass.orNull
- info("Got task class name: %s" format taskClassName)
-
- if (taskClassName == null && taskFactory == null) {
- throw new SamzaException("Either the task class name or the task factory instance is required.")
- }
-
- val isAsyncTask: Boolean =
- if (taskFactory != null) {
- taskFactory.isInstanceOf[AsyncStreamTaskFactory]
- } else {
- classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
- }
-
- if (isAsyncTask) {
- info("Got an AsyncStreamTask implementation.")
- }
-
- if(singleThreadMode && isAsyncTask) {
- throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
- }
-
val threadPoolSize = config.getThreadPoolSize
info("Got thread pool size: " + threadPoolSize)
@@ -462,6 +436,39 @@
else
null
+    /**
+     * Resolves the task factory used for this container: the supplied factory if there is one, otherwise a
+     * factory derived from the task class config. A non-async factory is wrapped into an
+     * AsyncStreamTaskFactory backed by the task thread pool unless single thread mode is on.
+     */
+    def maybeAsyncStreamTaskFactory : Object = {
+
+      val taskClassFactory =
+        if (taskFactory != null) taskFactory else TaskFactories.fromTaskClassConfig(config)
+
+      if (taskClassFactory == null) {
+        throw new SamzaException("Either the task class name or the task factory instance is required.")
+      }
+
+      val isAsyncTaskClass: Boolean = taskClassFactory match {
+        case _: AsyncStreamTaskFactory => true
+        case _ => false
+      }
+
+      if (isAsyncTaskClass) {
+        info("Got an AsyncStreamTask implementation.")
+        if (singleThreadMode) {
+          throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
+        }
+      }
+
+      if (singleThreadMode || isAsyncTaskClass) {
+        // usable as-is: either a StreamTask in single thread mode or an already-async factory
+        taskClassFactory
+      } else {
+        info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask w/ multiple threads")
+        new AsyncStreamTaskFactory {
+          override def createInstance() = {
+            val streamTask = taskClassFactory.asInstanceOf[StreamTaskFactory].createInstance()
+            new AsyncStreamTaskAdapter(streamTask, taskThreadPool)
+          }
+        }
+      }
+    }
+
+ val finalTaskFactory = maybeAsyncStreamTaskFactory
+
// Wire up all task-instance-level (unshared) objects.
val taskNames = containerModel
.getTasks
@@ -483,24 +490,14 @@
val taskName = taskModel.getTaskName
- val taskObj = if (taskFactory != null) {
- debug("Using task factory to create task instance")
- taskFactory match {
+ debug("Using task factory to create task instance")
+ val task =
+ finalTaskFactory match {
case tf: AsyncStreamTaskFactory => tf.createInstance()
case tf: StreamTaskFactory => tf.createInstance()
case _ =>
throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory")
}
- } else {
- debug("Using task class name: %s to create instance" format taskClassName)
- Class.forName(taskClassName).newInstance
- }
-
- val task = if (!singleThreadMode && !isAsyncTask)
- // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
- new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
- else
- taskObj
val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName)
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 022b480..1d81b33 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,17 +20,22 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.messages.Delete
+import org.apache.samza.coordinator.stream.messages.SetConfig
import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.operators.StreamGraphBuilder
+import org.apache.samza.system.ExecutionEnvironment
import org.apache.samza.util.ClassLoaderHelper
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
+
import scala.collection.JavaConversions._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
object JobRunner extends Logging {
@@ -63,7 +68,16 @@
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- new JobRunner(rewriteConfig(config)).run()
+
+ // start execution env if it's defined
+ val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
+ if (!envClass.isEmpty) {
+ val env: ExecutionEnvironment = ExecutionEnvironment.fromConfig(config)
+ val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+ env.run(graphBuilder, rewriteConfig(config))
+ } else {
+ new JobRunner(rewriteConfig(config)).run()
+ }
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 9019d02..97bd22a 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,6 +23,7 @@
import java.io._
import java.lang.management.ManagementFactory
import java.util.zip.CRC32
+import org.apache.samza.config.ConfigRewriter
import org.apache.samza.{SamzaException, Partition}
import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
@@ -395,4 +396,28 @@
* @return Scala clock function
*/
implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
+
+  /**
+   * Runs the config through every ConfigRewriter configured for the job, in the
+   * order they are listed (comma separated). When no ConfigRewriter is
+   * configured, the config is returned unchanged.
+   *
+   * @param config The config to re-write
+   * @return re-written config
+   */
+  def rewriteConfig(config: Config): Config = {
+    // Instantiates the named rewriter (its class is looked up in the original config) and applies it.
+    def applyRewriter(current: Config, rewriterName: String): Config = {
+      val rewriterClass = config.getConfigRewriterClass(rewriterName) match {
+        case Some(className) => className
+        case None => throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)
+      }
+      val rewriter = Util.getObj[ConfigRewriter](rewriterClass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, current)
+    }
+
+    config.getConfigRewriters
+      .map(names => names.split(",").foldLeft(config)(applyRewriter))
+      .getOrElse(config)
+  }
}
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
index 8ecd44f..f817379 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -21,6 +21,7 @@
import java.lang.reflect.Field;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.impl.OperatorGraph;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.StreamOperatorTask;
@@ -47,7 +48,7 @@
@Test
public void testUserTask() throws Exception {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = spy(new MapConfig());
TaskContext mockContext = mock(TaskContext.class);
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
@@ -64,7 +65,7 @@
@Test
public void testSplitTask() throws Exception {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = spy(new MapConfig());
TaskContext mockContext = mock(TaskContext.class);
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
@@ -81,7 +82,7 @@
@Test
public void testJoinTask() throws Exception {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = spy(new MapConfig());
TaskContext mockContext = mock(TaskContext.class);
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
new file mode 100644
index 0000000..68a2142
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.system.AbstractExecutionEnvironment;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionPlanner {
+
+ private Config config;
+
+ private ExecutionEnvironment env;
+
+ private static final String DEFAULT_SYSTEM = "test-system";
+ private static final int DEFAULT_PARTITIONS = 10;
+
+ private StreamSpec input1;
+ private StreamSpec input2;
+ private StreamSpec input3;
+ private StreamSpec output1;
+ private StreamSpec output2;
+
+ private Map<String, SystemAdmin> systemAdmins;
+
+ private JoinFunction createJoin() {
+ return new JoinFunction() {
+ @Override
+ public Object apply(Object message, Object otherMessage) {
+ return null;
+ }
+
+ @Override
+ public Object getFirstKey(Object message) {
+ return null;
+ }
+
+ @Override
+ public Object getSecondKey(Object message) {
+ return null;
+ }
+ };
+ }
+
+ private SinkFunction createSink() {
+ return new SinkFunction() {
+ @Override
+ public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
+ }
+ };
+ }
+
+ private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
+
+ return new SystemAdmin() {
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ return null;
+ }
+
+ @Override
+ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+ Map<String, SystemStreamMetadata> map = new HashMap<>();
+ for (String stream : streamNames) {
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> m = new HashMap<>();
+ for (int i = 0; i < streamToPartitions.get(stream); i++) {
+ m.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+ }
+ map.put(stream, new SystemStreamMetadata(stream, m));
+ }
+ return map;
+ }
+
+ @Override
+ public void createChangelogStream(String streamName, int numOfPartitions) {
+
+ }
+
+ @Override
+ public void validateChangelogStream(String streamName, int numOfPartitions) {
+
+ }
+
+ @Override
+ public void createCoordinatorStream(String streamName) {
+
+ }
+
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ return null;
+ }
+ };
+ }
+
+ private StreamGraph createSimpleGraph() {
+ /**
+ * a simple graph of partitionBy and map
+ *
+ * input1 -> partitionBy -> map -> output1
+ *
+ */
+ StreamGraph streamGraph = new StreamGraphImpl(env);
+ streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+ return streamGraph;
+ }
+
+ private StreamGraph createStreamGraphWithJoin() {
+
+ /** the graph looks like the following
+ *
+ * input1 -> map -> join -> output1
+ * |
+ * input2 -> partitionBy -> filter -|
+ * |
+ * input3 -> filter -> partitionBy -> map -> join -> output2
+ *
+ */
+
+ StreamGraph streamGraph = new StreamGraphImpl(env);
+ MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
+ MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
+ MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+
+ m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null));
+ m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null));
+
+ return streamGraph;
+ }
+
+ /**
+  * Rebuilds the shared fixtures before each test: the job config, an execution environment whose
+  * {@code run} does nothing, the five stream specs, and one {@link SystemAdmin} per system built by
+  * {@code createSystemAdmin} from that system's stream-name to partition-count map.
+  */
+ @Before
+ public void setup() {
+   Map<String, String> jobProps = new HashMap<>();
+   jobProps.put(JobConfig.JOB_NAME(), "test-app");
+   jobProps.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
+   config = new MapConfig(jobProps);
+
+   // run() is left empty; the graph helpers in this class pass env to StreamGraphImpl
+   env = new AbstractExecutionEnvironment(config) {
+     @Override
+     public void run(StreamGraphBuilder graphBuilder, Config config) {
+     }
+   };
+
+   input1 = new StreamSpec("input1", "input1", "system1");
+   input2 = new StreamSpec("input2", "input2", "system2");
+   input3 = new StreamSpec("input3", "input3", "system2");
+
+   output1 = new StreamSpec("output1", "output1", "system1");
+   output2 = new StreamSpec("output2", "output2", "system2");
+
+   // external partition counts, keyed by stream name, one map per system
+   Map<String, Integer> system1Partitions = new HashMap<>();
+   system1Partitions.put("input1", 64);
+   system1Partitions.put("output1", 8);
+
+   Map<String, Integer> system2Partitions = new HashMap<>();
+   system2Partitions.put("input2", 16);
+   system2Partitions.put("input3", 32);
+   system2Partitions.put("output2", 16);
+
+   SystemAdmin admin1 = createSystemAdmin(system1Partitions);
+   SystemAdmin admin2 = createSystemAdmin(system2Partitions);
+
+   systemAdmins = new HashMap<>();
+   systemAdmins.put("system1", admin1);
+   systemAdmins.put("system2", admin2);
+ }
+
+ /**
+  * The join graph must yield 3 sources, 2 sinks and 2 internal streams in the processor graph.
+  */
+ @Test
+ public void testCreateProcessorGraph() {
+   ExecutionPlanner executionPlanner = new ExecutionPlanner(config);
+   ProcessorGraph graph = executionPlanner.createProcessorGraph(createStreamGraphWithJoin());
+
+   assertTrue(graph.getSources().size() == 3);
+   assertTrue(graph.getSinks().size() == 2);
+   // one internal stream per partitionBy in the join graph
+   assertTrue(graph.getInternalStreams().size() == 2);
+ }
+
+ /**
+  * After fetching existing partitions, each input/output edge carries the count configured for it in
+  * {@code setup()}, while every internal stream still reports -1.
+  */
+ @Test
+ public void testFetchExistingStreamPartitions() {
+   ExecutionPlanner executionPlanner = new ExecutionPlanner(config);
+   ProcessorGraph graph = executionPlanner.createProcessorGraph(createStreamGraphWithJoin());
+
+   ExecutionPlanner.fetchExistingStreamPartitions(graph, systemAdmins);
+
+   assertTrue(graph.getEdge(input1).getPartitions() == 64);
+   assertTrue(graph.getEdge(input2).getPartitions() == 16);
+   assertTrue(graph.getEdge(input3).getPartitions() == 32);
+   assertTrue(graph.getEdge(output1).getPartitions() == 8);
+   assertTrue(graph.getEdge(output2).getPartitions() == 16);
+
+   graph.getInternalStreams().forEach(internal -> assertTrue(internal.getPartitions() == -1));
+ }
+
+ /**
+  * After the join-input calculation every internal stream must report 64 partitions, which is the
+  * count configured for input1 in {@code setup()}.
+  */
+ @Test
+ public void testCalculateJoinInputPartitions() {
+   ExecutionPlanner executionPlanner = new ExecutionPlanner(config);
+   StreamGraph joinGraph = createStreamGraphWithJoin();
+   ProcessorGraph graph = executionPlanner.createProcessorGraph(joinGraph);
+
+   ExecutionPlanner.fetchExistingStreamPartitions(graph, systemAdmins);
+   ExecutionPlanner.calculateJoinInputPartitions(joinGraph, graph);
+
+   // expected to match input1's partition count
+   graph.getInternalStreams().forEach(internal -> assertTrue(internal.getPartitions() == 64));
+ }
+
+ /**
+  * With {@code JobConfig.JOB_DEFAULT_PARTITIONS()} set, the internal streams of the simple graph must
+  * report {@code DEFAULT_PARTITIONS}.
+  */
+ @Test
+ public void testDefaultPartitions() {
+   Map<String, String> overridden = new HashMap<>(config);
+   overridden.put(JobConfig.JOB_DEFAULT_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
+   Config configWithDefault = new MapConfig(overridden);
+
+   ExecutionPlanner executionPlanner = new ExecutionPlanner(configWithDefault);
+   StreamGraph simpleGraph = createSimpleGraph();
+   ProcessorGraph graph = executionPlanner.createProcessorGraph(simpleGraph);
+   executionPlanner.calculatePartitions(simpleGraph, graph, systemAdmins);
+
+   // the configured default is expected here, not a count taken from input1 or output1
+   graph.getInternalStreams().forEach(internal -> assertTrue(internal.getPartitions() == DEFAULT_PARTITIONS));
+ }
+
+ /**
+  * Without a configured default, the internal stream of the simple graph must report 64 partitions.
+  */
+ @Test
+ public void testCalculateIntStreamPartitions() {
+   ExecutionPlanner executionPlanner = new ExecutionPlanner(config);
+   StreamGraph simpleGraph = createSimpleGraph();
+   ProcessorGraph graph = executionPlanner.createProcessorGraph(simpleGraph);
+   executionPlanner.calculatePartitions(simpleGraph, graph, systemAdmins);
+
+   // 64 is the max of input1 (64) and output1 (8) as set up in setup()
+   graph.getInternalStreams().forEach(internal -> assertTrue(internal.getPartitions() == 64));
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
new file mode 100644
index 0000000..7aa9f41
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link ProcessorGraph}: source and sink registration, reachability and
+ * topological ordering.
+ */
+public class TestProcessorGraph {
+
+  ProcessorGraph graph1;
+  ProcessorGraph graph2;
+  int streamSeq = 0;
+
+  /**
+   * Creates a new {@link StreamSpec} whose first constructor argument is the next value of
+   * {@code streamSeq}, so no two calls on the same test instance produce the same value.
+   *
+   * @return the newly created stream spec
+   */
+  private StreamSpec genStream() {
+    ++streamSeq;
+
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  /**
+   * Builds the two fixture graphs used by the reachability and topological-sort tests.
+   */
+  @Before
+  public void setup() {
+    /*
+     * graph1 is the example DAG from the Wikipedia article on topological sorting:
+     *
+     *   5 -> 11    7 -> 11    7 -> 8     3 -> 8    3 -> 10
+     *   11 -> 2    11 -> 9    11 -> 10   8 -> 9
+     *
+     * Sources are attached to nodes 5, 7 and 3; sinks are attached to nodes 2, 9 and 10.
+     */
+    graph1 = new ProcessorGraph(null);
+    graph1.addSource(genStream(), "5");
+    graph1.addSource(genStream(), "7");
+    graph1.addSource(genStream(), "3");
+    graph1.addEdge(genStream(), "5", "11");
+    graph1.addEdge(genStream(), "7", "11");
+    graph1.addEdge(genStream(), "7", "8");
+    graph1.addEdge(genStream(), "3", "8");
+    // 3 -> 10 belongs to the Wikipedia example; without it the "10 after 3" assertion in
+    // testTopologicalSort() is not implied by the graph and only holds by traversal accident.
+    graph1.addEdge(genStream(), "3", "10");
+    graph1.addEdge(genStream(), "11", "2");
+    graph1.addEdge(genStream(), "11", "9");
+    graph1.addEdge(genStream(), "8", "9");
+    graph1.addEdge(genStream(), "11", "10");
+    graph1.addSink(genStream(), "2");
+    graph1.addSink(genStream(), "9");
+    graph1.addSink(genStream(), "10");
+
+    /*
+     * graph2 is a graph with loops:
+     *
+     *   1 -> 2 -> 3 -> 4 -> 5 -> 7
+     *        ^         |
+     *        +--- 6 <--+
+     *
+     * plus a self-loop 5 -> 5. The source is attached to node 1 and the sink to node 7.
+     */
+    graph2 = new ProcessorGraph(null);
+    graph2.addSource(genStream(), "1");
+    graph2.addEdge(genStream(), "1", "2");
+    graph2.addEdge(genStream(), "2", "3");
+    graph2.addEdge(genStream(), "3", "4");
+    graph2.addEdge(genStream(), "4", "5");
+    graph2.addEdge(genStream(), "4", "6");
+    graph2.addEdge(genStream(), "6", "2");
+    graph2.addEdge(genStream(), "5", "5");
+    graph2.addEdge(genStream(), "5", "7");
+    graph2.addSink(genStream(), "7");
+  }
+
+  /**
+   * Registers three source streams against three nodes and checks the resulting edge/node wiring.
+   */
+  @Test
+  public void testAddSource() {
+    ProcessorGraph graph = new ProcessorGraph(null);
+
+    /*
+     * s1 -> 1
+     * s2 -> 1
+     *
+     * s3 -> 2
+     * s3 -> 3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, "1");
+    graph.addSource(s2, "1");
+    graph.addSource(s3, "2");
+    graph.addSource(s3, "3");
+
+    // s3 is registered twice but must be counted once
+    assertTrue(graph.getSources().size() == 3);
+
+    assertTrue(graph.getNode("1").getInEdges().size() == 2);
+    assertTrue(graph.getNode("2").getInEdges().size() == 1);
+    assertTrue(graph.getNode("3").getInEdges().size() == 1);
+
+    // source edges have target nodes only
+    assertTrue(graph.getEdge(s1).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s1).getTargetNodes().size() == 1);
+    assertTrue(graph.getEdge(s2).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s2).getTargetNodes().size() == 1);
+    assertTrue(graph.getEdge(s3).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s3).getTargetNodes().size() == 2);
+  }
+
+  /**
+   * Registers three sink streams against two nodes and checks the resulting edge/node wiring.
+   */
+  @Test
+  public void testAddSink() {
+    /*
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    ProcessorGraph graph = new ProcessorGraph(null);
+    graph.addSink(s1, "1");
+    graph.addSink(s2, "2");
+    graph.addSink(s3, "2");
+
+    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getNode("1").getOutEdges().size() == 1);
+    assertTrue(graph.getNode("2").getOutEdges().size() == 2);
+
+    // sink edges have source nodes only
+    assertTrue(graph.getEdge(s1).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s1).getTargetNodes().size() == 0);
+    assertTrue(graph.getEdge(s2).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s2).getTargetNodes().size() == 0);
+    assertTrue(graph.getEdge(s3).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s3).getTargetNodes().size() == 0);
+  }
+
+  /**
+   * Every node of both fixture graphs must be reported as reachable: 8 nodes in graph1, 7 in graph2.
+   */
+  @Test
+  public void testReachable() {
+    Set<ProcessorNode> reachable1 = graph1.findReachable();
+    assertTrue(reachable1.size() == 8);
+
+    Set<ProcessorNode> reachable2 = graph2.findReachable();
+    assertTrue(reachable2.size() == 7);
+  }
+
+  /**
+   * Checks that {@code topologicalSort()} returns every node exactly once and that the positions
+   * respect the asserted "comes after" relations.
+   */
+  @Test
+  public void testTopologicalSort() {
+
+    // test graph1: map node id -> position in the sorted list
+    List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
+    Map<String, Integer> idxMap1 = new HashMap<>();
+    for (int i = 0; i < sortedNodes1.size(); i++) {
+      idxMap1.put(sortedNodes1.get(i).getId(), i);
+    }
+
+    // one assertion per edge of graph1: the target must come after the origin
+    assertTrue(idxMap1.size() == 8);
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2: it contains loops, so only the relations below are asserted
+    List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
+    Map<String, Integer> idxMap2 = new HashMap<>();
+    for (int i = 0; i < sortedNodes2.size(); i++) {
+      idxMap2.put(sortedNodes2.get(i).getId(), i);
+    }
+
+    assertTrue(idxMap2.size() == 7);
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
new file mode 100644
index 0000000..29741a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.testUtils.TestAsyncStreamTask;
+import org.apache.samza.testUtils.TestStreamTask;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test methods to create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on task class configuration.
+ *
+ * Each test feeds {@link TaskFactories#fromTaskClassConfig} a different combination of the {@code task.class} and
+ * {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} settings and checks either the returned factory type or the
+ * exception thrown.
+ */
+public class TestTaskFactories {
+
+  /**
+   * Builds a {@link Config} from alternating key/value arguments.
+   *
+   * A plain {@link HashMap} backs the config, so a {@code null} value is passed through to
+   * {@link MapConfig} as-is.
+   *
+   * @param keyValues key1, value1, key2, value2, ...; may be empty
+   * @return a {@link MapConfig} holding the given entries
+   * @throws IllegalArgumentException if an odd number of arguments is given
+   */
+  private static Config configOf(String... keyValues) {
+    if (keyValues.length % 2 != 0) {
+      throw new IllegalArgumentException("Expected key/value pairs but got " + keyValues.length + " arguments");
+    }
+    HashMap<String, String> entries = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      entries.put(keyValues[i], keyValues[i + 1]);
+    }
+    return new MapConfig(entries);
+  }
+
+  /**
+   * A StreamTask class in {@code task.class} yields a {@link StreamTaskFactory}; an unknown class name fails with
+   * {@link ClassNotFoundException}.
+   */
+  @Test
+  public void testStreamTaskClass() throws ClassNotFoundException {
+    Config config = configOf("task.class", "org.apache.samza.testUtils.TestStreamTask");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = configOf("task.class", "no.such.class");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * With only the builder class configured: a valid builder yields a factory of {@link StreamOperatorTask};
+   * a non-builder class, an unknown class, an empty value and a missing entry all fail with {@link ConfigException}.
+   */
+  @Test
+  public void testStreamOperatorTaskClass() throws ClassNotFoundException {
+    Config config = configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+    } catch (ConfigException expected) {
+      // expected
+    }
+
+    config = configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "no.such.class");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ConfigException expected) {
+      // expected
+    }
+
+    config = configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ empty class name for StreamGraphBuilder");
+    } catch (ConfigException expected) {
+      // expected
+    }
+
+    config = configOf();
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail(String.format("Should have failed w/ non-existing entry for %s", StreamGraphBuilder.BUILDER_CLASS_CONFIG));
+    } catch (ConfigException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * When a valid builder class is configured, the result is a factory of {@link StreamOperatorTask} whatever
+   * {@code task.class} holds: a StreamTask, an AsyncStreamTask, or an unknown class name.
+   */
+  @Test
+  public void testStreamOperatorTaskClassWithTaskClass() throws ClassNotFoundException {
+    Config config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestAsyncStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = configOf(
+        "task.class", "no.such.class",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+  }
+
+  /**
+   * With a StreamTask in {@code task.class}, a non-builder class, an empty value or a {@code null} value for the
+   * builder class still yields a {@link StreamTaskFactory} of the task class. An empty {@code task.class} combined
+   * with a non-builder class fails with {@link ClassNotFoundException}.
+   */
+  @Test
+  public void testStreamTaskClassWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+
+    Config config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, null);
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = configOf(
+        "task.class", "",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ empty task.class");
+    } catch (ClassNotFoundException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * An AsyncStreamTask class in {@code task.class} yields an {@link AsyncStreamTaskFactory}; an unknown class name
+   * fails with {@link ClassNotFoundException}.
+   */
+  @Test
+  public void testAsyncStreamTask() throws ClassNotFoundException {
+    Config config = configOf("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = configOf("task.class", "no.such.class");
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException expected) {
+      // expected
+    }
+  }
+
+  /**
+   * With an AsyncStreamTask in {@code task.class}, a non-builder class, an empty value or a {@code null} value for
+   * the builder class still yields an {@link AsyncStreamTaskFactory} of the task class.
+   */
+  @Test
+  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+
+    Config config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestAsyncStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestAsyncStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = configOf(
+        "task.class", "org.apache.samza.testUtils.TestAsyncStreamTask",
+        StreamGraphBuilder.BUILDER_CLASS_CONFIG, null);
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
new file mode 100644
index 0000000..2fe6946
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+/**
+ * Test class that does NOT implement {@link org.apache.samza.operators.StreamGraphBuilder}: it declares no
+ * supertype and no members. Its class name serves as an invalid value for the graph-builder class setting.
+ */
+public class InvalidStreamGraphBuilder {
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
new file mode 100644
index 0000000..81f3fd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Test implementation class for {@link AsyncStreamTask}. {@code processAsync} has an empty body: it ignores
+ * all of its arguments and never touches the callback.
+ */
+public class TestAsyncStreamTask implements AsyncStreamTask {
+ @Override
+ public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) {
+
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
new file mode 100644
index 0000000..10eb02f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+
+/**
+ * Test implementation class for {@link StreamGraphBuilder}. {@code init} has an empty body, so the
+ * {@link StreamGraph} and {@link Config} passed in are left untouched.
+ */
+public class TestStreamGraphBuilder implements StreamGraphBuilder {
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
new file mode 100644
index 0000000..ce0980a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Test implementation class for {@link StreamTask}. {@code process} has an empty body: it ignores the
+ * envelope, collector and coordinator and never throws.
+ */
+public class TestStreamTask implements StreamTask {
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index bfda464..9a30dc6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -56,7 +56,7 @@
@Before
public void testSetup() {
- testZkConnectionString = "localhost:" + zkServer.getPort();
+ testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
try {
testZkUtils = getZkUtilsWithNewClient();
} catch (Exception e) {
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 58c3ed6..09dc5c7 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -55,7 +55,7 @@
public void testSetup() {
try {
zkClient = new ZkClient(
- new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
+ new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
} catch (Exception e) {
Assert.fail("Client connection setup failed. Aborting tests..");
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 5d82b92..b803dfe 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -89,8 +89,6 @@
"systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
"systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
"systems.kafka.samza.msg.serde" -> "string",
- "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
- "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
// Since using state, need a checkpoint manager
"task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
"task.checkpoint.system" -> "kafka",
@@ -122,6 +120,7 @@
val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+ // set up the ZooKeeper and bootstrap servers for the local Kafka cluster
jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
"systems.kafka.producer.bootstrap.servers" -> brokers)