Merge branch 'NewConsumer2' of https://github.com/sborya/samza into NewKafkaSystemConsumer
diff --git a/.travis.yml b/.travis.yml
index 2a3ae0c..601ceac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -30,8 +30,8 @@
- oraclejdk8
script:
- ## travis_wait increases build idle-wait time from 10 minutes to 20 minutes.
- - travis_wait 20 ./gradlew clean build
+ ## travis_wait increases build idle-wait time from 10 minutes to 30 minutes.
+ - travis_wait 30 ./gradlew clean build
- type sonar-scanner &>/dev/null; if [ $? -eq 0 ]; then sonar-scanner; else echo "Not running sonar"; fi
before_cache:
diff --git a/build.gradle b/build.gradle
index 7d23717..8b68205 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,6 +184,7 @@
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
+ compile "net.jodah:failsafe:$failsafeVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 3c4c3a9..b33ab82 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -45,4 +45,5 @@
yarnVersion = "2.6.1"
zkClientVersion = "0.8"
zookeeperVersion = "3.4.6"
+ failsafeVersion = "1.1.0"
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index c847088..cb5d5c0 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -56,7 +56,6 @@
*/
public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
- private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
/**
* Number of CPU cores to request from the cluster manager per container
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index ea892fe..810f424 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -21,6 +21,7 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,7 +39,6 @@
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.table.TableSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,14 +54,16 @@
public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
- static final int MAX_INFERRED_PARTITIONS = 256;
+ /* package private */ static final int MAX_INFERRED_PARTITIONS = 256;
private final Config config;
+ private final StreamConfig streamConfig;
private final StreamManager streamManager;
public ExecutionPlanner(Config config, StreamManager streamManager) {
this.config = config;
this.streamManager = streamManager;
+ this.streamConfig = new StreamConfig(config);
}
public ExecutionPlan plan(OperatorSpecGraph specGraph) {
@@ -71,12 +73,10 @@
JobGraph jobGraph = createJobGraph(specGraph);
// fetch the external streams partition info
- updateExistingPartitions(jobGraph, streamManager);
+ fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
- if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
- // figure out the partitions for internal streams
- calculatePartitions(jobGraph);
- }
+ // figure out the partitions for internal streams
+ calculatePartitions(jobGraph);
return jobGraph;
}
@@ -85,9 +85,9 @@
ApplicationConfig appConfig = new ApplicationConfig(config);
ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
// currently we don't support host-affinity in batch mode
- if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH
- && clusterConfig.getHostAffinityEnabled()) {
- throw new SamzaException("Host affinity is not supported in batch mode. Please configure job.host-affinity.enabled=false.");
+ if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && clusterConfig.getHostAffinityEnabled()) {
+ throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.",
+ ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED));
}
}
@@ -96,30 +96,33 @@
*/
/* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
JobGraph jobGraph = new JobGraph(config, specGraph);
- StreamConfig streamConfig = new StreamConfig(config);
+
+ // Source streams contain both input and intermediate streams.
Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+ // Sink streams contain both output and intermediate streams.
Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
- Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
- Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
- intStreams.retainAll(sinkStreams);
- sourceStreams.removeAll(intStreams);
- sinkStreams.removeAll(intStreams);
+
+ Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams);
+ Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
+ Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
+
+ Set<TableSpec> tables = specGraph.getTables().keySet();
// For this phase, we have a single job node for the whole dag
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = config.get(JobConfig.JOB_ID(), "1");
JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
- // add sources
- sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
+ // Add input streams
+ inputStreams.forEach(spec -> jobGraph.addInputStream(spec, node));
- // add sinks
- sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
+ // Add output streams
+ outputStreams.forEach(spec -> jobGraph.addOutputStream(spec, node));
- // add intermediate streams
- intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
+ // Add intermediate streams
+ intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
- // add tables
+ // Add tables
tables.forEach(spec -> jobGraph.addTable(spec, node));
jobGraph.validate();
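The stream classification above is plain set algebra: any stream the graph both consumes and produces is intermediate, and the remainder are inputs or outputs. A self-contained sketch of the same classification with Guava's set views (the stream names are made up for illustration):

    import com.google.common.collect.Sets;
    import java.util.Set;

    class StreamClassificationSketch {
      public static void main(String[] args) {
        // Streams the job consumes (sources) and produces (sinks).
        Set<String> sourceStreams = Sets.newHashSet("pageviews", "shuffle-1");
        Set<String> sinkStreams = Sets.newHashSet("enriched-views", "shuffle-1");

        // Consumed AND produced => intermediate. intersection/difference return live views.
        Set<String> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams); // [shuffle-1]
        Set<String> inputStreams = Sets.difference(sourceStreams, intermediateStreams);  // [pageviews]
        Set<String> outputStreams = Sets.difference(sinkStreams, intermediateStreams);   // [enriched-views]

        System.out.println(inputStreams + " " + intermediateStreams + " " + outputStreams);
      }
    }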
@@ -132,71 +135,80 @@
*/
/* package private */ void calculatePartitions(JobGraph jobGraph) {
// calculate the partitions for the input streams of join operators
- calculateJoinInputPartitions(jobGraph, config);
+ calculateJoinInputPartitions(jobGraph, streamConfig);
// calculate the partitions for the rest of intermediate streams
- calculateIntStreamPartitions(jobGraph, config);
+ calculateIntermediateStreamPartitions(jobGraph, config);
// validate all the partitions are assigned
- validatePartitions(jobGraph);
+ validateIntermediateStreamPartitions(jobGraph);
}
/**
- * Fetch the partitions of source/sink streams and update the StreamEdges.
+ * Fetch the partitions of input/output streams and update the corresponding StreamEdges.
* @param jobGraph {@link JobGraph}
* @param streamManager the {@link StreamManager} to interface with the streams.
*/
- /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
+ /* package private */ static void fetchInputAndOutputStreamPartitions(JobGraph jobGraph, StreamManager streamManager) {
Set<StreamEdge> existingStreams = new HashSet<>();
- existingStreams.addAll(jobGraph.getSources());
- existingStreams.addAll(jobGraph.getSinks());
+ existingStreams.addAll(jobGraph.getInputStreams());
+ existingStreams.addAll(jobGraph.getOutputStreams());
+ // System to StreamEdges
Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
- // group the StreamEdge(s) based on the system name
- existingStreams.forEach(streamEdge -> {
- SystemStream systemStream = streamEdge.getSystemStream();
- systemToStreamEdges.put(systemStream.getSystem(), streamEdge);
- });
- for (Map.Entry<String, Collection<StreamEdge>> entry : systemToStreamEdges.asMap().entrySet()) {
- String systemName = entry.getKey();
- Collection<StreamEdge> streamEdges = entry.getValue();
+
+ // Group StreamEdges by system
+ for (StreamEdge streamEdge : existingStreams) {
+ String system = streamEdge.getSystemStream().getSystem();
+ systemToStreamEdges.put(system, streamEdge);
+ }
+
+ // Fetch partition count for every set of StreamEdges belonging to a particular system.
+ for (String system : systemToStreamEdges.keySet()) {
+ Collection<StreamEdge> streamEdges = systemToStreamEdges.get(system);
+
+ // Map every stream to its corresponding StreamEdge so we can retrieve a StreamEdge given its stream.
Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
- // create the stream name to StreamEdge mapping for this system
- streamEdges.forEach(streamEdge -> streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
- // retrieve the partition counts for the streams in this system
- Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(systemName, streamToStreamEdge.keySet());
- // set the partitions of a stream to its StreamEdge
- streamToPartitionCount.forEach((stream, partitionCount) -> {
- streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
- log.info("Partition count is {} for stream {}", partitionCount, stream);
- });
+ for (StreamEdge streamEdge : streamEdges) {
+ streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge);
+ }
+
+      // Retrieve the partition counts for all streams in this system.
+ Set<String> streams = streamToStreamEdge.keySet();
+ Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(system, streams);
+
+ // Retrieve StreamEdge corresponding to every stream and set partition count on it.
+ for (Map.Entry<String, Integer> entry : streamToPartitionCount.entrySet()) {
+ String stream = entry.getKey();
+ Integer partitionCount = entry.getValue();
+ streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
+ log.info("Fetched partition count value {} for stream {}", partitionCount, stream);
+ }
}
}
/**
* Calculate the partitions for the input streams of join operators
*/
- /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, Config config) {
+ /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, StreamConfig streamConfig) {
    // mapping from a join spec to the source stream edges that can reach it
- Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
+ Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
// reverse mapping of the above
- Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
+ Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
// A queue of joins with known input partitions
- Queue<OperatorSpec> joinQ = new LinkedList<>();
+ Queue<JoinOperatorSpec> joinQ = new LinkedList<>();
// The visited set keeps track of the join specs that have been already inserted in the queue before
- Set<OperatorSpec> visited = new HashSet<>();
+ Set<JoinOperatorSpec> visited = new HashSet<>();
- StreamConfig streamConfig = new StreamConfig(config);
-
- jobGraph.getSpecGraph().getInputOperators().forEach((key, value) -> {
- StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(key, streamConfig));
+ jobGraph.getSpecGraph().getInputOperators().forEach((streamId, inputOperatorSpec) -> {
+ StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(streamId, streamConfig));
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
- findReachableJoins(value, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
+ findReachableJoins(inputOperatorSpec, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
});
// At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
while (!joinQ.isEmpty()) {
- OperatorSpec join = joinQ.poll();
+ JoinOperatorSpec join = joinQ.poll();
int partitions = StreamEdge.PARTITIONS_UNKNOWN;
// loop through the input streams to the join and find the partition count
for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
@@ -223,7 +235,7 @@
edge.setPartitionCount(partitions);
      // find other joins that can be inferred by setting this edge
- for (OperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
+ for (JoinOperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
if (!visited.contains(op)) {
joinQ.add(op);
visited.add(op);
@@ -244,17 +256,19 @@
* @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
*/
private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
- Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
- Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
- Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
- if (operatorSpec instanceof JoinOperatorSpec) {
- joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
- streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
+ Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges,
+ Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs,
+ Queue<JoinOperatorSpec> joinQ, Set<JoinOperatorSpec> visited) {
- if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
+ if (operatorSpec instanceof JoinOperatorSpec) {
+ JoinOperatorSpec joinOperatorSpec = (JoinOperatorSpec) operatorSpec;
+ joinSpecToStreamEdges.put(joinOperatorSpec, sourceStreamEdge);
+ streamEdgeToJoinSpecs.put(sourceStreamEdge, joinOperatorSpec);
+
+ if (!visited.contains(joinOperatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
// put the joins with known input partitions into the queue and mark as visited
- joinQ.add(operatorSpec);
- visited.add(operatorSpec);
+ joinQ.add(joinOperatorSpec);
+ visited.add(joinOperatorSpec);
}
}
@@ -265,15 +279,16 @@
}
}
- private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
- int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
- if (partitions < 0) {
+ private static void calculateIntermediateStreamPartitions(JobGraph jobGraph, Config config) {
+ final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
+ int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
+ if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
// use the following simple algo to figure out the partitions
// partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
// partition will be further bounded by MAX_INFERRED_PARTITIONS.
// This is important when running in hadoop where an HDFS input can have lots of files (partitions).
- int maxInPartitions = maxPartition(jobGraph.getSources());
- int maxOutPartitions = maxPartition(jobGraph.getSinks());
+ int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
+ int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
partitions = Math.max(maxInPartitions, maxOutPartitions);
if (partitions > MAX_INFERRED_PARTITIONS) {
@@ -281,7 +296,17 @@
log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
partitions, MAX_INFERRED_PARTITIONS));
}
+ } else {
+        // Reject any explicitly configured partition count that is zero or negative.
+ if (partitions <= 0) {
+ throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
+ defaultPartitionsConfigProperty));
+ }
+
+ log.info("Using partition count value {} specified for config property {}", partitions,
+ defaultPartitionsConfigProperty);
}
+
for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
if (edge.getPartitionCount() <= 0) {
log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
@@ -290,16 +315,15 @@
}
}
- private static void validatePartitions(JobGraph jobGraph) {
+ private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
if (edge.getPartitionCount() <= 0) {
- throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getName()));
+ throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
}
}
}
- /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
- return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
+ /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+ return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
}
-
}
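When JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS() is not configured, the planner infers the intermediate partition count as the maximum partition count across all input and output streams, capped at MAX_INFERRED_PARTITIONS (256). A standalone sketch of that inference rule (illustrative names, not the planner code itself):

    import java.util.Arrays;
    import java.util.Collection;

    class PartitionInferenceSketch {
      static final int MAX_INFERRED_PARTITIONS = 256;
      static final int PARTITIONS_UNKNOWN = -1;

      // Mirrors maxPartitions(): the highest partition count among the given streams.
      static int maxPartitions(Collection<Integer> partitionCounts) {
        return partitionCounts.stream().mapToInt(Integer::intValue).max().orElse(PARTITIONS_UNKNOWN);
      }

      public static void main(String[] args) {
        Collection<Integer> inputPartitions = Arrays.asList(8, 64);  // e.g. two input topics
        Collection<Integer> outputPartitions = Arrays.asList(16);    // e.g. one output topic

        // partitions = MAX(MAX(input partitions), MAX(output partitions)), bounded by the cap.
        int partitions = Math.max(maxPartitions(inputPartitions), maxPartitions(outputPartitions));
        partitions = Math.min(partitions, MAX_INFERRED_PARTITIONS);
        System.out.println(partitions); // 64
      }
    }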
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index f49e6db..5b19095 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -54,8 +54,8 @@
private final Map<String, JobNode> nodes = new HashMap<>();
private final Map<String, StreamEdge> edges = new HashMap<>();
- private final Set<StreamEdge> sources = new HashSet<>();
- private final Set<StreamEdge> sinks = new HashSet<>();
+ private final Set<StreamEdge> inputStreams = new HashSet<>();
+ private final Set<StreamEdge> outputStreams = new HashSet<>();
private final Set<StreamEdge> intermediateStreams = new HashSet<>();
private final Set<TableSpec> tables = new HashSet<>();
private final Config config;
@@ -115,26 +115,26 @@
/**
* Add a source stream to a {@link JobNode}
- * @param input source stream
- * @param node the job node that consumes from the source
+ * @param streamSpec input stream
+ * @param node the job node that consumes from the streamSpec
*/
- void addSource(StreamSpec input, JobNode node) {
- StreamEdge edge = getOrCreateStreamEdge(input);
+ void addInputStream(StreamSpec streamSpec, JobNode node) {
+ StreamEdge edge = getOrCreateStreamEdge(streamSpec);
edge.addTargetNode(node);
node.addInEdge(edge);
- sources.add(edge);
+ inputStreams.add(edge);
}
/**
- * Add a sink stream to a {@link JobNode}
- * @param output sink stream
- * @param node the job node that outputs to the sink
+ * Add an output stream to a {@link JobNode}
+ * @param streamSpec output stream
+ * @param node the job node that outputs to the output stream
*/
- void addSink(StreamSpec output, JobNode node) {
- StreamEdge edge = getOrCreateStreamEdge(output);
+ void addOutputStream(StreamSpec streamSpec, JobNode node) {
+ StreamEdge edge = getOrCreateStreamEdge(streamSpec);
edge.addSourceNode(node);
node.addOutEdge(edge);
- sinks.add(edge);
+ outputStreams.add(edge);
}
/**
@@ -204,19 +204,19 @@
}
/**
- * Returns the source streams in the graph
+ * Returns the input streams in the graph
* @return unmodifiable set of {@link StreamEdge}
*/
- Set<StreamEdge> getSources() {
- return Collections.unmodifiableSet(sources);
+ Set<StreamEdge> getInputStreams() {
+ return Collections.unmodifiableSet(inputStreams);
}
/**
- * Return the sink streams in the graph
+ * Return the output streams in the graph
* @return unmodifiable set of {@link StreamEdge}
*/
- Set<StreamEdge> getSinks() {
- return Collections.unmodifiableSet(sinks);
+ Set<StreamEdge> getOutputStreams() {
+ return Collections.unmodifiableSet(outputStreams);
}
/**
@@ -236,22 +236,22 @@
}
/**
- * Validate the graph has the correct topology, meaning the sources are coming from external streams,
- * sinks are going to external streams, and the nodes are connected with intermediate streams.
- * Also validate all the nodes are reachable from the sources.
+ * Validate the graph has the correct topology, meaning the input streams are coming from external streams,
+ * output streams are going to external streams, and the nodes are connected with intermediate streams.
+ * Also validate all the nodes are reachable from the input streams.
*/
void validate() {
- validateSources();
- validateSinks();
+ validateInputStreams();
+ validateOutputStreams();
validateInternalStreams();
validateReachability();
}
/**
- * Validate the sources should have indegree being 0 and outdegree greater than 0
+   * Validate that the input streams have indegree 0 and outdegree greater than 0
*/
- private void validateSources() {
- sources.forEach(edge -> {
+ private void validateInputStreams() {
+ inputStreams.forEach(edge -> {
if (!edge.getSourceNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Source stream %s should not have producers.", edge.getName()));
@@ -264,10 +264,10 @@
}
/**
- * Validate the sinks should have outdegree being 0 and indegree greater than 0
+   * Validate that the output streams have outdegree 0 and indegree greater than 0
*/
- private void validateSinks() {
- sinks.forEach(edge -> {
+ private void validateOutputStreams() {
+ outputStreams.forEach(edge -> {
if (!edge.getTargetNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Sink stream %s should not have consumers", edge.getName()));
@@ -284,8 +284,8 @@
*/
private void validateInternalStreams() {
Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
- internalEdges.removeAll(sources);
- internalEdges.removeAll(sinks);
+ internalEdges.removeAll(inputStreams);
+ internalEdges.removeAll(outputStreams);
internalEdges.forEach(edge -> {
if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
@@ -296,10 +296,10 @@
}
/**
- * Validate all nodes are reachable by sources.
+ * Validate all nodes are reachable by input streams.
*/
private void validateReachability() {
- // validate all nodes are reachable from the sources
+ // validate all nodes are reachable from the input streams
final Set<JobNode> reachable = findReachable();
if (reachable.size() != nodes.size()) {
Set<JobNode> unreachable = new HashSet<>(nodes.values());
@@ -317,8 +317,8 @@
Queue<JobNode> queue = new ArrayDeque<>();
Set<JobNode> visited = new HashSet<>();
- sources.forEach(source -> {
- List<JobNode> next = source.getTargetNodes();
+ inputStreams.forEach(input -> {
+ List<JobNode> next = input.getTargetNodes();
queue.addAll(next);
visited.addAll(next);
});
@@ -353,11 +353,11 @@
pnodes.forEach(node -> {
String nid = node.getId();
//only count the degrees of intermediate streams
- long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+ long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
indegree.put(nid, degree);
if (degree == 0L) {
- // start from the nodes that has no intermediate input streams, so it only consumes from sources
+      // start from the nodes that have no intermediate input streams, so they only consume from input streams
q.add(node);
visited.add(node);
}
@@ -410,9 +410,9 @@
q.add(minNode);
visited.add(minNode);
} else {
- // all the remaining nodes should be reachable from sources
- // start from sources again to find the next node that hasn't been visited
- JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+ // all the remaining nodes should be reachable from input streams
+ // start from input streams again to find the next node that hasn't been visited
+ JobNode nextNode = inputStreams.stream().flatMap(input -> input.getTargetNodes().stream())
.filter(node -> !visited.contains(node))
.findAny().get();
q.add(nextNode);
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 3a8d5c9..91453d2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -134,8 +134,8 @@
jobGraphJson.sinkStreams = new HashMap<>();
jobGraphJson.intermediateStreams = new HashMap<>();
jobGraphJson.tables = new HashMap<>();
- jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
- jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+ jobGraph.getInputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+ jobGraph.getOutputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index a9f744c..47705ee 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -65,7 +65,6 @@
*/
public class JobNode {
private static final Logger log = LoggerFactory.getLogger(JobNode.class);
- private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
private final String jobName;
@@ -87,7 +86,7 @@
public static Config mergeJobConfig(Config fullConfig, Config generatedConfig) {
return new JobConfig(Util.rewriteConfig(extractScopedConfig(
- fullConfig, generatedConfig, String.format(CONFIG_JOB_PREFIX, new JobConfig(fullConfig).getName().get()))));
+ fullConfig, generatedConfig, String.format(JobConfig.CONFIG_JOB_PREFIX(), new JobConfig(fullConfig).getName().get()))));
}
public OperatorSpecGraph getSpecGraph() {
@@ -203,7 +202,7 @@
log.info("Job {} has generated configs {}", jobName, configs);
- String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+ String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), jobName);
// Disallow user specified job inputs/outputs. This info comes strictly from the user application.
Map<String, String> allowedConfigs = new HashMap<>(config);
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 4ef9f9c..7910216 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -428,7 +428,7 @@
public void afterFailure(Throwable t) {
containerShutdownLatch.countDown();
synchronized (lock) {
- LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
+ LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t);
state = State.STOPPING;
containerException = t;
jobCoordinator.stop();
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index 186b4a8..ae72414 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -37,12 +37,14 @@
/**
* A {@link TableManager} manages tables within a Samza task. For each table, it maintains
- * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
- * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for
- * read/write operations.
+ * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance.
+ * It is used at execution time by {@link org.apache.samza.container.TaskInstance} to retrieve
+ * table instances for read/write operations.
*
* A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
- * and {@link TableProvider} are constructed by processing the job configuration.
+ * and {@link TableProvider} are constructed by processing the job configuration
+ * during initialization. The {@link Table} instance is constructed and cached when
+ * {@link #getTable(String)} is called.
*
* After a {@link TableManager} is constructed, local tables are associated with
* local store instances created during {@link org.apache.samza.container.SamzaContainer}
@@ -51,19 +53,19 @@
* Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException},
* if it's called before initialization.
*
- * For store backed tables, the list of stores must be injected into the constructor.
*/
public class TableManager {
static public class TableCtx {
private TableSpec tableSpec;
private TableProvider tableProvider;
+ private Table table;
}
private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
// tableId -> TableCtx
- private final Map<String, TableCtx> tables = new HashMap<>();
+ private final Map<String, TableCtx> tableContexts = new HashMap<>();
private boolean initialized;
@@ -100,7 +102,7 @@
*/
public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
Preconditions.checkNotNull(containerContext, "null container context.");
- tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+ tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
initialized = true;
}
@@ -109,7 +111,7 @@
* @param tableSpec the table spec
*/
private void addTable(TableSpec tableSpec) {
- if (tables.containsKey(tableSpec.getId())) {
+ if (tableContexts.containsKey(tableSpec.getId())) {
throw new SamzaException("Table " + tableSpec.getId() + " already exists");
}
TableCtx ctx = new TableCtx();
@@ -117,14 +119,14 @@
Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
ctx.tableSpec = tableSpec;
- tables.put(tableSpec.getId(), ctx);
+ tableContexts.put(tableSpec.getId(), ctx);
}
/**
* Shutdown the table manager, internally it shuts down all tables
*/
public void close() {
- tables.values().forEach(ctx -> ctx.tableProvider.close());
+ tableContexts.values().forEach(ctx -> ctx.tableProvider.close());
}
/**
@@ -133,10 +135,14 @@
* @return table instance
*/
public Table getTable(String tableId) {
- if (!initialized) {
- throw new IllegalStateException("TableManager has not been initialized.");
+ Preconditions.checkState(initialized, "TableManager has not been initialized.");
+
+ TableCtx ctx = tableContexts.get(tableId);
+ Preconditions.checkNotNull(ctx, "Unknown tableId " + tableId);
+
+ if (ctx.table == null) {
+ ctx.table = ctx.tableProvider.getTable();
}
- Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
- return tables.get(tableId).tableProvider.getTable();
+ return ctx.table;
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 88bc7df..9ef4c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -43,11 +43,11 @@
* @param <V> the type of the value in this table
*/
public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
- private final TableWriteFunction<K, V> writeFn;
private DefaultTableWriteMetrics writeMetrics;
@VisibleForTesting
+ final TableWriteFunction<K, V> writeFn;
final TableRateLimiter writeRateLimiter;
public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 3186fee..b3d82f3 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -80,10 +80,10 @@
protected final ExecutorService callbackExecutor;
protected final ExecutorService tableExecutor;
- private final TableReadFunction<K, V> readFn;
private DefaultTableReadMetrics readMetrics;
@VisibleForTesting
+ final TableReadFunction<K, V> readFn;
final TableRateLimiter<K, V> readRateLimiter;
/**
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
index a8d419d..537ff87 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -24,6 +24,7 @@
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
@@ -70,6 +71,9 @@
private TableRateLimiter.CreditFunction<K, V> readCreditFn;
private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
+ private TableRetryPolicy readRetryPolicy;
+ private TableRetryPolicy writeRetryPolicy;
+
// By default execute future callbacks on the native client threads
// ie. no additional thread pool for callbacks.
private int asyncCallbackPoolSize = -1;
@@ -115,13 +119,23 @@
"write credit function", writeCreditFn));
}
+ if (readRetryPolicy != null) {
+ tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
+ "read retry policy", readRetryPolicy));
+ }
+
+ if (writeRetryPolicy != null) {
+ tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
+ "write retry policy", writeRetryPolicy));
+ }
+
tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
}
/**
   * Use specified TableReadFunction with remote table.
* @param readFn read function instance
* @return this table descriptor instance
*/
@@ -132,7 +146,7 @@
}
/**
   * Use specified TableWriteFunction with remote table.
* @param writeFn write function instance
* @return this table descriptor instance
*/
@@ -143,6 +157,34 @@
}
/**
+   * Use specified TableReadFunction with remote table and a retry policy.
+ * @param readFn read function instance
+ * @param retryPolicy retry policy for the read function
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+ Preconditions.checkNotNull(readFn, "null read function");
+ Preconditions.checkNotNull(retryPolicy, "null retry policy");
+ this.readFn = readFn;
+ this.readRetryPolicy = retryPolicy;
+ return this;
+ }
+
+ /**
+   * Use specified TableWriteFunction with remote table and a retry policy.
+ * @param writeFn write function instance
+ * @param retryPolicy retry policy for the write function
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(retryPolicy, "null retry policy");
+ this.writeFn = writeFn;
+ this.writeRetryPolicy = retryPolicy;
+ return this;
+ }
+
+ /**
* Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
* of credits to be charged from the rate limiter for table read and write operations.
* This is an advanced API that provides greater flexibility to throttle each record in the table
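Putting the new overloads together, configuring retries on a remote table would look roughly like the sketch below. The read/write function classes and the Profile value type are hypothetical placeholders; the single-argument tableId constructor and the no-arg TableRetryPolicy constructor with its defaults (no backoff, no early termination) are assumed from elsewhere in this patch:

    // Hypothetical functions; MyProfileReadFn/MyProfileWriteFn are not part of this patch.
    TableReadFunction<String, Profile> readFn = new MyProfileReadFn();
    TableWriteFunction<String, Profile> writeFn = new MyProfileWriteFn();

    // Default policy: no backoff between attempts and no early termination.
    TableRetryPolicy retryPolicy = new TableRetryPolicy();

    RemoteTableDescriptor<String, Profile> desc =
        new RemoteTableDescriptor<String, Profile>("profile-table")
            .withReadFunction(readFn, retryPolicy)
            .withWriteFunction(writeFn, retryPolicy);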
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 6c5d9b3..cae0bbd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -25,11 +25,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.BaseTableProvider;
import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
import org.apache.samza.util.RateLimiter;
import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
@@ -47,6 +52,8 @@
static final String READ_CREDIT_FN = "io.read.credit.func";
static final String WRITE_CREDIT_FN = "io.write.credit.func";
static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+ static final String READ_RETRY_POLICY = "io.read.retry.policy";
+ static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
private final boolean readOnly;
private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
@@ -58,6 +65,7 @@
*/
private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+ private static ScheduledExecutorService retryExecutor;
public RemoteTableProvider(TableSpec tableSpec) {
super(tableSpec);
@@ -72,7 +80,7 @@
RemoteReadableTable table;
String tableId = tableSpec.getId();
- TableReadFunction<?, ?> readFn = getReadFn();
+ TableReadFunction readFn = getReadFn();
RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
if (rateLimiter != null) {
rateLimiter.init(containerContext.config, taskContext);
@@ -83,11 +91,33 @@
TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
TableRateLimiter writeRateLimiter = null;
+    TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
+    // Deserialize the write retry policy up front so the retry executor below is
+    // created even when only the write path is configured for retries.
+    TableRetryPolicy writeRetryPolicy = readOnly ? null : deserializeObject(WRITE_RETRY_POLICY);
+
+ if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
+ retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-retry-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ if (readRetryPolicy != null) {
+ readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
+ }
+
+ TableWriteFunction writeFn = getWriteFn();
+
boolean isRateLimited = readRateLimiter.isRateLimited();
if (!readOnly) {
writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG);
isRateLimited |= writeRateLimiter.isRateLimited();
+ if (writeRetryPolicy != null) {
+ writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
+ }
}
// Optional executor for future callback/completion. Shared by both read and write operations.
@@ -116,10 +146,18 @@
table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
tableExecutors.get(tableId), callbackExecutors.get(tableId));
} else {
- table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter,
+ table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
}
+ TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+ if (readRetryPolicy != null) {
+ ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+ }
+ if (writeRetryPolicy != null) {
+ ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+ }
+
table.init(containerContext, taskContext);
tables.add(table);
return table;
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
index 5d0f963..4791779 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -100,5 +100,12 @@
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
}
+ /**
+   * Determine whether the current operation can be retried after the given exception was thrown.
+ * @param exception exception thrown by a table operation
+ * @return whether the operation can be retried
+ */
+ boolean isRetriable(Throwable exception);
+
// optionally implement readObject() to initialize transient states
}
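An implementation decides retriability per exception. A minimal sketch, assuming getAsync and isRetriable are the only abstract methods a read function must supply (the class and the choice of retriable exceptions are illustrative):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeoutException;
    import org.apache.samza.table.remote.TableReadFunction;

    public class EchoReadFunction implements TableReadFunction<String, String> {
      @Override
      public CompletableFuture<String> getAsync(String key) {
        return CompletableFuture.completedFuture(key);
      }

      @Override
      public boolean isRetriable(Throwable exception) {
        // Walk the cause chain; treat transient I/O and timeout failures as retriable.
        for (Throwable t = exception; t != null; t = t.getCause()) {
          if (t instanceof IOException || t instanceof TimeoutException) {
            return true;
          }
        }
        return false;
      }
    }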
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
index 0ac3a0c..d9d619f 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -143,6 +143,13 @@
}
/**
+   * Determine whether the current operation can be retried after the given exception was thrown.
+ * @param exception exception thrown by a table operation
+ * @return whether the operation can be retried
+ */
+ boolean isRetriable(Throwable exception);
+
+ /**
* Flush the remote store (optional)
*/
default void flush() {
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
new file mode 100644
index 0000000..b2eccd8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.SamzaException;
+
+import net.jodah.failsafe.AsyncFailsafe;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+
+
+/**
+ * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and
+ * creating failsafe retryer instances with proper metrics management.
+ */
+class FailsafeAdapter {
+ /**
+   * Convert the {@link TableRetryPolicy} to a failsafe {@link RetryPolicy}.
+   * @param policy the generic retry policy to convert
+   * @return the converted failsafe {@link RetryPolicy}
+ */
+ static RetryPolicy valueOf(TableRetryPolicy policy) {
+ RetryPolicy failSafePolicy = new RetryPolicy();
+
+ switch (policy.getBackoffType()) {
+ case NONE:
+ break;
+
+ case FIXED:
+ failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS);
+ break;
+
+ case RANDOM:
+ failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS);
+ break;
+
+ case EXPONENTIAL:
+ failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS,
+ policy.getExponentialFactor());
+ break;
+
+ default:
+ throw new SamzaException("Unknown retry policy type.");
+ }
+
+ if (policy.getMaxDuration() != null) {
+ failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ if (policy.getMaxAttempts() != null) {
+ failSafePolicy.withMaxRetries(policy.getMaxAttempts());
+ }
+ if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) {
+ failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e));
+
+ return failSafePolicy;
+ }
+
+ /**
+ * Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service.
+ * @param retryPolicy retry policy
+ * @param metrics retry metrics
+ * @param retryExec executor service for scheduling async retries
+ * @return {@link net.jodah.failsafe.AsyncFailsafe} instance
+ */
+ static AsyncFailsafe<?> failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) {
+ long startMs = System.currentTimeMillis();
+ return Failsafe.with(retryPolicy).with(retryExec)
+ .onRetry(e -> metrics.retryCount.inc())
+ .onRetriesExceeded(e -> {
+ metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+ metrics.permFailureCount.inc();
+ })
+ .onSuccess((e, ctx) -> {
+ if (ctx.getExecutions() > 1) {
+ metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+ } else {
+ metrics.successCount.inc();
+ }
+ });
+ }
+}
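For reference, the failsafe composition used above reduces to the pattern below (a sketch against net.jodah:failsafe 1.1.0; the policy settings and the stubbed async call are illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import net.jodah.failsafe.Failsafe;
    import net.jodah.failsafe.RetryPolicy;

    class FailsafeSketch {
      public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        RetryPolicy policy = new RetryPolicy()
            .retryOn(RuntimeException.class)
            .withDelay(100, TimeUnit.MILLISECONDS)
            .withMaxRetries(3);

        // future() re-invokes the supplied async operation according to the policy.
        CompletableFuture<String> result = Failsafe.with(policy).with(scheduler)
            .future(() -> CompletableFuture.supplyAsync(() -> "ok"));

        System.out.println(result.join());
        scheduler.shutdown();
      }
    }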
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
new file mode 100644
index 0000000..1adddc0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableReadFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableReadFunction}.
+ *
+ * The actual retry mechanism is provided by the failsafe library. Retries are
+ * attempted asynchronously on a {@link ScheduledExecutorService}.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableReadFunction<K, V> implements TableReadFunction<K, V> {
+ private final RetryPolicy retryPolicy;
+ private final TableReadFunction<K, V> readFn;
+ private final ScheduledExecutorService retryExecutor;
+
+ @VisibleForTesting
+ RetryMetrics retryMetrics;
+
+ public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction<K, V> readFn,
+ ScheduledExecutorService retryExecutor) {
+ Preconditions.checkNotNull(policy);
+ Preconditions.checkNotNull(readFn);
+ Preconditions.checkNotNull(retryExecutor);
+
+ this.readFn = readFn;
+ this.retryExecutor = retryExecutor;
+ Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+ policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || retryPredicate.test(ex));
+ this.retryPolicy = FailsafeAdapter.valueOf(policy);
+ }
+
+ @Override
+ public CompletableFuture<V> getAsync(K key) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> readFn.getAsync(key))
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to get the record for " + key + " after retries.", e);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> readFn.getAllAsync(keys))
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to get the records for " + keys + " after retries.", e);
+ });
+ }
+
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return readFn.isRetriable(exception);
+ }
+
+ /**
+ * Initialize retry-related metrics
+ * @param metricsUtil metrics util
+ */
+ public void setMetrics(TableMetricsUtil metricsUtil) {
+ this.retryMetrics = new RetryMetrics("reader", metricsUtil);
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
new file mode 100644
index 0000000..2f3f062
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableWriteFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableWriteFunction}.
+ *
+ * The actual retry mechanism is provided by the failsafe library. Retries are
+ * attempted asynchronously on a {@link ScheduledExecutorService}.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableWriteFunction<K, V> implements TableWriteFunction<K, V> {
+ private final RetryPolicy retryPolicy;
+ private final TableWriteFunction<K, V> writeFn;
+ private final ScheduledExecutorService retryExecutor;
+
+ @VisibleForTesting
+ RetryMetrics retryMetrics;
+
+ public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction<K, V> writeFn,
+ ScheduledExecutorService retryExecutor) {
+ Preconditions.checkNotNull(policy);
+ Preconditions.checkNotNull(writeFn);
+ Preconditions.checkNotNull(retryExecutor);
+
+ this.writeFn = writeFn;
+ this.retryExecutor = retryExecutor;
+ Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+ policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || retryPredicate.test(ex));
+ this.retryPolicy = FailsafeAdapter.valueOf(policy);
+ }
+
+ @Override
+ public CompletableFuture<Void> putAsync(K key, V record) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> writeFn.putAsync(key, record))
+ .exceptionally(e -> {
+          throw new SamzaException("Failed to put the record for " + key + " after retries.", e);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> writeFn.putAllAsync(records))
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to put records after retries.", e);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAsync(K key) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> writeFn.deleteAsync(key))
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to delete the record for " + key + " after retries.", e);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
+ return failsafe(retryPolicy, retryMetrics, retryExecutor)
+ .future(() -> writeFn.deleteAllAsync(keys))
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e);
+ });
+ }
+
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return writeFn.isRetriable(exception);
+ }
+
+ /**
+ * Initialize retry-related metrics.
+ * @param metricsUtil metrics util
+ */
+ public void setMetrics(TableMetricsUtil metricsUtil) {
+ this.retryMetrics = new RetryMetrics("writer", metricsUtil);
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
new file mode 100644
index 0000000..fbc511c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and
+ * {@link RetriableWriteFunction}.
+ */
+class RetryMetrics {
+ /**
+ * Number of retries executed (excluding the first attempt)
+ */
+ final Counter retryCount;
+
+ /**
+   * Number of operations that succeeded on the first attempt, without any retries
+ */
+ final Counter successCount;
+
+ /**
+ * Number of operations that failed permanently and exhausted all retries
+ */
+ final Counter permFailureCount;
+
+ /**
+ * Total time spent in each IO operation; updated only
+ * when at least one retry has been attempted.
+ */
+ final Timer retryTimer;
+
+ public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) {
+ retryCount = metricsUtil.newCounter(prefix + "-retry-count");
+ successCount = metricsUtil.newCounter(prefix + "-success-count");
+ permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count");
+ retryTimer = metricsUtil.newTimer(prefix + "-retry-timer");
+ }
+}
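For reference, RetriableWriteFunction.setMetrics above passes the "writer" prefix, so a wrapped table reports:

    // writer-retry-count, writer-success-count, writer-perm-failure-count, writer-retry-timer
    // (the read-side wrapper presumably registers the same set under a "reader" prefix)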
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
new file mode 100644
index 0000000..162eb07
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Common retry policy parameters for table IO. This serves as an abstraction on top of
+ * retry libraries. This common policy supports the following features:
+ * - backoff modes: fixed, random, exponential
+ * - termination modes: by attempts, by duration
+ * - jitter
+ *
+ * Retry libraries may implement all or a subset of the features described by this common policy.
+ */
+public class TableRetryPolicy implements Serializable {
+ enum BackoffType {
+ /**
+ * No backoff in between two retry attempts.
+ */
+ NONE,
+
+ /**
+ * Backoff by a fixed duration {@code sleepTime}.
+ */
+ FIXED,
+
+ /**
+ * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}.
+ */
+ RANDOM,
+
+ /**
+ * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}.
+ */
+ EXPONENTIAL
+ }
+
+ // Backoff parameters
+ private Duration sleepTime;
+ private Duration randomMin;
+ private Duration randomMax;
+ private double exponentialFactor;
+ private Duration exponentialMaxSleep;
+ private Duration jitter;
+
+ // By default no early termination
+ private Integer maxAttempts = null;
+ private Duration maxDuration = null;
+
+ // By default no backoff during retries
+ private BackoffType backoffType = BackoffType.NONE;
+
+ /**
+ * Serializable adapter interface for {@link java.util.function.Predicate}.
+ * This is needed because TableRetryPolicy needs to be serializable as part of the
+ * table config whereas {@link java.util.function.Predicate} is not serializable.
+ */
+ public interface RetryPredicate extends Predicate<Throwable>, Serializable {
+ }
+
+ // By default no custom retry predicate so retry decision is made solely by the table functions
+ private RetryPredicate retryPredicate = (ex) -> false;
+
+ /**
+ * Set the sleep time for the fixed backoff policy.
+ * @param sleepTime sleep time between attempts
+ * @return this policy instance
+ */
+ public TableRetryPolicy withFixedBackoff(Duration sleepTime) {
+ Preconditions.checkNotNull(sleepTime);
+ this.sleepTime = sleepTime;
+ this.backoffType = BackoffType.FIXED;
+ return this;
+ }
+
+ /**
+ * Set the sleep time range for the random backoff policy. The actual sleep time
+ * before each attempt is randomly selected within {@code [minSleep, maxSleep]}.
+ * @param minSleep lower bound of the sleep time
+ * @param maxSleep upper bound of the sleep time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) {
+ Preconditions.checkNotNull(minSleep);
+ Preconditions.checkNotNull(maxSleep);
+ this.randomMin = minSleep;
+ this.randomMax = maxSleep;
+ this.backoffType = BackoffType.RANDOM;
+ return this;
+ }
+
+ /**
+ * Set the parameters for the exponential backoff policy. The actual sleep time
+ * grows exponentially, multiplying successive delays by {@code factor},
+ * capped at {@code maxSleep}.
+ * @param sleepTime initial sleep time
+ * @param maxSleep upper bound of the sleep time
+ * @param factor exponential factor for backoff
+ * @return this policy instance
+ */
+ public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) {
+ Preconditions.checkNotNull(sleepTime);
+ Preconditions.checkNotNull(maxSleep);
+ this.sleepTime = sleepTime;
+ this.exponentialMaxSleep = maxSleep;
+ this.exponentialFactor = factor;
+ this.backoffType = BackoffType.EXPONENTIAL;
+ return this;
+ }
+
+ /**
+ * Set the jitter for the backoff policy to provide additional randomness.
+ * If this is set, a random value between {@code [0, jitter]} will be added
+ * to each sleep time. This applies to {@code FIXED} and {@code EXPONENTIAL}
+ * modes only.
+ * @param jitter upper bound of the random value added to each sleep time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withJitter(Duration jitter) {
+ Preconditions.checkNotNull(jitter);
+ if (backoffType != BackoffType.RANDOM) {
+ this.jitter = jitter;
+ }
+ return this;
+ }
+
+ /**
+ * Set the maximum number of retries (excluding the first attempt) before
+ * terminating the operation.
+ * @param maxAttempts maximum number of retries
+ * @return this policy instance
+ */
+ public TableRetryPolicy withStopAfterAttempts(int maxAttempts) {
+ Preconditions.checkArgument(maxAttempts >= 0);
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ /**
+ * Set the maximum total delay (sleep time + execution time) before terminating the operation.
+ * @param maxDelay maximum total delay
+ * @return this policy instance
+ */
+ public TableRetryPolicy withStopAfterDelay(Duration maxDelay) {
+ Preconditions.checkNotNull(maxDelay);
+ this.maxDuration = maxDelay;
+ return this;
+ }
+
+ /**
+ * Set the predicate to use for identifying retriable exceptions. If specified, table
+ * retry logic will consult both such predicate and table function and retry will be
+ * attempted if either option returns true.
+ * @param retryPredicate predicate for retriable exception identification
+ * @return this policy instance
+ */
+ public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) {
+ Preconditions.checkNotNull(retryPredicate);
+ this.retryPredicate = retryPredicate;
+ return this;
+ }
+
+ /**
+ * @return initial/fixed sleep time.
+ */
+ public Duration getSleepTime() {
+ return sleepTime;
+ }
+
+ /**
+ * @return lower bound of the sleep time for random backoff or null if {@code backoffType} is not {@code RANDOM}.
+ */
+ public Duration getRandomMin() {
+ return randomMin;
+ }
+
+ /**
+ * @return upper bound of the sleep time for random backoff or null if {@code backoffType} is not {@code RANDOM}.
+ */
+ public Duration getRandomMax() {
+ return randomMax;
+ }
+
+ /**
+ * @return exponential factor for exponential backoff.
+ */
+ public double getExponentialFactor() {
+ return exponentialFactor;
+ }
+
+ /**
+ * @return maximum sleep time for exponential backoff or null if {@code backoffType} is not {@code EXPONENTIAL}.
+ */
+ public Duration getExponentialMaxSleep() {
+ return exponentialMaxSleep;
+ }
+
+ /**
+ * Introduce randomness to the sleep time.
+ * @return jitter to add to each backoff or null if not set.
+ */
+ public Duration getJitter() {
+ return jitter;
+ }
+
+ /**
+ * Termination after a fixed number of retries.
+ * @return maximum number of retries without success before giving up the operation or null if not set.
+ */
+ public Integer getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ /**
+ * Termination after a fixed duration.
+ * @return maximum duration without success before giving up the operation or null if not set.
+ */
+ public Duration getMaxDuration() {
+ return maxDuration;
+ }
+
+ /**
+ * @return type of the backoff.
+ */
+ public BackoffType getBackoffType() {
+ return backoffType;
+ }
+
+ /**
+ * @return custom predicate for retriable exception identification; defaults to a predicate that always returns false.
+ */
+ public RetryPredicate getRetryPredicate() {
+ return retryPredicate;
+ }
+}
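A sketch composing the builder methods defined above (values are illustrative; the lambda satisfies RetryPredicate since it is a functional interface):

    TableRetryPolicy policy = new TableRetryPolicy()
        .withExponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(10), 2.0)
        .withJitter(Duration.ofMillis(50))
        .withStopAfterAttempts(5)
        .withRetryPredicate(ex -> ex instanceof java.io.IOException);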
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index ddcaa5e..fc8780f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -39,6 +39,7 @@
*/
val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters
val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
+ val CONFIG_JOB_PREFIX = "jobs.%s."
val JOB_NAME = "job.name" // streaming.job_name
val JOB_ID = "job.id" // streaming.job_id
val SAMZA_FWK_PATH = "samza.fwk.path"
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 61cf6c5..c089225 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -28,10 +28,12 @@
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -72,7 +74,6 @@
private GenericInputDescriptor<KV<Object, Object>> input2Descriptor;
private StreamSpec input3Spec;
private GenericInputDescriptor<KV<Object, Object>> input3Descriptor;
- private StreamSpec input4Spec;
private GenericInputDescriptor<KV<Object, Object>> input4Descriptor;
private StreamSpec output1Spec;
private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor;
@@ -168,44 +169,49 @@
private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
return new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<KV<Object, Object>> messageStream1 =
- appDesc.getInputStream(input1Descriptor)
- .map(m -> m);
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor).map(m -> m);
MessageStream<KV<Object, Object>> messageStream2 =
- appDesc.getInputStream(input2Descriptor)
- .partitionBy(m -> m.key, m -> m.value, "p1")
- .filter(m -> true);
+ appDesc.getInputStream(input2Descriptor).partitionBy(m -> m.key, m -> m.value, "p1").filter(m -> true);
MessageStream<KV<Object, Object>> messageStream3 =
- appDesc.getInputStream(input3Descriptor)
- .filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value, "p2")
- .map(m -> m);
+ appDesc.getInputStream(input3Descriptor).filter(m -> true).partitionBy(m -> m.key, m -> m.value, "p2").map(m -> m);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
messageStream1.map(m -> m)
- .filter(m->true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+ .filter(m -> true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
messageStream2.map(m -> m)
- .filter(m->true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+ .filter(m -> true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+
+ messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
+ messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2").sendTo(output2);
+ messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3").sendTo(output2);
+ }, config);
+ }
+
+ private StreamApplicationDescriptorImpl createStreamGraphWithInvalidJoin() {
+ /**
+ * input1 (64) --
+ * |
+ * join -> output1 (8)
+ * |
+ * input3 (32) --
+ */
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+ MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
messageStream1
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
- .sendTo(output1);
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
- .sendTo(output2);
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
- .sendTo(output2);
+ .join(messageStream3,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ .sendTo(output1);
}, config);
}
@@ -225,7 +231,6 @@
input1Spec = new StreamSpec("input1", "input1", "system1");
input2Spec = new StreamSpec("input2", "input2", "system2");
input3Spec = new StreamSpec("input3", "input3", "system2");
- input4Spec = new StreamSpec("input4", "input4", "system1");
output1Spec = new StreamSpec("output1", "output1", "system1");
output2Spec = new StreamSpec("output2", "output2", "system2");
@@ -265,8 +270,8 @@
StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
- assertTrue(jobGraph.getSources().size() == 3);
- assertTrue(jobGraph.getSinks().size() == 2);
+ assertTrue(jobGraph.getInputStreams().size() == 3);
+ assertTrue(jobGraph.getOutputStreams().size() == 2);
assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
}
@@ -276,7 +281,7 @@
StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
- ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+ ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64);
assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16);
assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32);
@@ -294,8 +299,8 @@
StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
- ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
- ExecutionPlanner.calculateJoinInputPartitions(jobGraph, config);
+ ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
+ ExecutionPlanner.calculateJoinInputPartitions(jobGraph, new StreamConfig(config));
// the partitions should be the same as input1
jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -303,6 +308,14 @@
});
}
+ @Test(expected = SamzaException.class)
+ public void testRejectsInvalidJoin() {
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidJoin();
+
+ planner.plan(graphSpec.getOperatorSpecGraph());
+ }
+
@Test
public void testDefaultPartitions() {
Map<String, String> map = new HashMap<>(config);
@@ -321,7 +334,7 @@
}
@Test
- public void testTriggerIntervalForJoins() throws Exception {
+ public void testTriggerIntervalForJoins() {
Map<String, String> map = new HashMap<>(config);
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
@@ -336,7 +349,7 @@
}
@Test
- public void testTriggerIntervalForWindowsAndJoins() throws Exception {
+ public void testTriggerIntervalForWindowsAndJoins() {
Map<String, String> map = new HashMap<>(config);
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
@@ -352,7 +365,7 @@
}
@Test
- public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
+ public void testTriggerIntervalWithInvalidWindowMs() {
Map<String, String> map = new HashMap<>(config);
map.put(TaskConfig.WINDOW_MS(), "-1");
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -368,9 +381,8 @@
assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
-
@Test
- public void testTriggerIntervalForStatelessOperators() throws Exception {
+ public void testTriggerIntervalForStatelessOperators() {
Map<String, String> map = new HashMap<>(config);
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
@@ -384,7 +396,7 @@
}
@Test
- public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
+ public void testTriggerIntervalWhenWindowMsIsConfigured() {
Map<String, String> map = new HashMap<>(config);
map.put(TaskConfig.WINDOW_MS(), "2000");
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -399,7 +411,7 @@
}
@Test
- public void testCalculateIntStreamPartitions() throws Exception {
+ public void testCalculateIntStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
@@ -423,10 +435,10 @@
edge.setPartitionCount(16);
edges.add(edge);
- assertEquals(32, ExecutionPlanner.maxPartition(edges));
+ assertEquals(32, ExecutionPlanner.maxPartitions(edges));
edges = Collections.emptyList();
- assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges));
+ assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartitions(edges));
}
@Test
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 73452d8..ed35d67 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -74,9 +74,9 @@
JobNode n10 = graph1.getOrCreateJobNode("10", "1");
JobNode n11 = graph1.getOrCreateJobNode("11", "1");
- graph1.addSource(genStream(), n5);
- graph1.addSource(genStream(), n7);
- graph1.addSource(genStream(), n3);
+ graph1.addInputStream(genStream(), n5);
+ graph1.addInputStream(genStream(), n7);
+ graph1.addInputStream(genStream(), n3);
graph1.addIntermediateStream(genStream(), n5, n11);
graph1.addIntermediateStream(genStream(), n7, n11);
graph1.addIntermediateStream(genStream(), n7, n8);
@@ -85,9 +85,9 @@
graph1.addIntermediateStream(genStream(), n11, n9);
graph1.addIntermediateStream(genStream(), n8, n9);
graph1.addIntermediateStream(genStream(), n11, n10);
- graph1.addSink(genStream(), n2);
- graph1.addSink(genStream(), n9);
- graph1.addSink(genStream(), n10);
+ graph1.addOutputStream(genStream(), n2);
+ graph1.addOutputStream(genStream(), n9);
+ graph1.addOutputStream(genStream(), n10);
}
/**
@@ -108,7 +108,7 @@
JobNode n6 = graph2.getOrCreateJobNode("6", "1");
JobNode n7 = graph2.getOrCreateJobNode("7", "1");
- graph2.addSource(genStream(), n1);
+ graph2.addInputStream(genStream(), n1);
graph2.addIntermediateStream(genStream(), n1, n2);
graph2.addIntermediateStream(genStream(), n2, n3);
graph2.addIntermediateStream(genStream(), n3, n4);
@@ -117,7 +117,7 @@
graph2.addIntermediateStream(genStream(), n6, n2);
graph2.addIntermediateStream(genStream(), n5, n5);
graph2.addIntermediateStream(genStream(), n5, n7);
- graph2.addSink(genStream(), n7);
+ graph2.addOutputStream(genStream(), n7);
}
/**
@@ -132,7 +132,7 @@
JobNode n1 = graph3.getOrCreateJobNode("1", "1");
JobNode n2 = graph3.getOrCreateJobNode("2", "1");
- graph3.addSource(genStream(), n1);
+ graph3.addInputStream(genStream(), n1);
graph3.addIntermediateStream(genStream(), n1, n1);
graph3.addIntermediateStream(genStream(), n1, n2);
graph3.addIntermediateStream(genStream(), n2, n2);
@@ -149,7 +149,7 @@
JobNode n1 = graph4.getOrCreateJobNode("1", "1");
- graph4.addSource(genStream(), n1);
+ graph4.addInputStream(genStream(), n1);
graph4.addIntermediateStream(genStream(), n1, n1);
}
@@ -180,12 +180,12 @@
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
- graph.addSource(s1, n1);
- graph.addSource(s2, n1);
- graph.addSource(s3, n2);
- graph.addSource(s3, n3);
+ graph.addInputStream(s1, n1);
+ graph.addInputStream(s2, n1);
+ graph.addInputStream(s3, n2);
+ graph.addInputStream(s3, n3);
- assertTrue(graph.getSources().size() == 3);
+ assertTrue(graph.getInputStreams().size() == 3);
assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2);
assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1);
@@ -214,11 +214,11 @@
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
- graph.addSink(s1, n1);
- graph.addSink(s2, n2);
- graph.addSink(s3, n2);
+ graph.addOutputStream(s1, n1);
+ graph.addOutputStream(s2, n2);
+ graph.addOutputStream(s3, n2);
- assertTrue(graph.getSinks().size() == 3);
+ assertTrue(graph.getOutputStreams().size() == 3);
assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1);
assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2);
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 24178d0..42f05c0 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -124,11 +124,14 @@
TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
- Table table = tableManager.getTable(TABLE_ID);
- verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
- Assert.assertEquals(DummyTableProviderFactory.table, table);
+ for (int i = 0; i < 2; i++) {
+ Table table = tableManager.getTable(TABLE_ID);
+ verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
+ verify(DummyTableProviderFactory.tableProvider, times(1)).getTable();
+ Assert.assertEquals(DummyTableProviderFactory.table, table);
+ }
- Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");
+ Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tableContexts");
TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 21fc6a5..3e844c3 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.metrics.Counter;
@@ -34,6 +35,9 @@
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.task.TaskContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -43,6 +47,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -50,6 +55,18 @@
public class TestRemoteTable {
+ private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+ public static TaskContext getMockTaskContext() {
+ MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+ doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+ doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+ doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+ TaskContext taskContext = mock(TaskContext.class);
+ doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+ return taskContext;
+ }
+
private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
return getTable(tableId, readFn, writeFn, null);
@@ -72,12 +89,7 @@
table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
}
- TaskContext taskContext = mock(TaskContext.class);
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
- doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
- doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
- doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+ TaskContext taskContext = getMockTaskContext();
SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
@@ -86,35 +98,53 @@
return (T) table;
}
- private void doTestGet(boolean sync, boolean error) throws Exception {
+ private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+ String tableId = "testGet-" + sync + error + retry;
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
CompletableFuture<String> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
+ if (!retry) {
+ doReturn(future).when(readFn).getAsync(anyString());
+ } else {
+ final int [] times = new int[] {0};
+ doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+ .when(readFn).getAsync(anyString());
+ }
} else {
future = CompletableFuture.completedFuture("bar");
+ doReturn(future).when(readFn).getAsync(anyString());
}
- doReturn(future).when(readFn).getAsync(anyString());
- RemoteReadableTable<String, String> table = getTable("testGet-" + sync + error, readFn, null);
+ if (retry) {
+ doReturn(true).when(readFn).isRetriable(any());
+ TableRetryPolicy policy = new TableRetryPolicy();
+ readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+ }
+ RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
verify(table.readRateLimiter, times(1)).throttle(anyString());
}
@Test
public void testGet() throws Exception {
- doTestGet(true, false);
+ doTestGet(true, false, false);
}
@Test
public void testGetAsync() throws Exception {
- doTestGet(false, false);
+ doTestGet(false, false, false);
}
@Test(expected = ExecutionException.class)
public void testGetAsyncError() throws Exception {
- doTestGet(false, true);
+ doTestGet(false, true, false);
+ }
+
+ @Test
+ public void testGetAsyncErrorRetried() throws Exception {
+ doTestGet(false, true, true);
}
@Test
@@ -139,23 +169,36 @@
});
}
- private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testPut-" + sync + error + isDelete,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
+ private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+ String tableId = "testPut-" + sync + error + isDelete + retry;
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ TableWriteFunction<String, String> writeFn = mockWriteFn;
+ CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> failureFuture = new CompletableFuture();
+ failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+ if (!error) {
+ if (isDelete) {
+ doReturn(successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(successFuture).when(writeFn).putAsync(any(), any());
+ }
+ } else if (!retry) {
+ if (isDelete) {
+ doReturn(failureFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+ }
} else {
- future = CompletableFuture.completedFuture(null);
+ doReturn(true).when(writeFn).isRetriable(any());
+ final int [] times = new int[] {0};
+ if (isDelete) {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+ }
+ writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
}
- // Sync is backed by async so needs to mock the async method
- if (isDelete) {
- doReturn(future).when(writeFn).deleteAsync(any());
- } else {
- doReturn(future).when(writeFn).putAsync(any(), any());
- }
+ RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
if (sync) {
table.put("foo", isDelete ? null : "bar");
} else {
@@ -164,9 +207,9 @@
ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
if (isDelete) {
- verify(writeFn, times(1)).deleteAsync(keyCaptor.capture());
+ verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
} else {
- verify(writeFn, times(1)).putAsync(keyCaptor.capture(), valCaptor.capture());
+ verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
Assert.assertEquals("bar", valCaptor.getValue());
}
Assert.assertEquals("foo", keyCaptor.getValue());
@@ -179,27 +222,32 @@
@Test
public void testPut() throws Exception {
- doTestPut(true, false, false);
+ doTestPut(true, false, false, false);
}
@Test
public void testPutDelete() throws Exception {
- doTestPut(true, false, true);
+ doTestPut(true, false, true, false);
}
@Test
public void testPutAsync() throws Exception {
- doTestPut(false, false, false);
+ doTestPut(false, false, false, false);
}
@Test
public void testPutAsyncDelete() throws Exception {
- doTestPut(false, false, true);
+ doTestPut(false, false, true, false);
}
@Test(expected = ExecutionException.class)
public void testPutAsyncError() throws Exception {
- doTestPut(false, true, false);
+ doTestPut(false, true, false, false);
+ }
+
+ @Test
+ public void testPutAsyncErrorRetried() throws Exception {
+ doTestPut(false, true, false, true);
}
private void doTestDelete(boolean sync, boolean error) throws Exception {
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
index e30da12..efe1acf 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -31,6 +31,9 @@
import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
@@ -138,7 +141,9 @@
private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) {
int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0);
RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1");
- desc.withReadFunction(mock(TableReadFunction.class));
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ retryPolicy.withRetryPredicate((ex) -> false);
+ desc.withReadFunction(mock(TableReadFunction.class), retryPolicy);
desc.withWriteFunction(mock(TableWriteFunction.class));
desc.withAsyncCallbackExecutorPoolSize(10);
@@ -178,6 +183,9 @@
ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.callbackExecutor;
Assert.assertEquals(10, callbackExecutor.getCorePoolSize());
+
+ Assert.assertTrue(rwTable.readFn instanceof RetriableReadFunction);
+ Assert.assertFalse(rwTable.writeFn instanceof RetriableWriteFunction);
}
@Test
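Based on the two-argument overload exercised above, a hedged descriptor-level sketch (readFn and writeFn are placeholders): supplying a TableRetryPolicy with the read function is what triggers the RetriableReadFunction wrapping asserted in this test, while the write function, configured without a policy, stays unwrapped:

    RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor<>("my-table");
    desc.withReadFunction(readFn, new TableRetryPolicy().withFixedBackoff(Duration.ofMillis(10)));
    desc.withWriteFunction(writeFn); // no policy: not wrapped in RetriableWriteFunction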
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
new file mode 100644
index 0000000..9dd5a74
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.TestRemoteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRetriableTableFunctions {
+ private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+ public TableMetricsUtil getMetricsUtil(String tableId) {
+ Table table = mock(Table.class);
+ SamzaContainerContext cntCtx = mock(SamzaContainerContext.class);
+ TaskContext taskCtx = TestRemoteTable.getMockTaskContext();
+ return new TableMetricsUtil(cntCtx, taskCtx, table, tableId);
+ }
+
+ @Test
+ public void testFirstTimeSuccessGet() throws Exception {
+ String tableId = "testFirstTimeSuccessGet";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(100));
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ doReturn(true).when(readFn).isRetriable(any());
+ doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+ RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ Assert.assertEquals("bar", retryIO.getAsync("foo").get());
+ verify(readFn, times(1)).getAsync(anyString());
+
+ Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax());
+ }
+
+ @Test
+ public void testRetryEngagedGet() throws Exception {
+ String tableId = "testRetryEngagedGet";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(10));
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ doReturn(true).when(readFn).isRetriable(any());
+
+ int [] times = new int[] {0};
+ Map<String, String> map = new HashMap<>();
+ map.put("foo1", "bar1");
+ map.put("foo2", "bar2");
+ doAnswer(invocation -> {
+ CompletableFuture<Map<String, String>> future = new CompletableFuture();
+ if (times[0] > 0) {
+ future.complete(map);
+ } else {
+ times[0]++;
+ future.completeExceptionally(new RuntimeException("test exception"));
+ }
+ return future;
+ }).when(readFn).getAllAsync(any());
+
+ RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+ verify(readFn, times(2)).getAllAsync(any());
+
+ Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testRetryExhaustedTimeGet() throws Exception {
+ String tableId = "testRetryExhaustedTime";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(5));
+ policy.withStopAfterDelay(Duration.ofMillis(100));
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ doReturn(true).when(readFn).isRetriable(any());
+
+ CompletableFuture<String> future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("test exception"));
+ doReturn(future).when(readFn).getAsync(anyString());
+
+ RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ try {
+ retryIO.getAsync("foo").get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ }
+
+ // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay
+ verify(readFn, atLeast(3)).getAsync(anyString());
+ Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3);
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testRetryExhaustedAttemptsGet() throws Exception {
+ String tableId = "testRetryExhaustedAttempts";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(5));
+ policy.withStopAfterAttempts(10);
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ doReturn(true).when(readFn).isRetriable(any());
+
+ CompletableFuture<Map<String, String>> future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("test exception"));
+ doReturn(future).when(readFn).getAllAsync(any());
+
+ RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ try {
+ retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ }
+
+ // 1 initial try + 10 retries
+ verify(readFn, times(11)).getAllAsync(any());
+ Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testFirstTimeSuccessPut() throws Exception {
+ String tableId = "testFirstTimeSuccessPut";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(100));
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ doReturn(true).when(writeFn).isRetriable(any());
+ doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString());
+ RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ retryIO.putAsync("foo", "bar").get();
+ verify(writeFn, times(1)).putAsync(anyString(), anyString());
+
+ Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax());
+ }
+
+ @Test
+ public void testRetryEngagedPut() throws Exception {
+ String tableId = "testRetryEngagedPut";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(10));
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any());
+ doReturn(true).when(writeFn).isRetriable(any());
+
+ int [] times = new int[] {0};
+ List<Entry<String, String>> records = new ArrayList<>();
+ records.add(new Entry<>("foo1", "bar1"));
+ records.add(new Entry<>("foo2", "bar2"));
+ doAnswer(invocation -> {
+ CompletableFuture<Void> future = new CompletableFuture();
+ if (times[0] > 0) {
+ future.complete(null);
+ } else {
+ times[0]++;
+ future.completeExceptionally(new RuntimeException("test exception"));
+ }
+ return future;
+ }).when(writeFn).putAllAsync(any());
+
+ RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ retryIO.putAllAsync(records).get();
+ verify(writeFn, times(2)).putAllAsync(any());
+
+ Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testRetryExhaustedTimePut() throws Exception {
+ String tableId = "testRetryExhaustedTimePut";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(5));
+ policy.withStopAfterDelay(Duration.ofMillis(100));
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ doReturn(true).when(writeFn).isRetriable(any());
+
+ CompletableFuture<Void> future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("test exception"));
+ doReturn(future).when(writeFn).deleteAsync(anyString());
+
+ RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ try {
+ retryIO.deleteAsync("foo").get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ }
+
+ // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay
+ verify(writeFn, atLeast(3)).deleteAsync(anyString());
+ Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3);
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testRetryExhaustedAttemptsPut() throws Exception {
+ String tableId = "testRetryExhaustedAttemptsPut";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(5));
+ policy.withStopAfterAttempts(10);
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ doReturn(true).when(writeFn).isRetriable(any());
+
+ CompletableFuture<Void> future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("test exception"));
+ doReturn(future).when(writeFn).deleteAllAsync(any());
+
+ RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+ try {
+ retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ }
+
+ // 1 initial try + 10 retries
+ verify(writeFn, times(11)).deleteAllAsync(any());
+ Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+ @Test
+ public void testMixedIsRetriablePredicates() throws Exception {
+ String tableId = "testMixedIsRetriablePredicates";
+ TableRetryPolicy policy = new TableRetryPolicy();
+ policy.withFixedBackoff(Duration.ofMillis(100));
+
+ // Retry should be attempted based on the custom classification, i.e. retry on NPE
+ policy.withRetryPredicate(ex -> ex instanceof NullPointerException);
+
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+
+ // Table fn classification only retries on IllegalArgumentException
+ doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof IllegalArgumentException).when(readFn).isRetriable(any());
+
+ int [] times = new int[1];
+ doAnswer(arg -> {
+ if (times[0]++ == 0) {
+ CompletableFuture<String> future = new CompletableFuture();
+ future.completeExceptionally(new NullPointerException("test exception"));
+ return future;
+ } else {
+ return CompletableFuture.completedFuture("bar");
+ }
+ }).when(readFn).getAsync(any());
+
+ RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+ retryIO.setMetrics(getMetricsUtil(tableId));
+
+ Assert.assertEquals("bar", retryIO.getAsync("foo").get());
+
+ verify(readFn, times(2)).getAsync(anyString());
+ Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+ Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+ Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+ }
+
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
new file mode 100644
index 0000000..c343d63
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import net.jodah.failsafe.RetryPolicy;
+
+
+public class TestTableRetryPolicy {
+ @Test
+ public void testNoRetry() {
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, retryPolicy.getBackoffType());
+ }
+
+ @Test
+ public void testFixedRetry() {
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ retryPolicy.withFixedBackoff(Duration.ofMillis(1000));
+ retryPolicy.withJitter(Duration.ofMillis(100));
+ retryPolicy.withStopAfterAttempts(4);
+ Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType());
+ RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+ Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+ Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+ Assert.assertEquals(4, fsRetry.getMaxRetries());
+ Assert.assertNotNull(retryPolicy.getRetryPredicate());
+ }
+
+ @Test
+ public void testRandomRetry() {
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000));
+ retryPolicy.withJitter(Duration.ofMillis(100)); // no-op
+ Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType());
+ RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+ Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis());
+ Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis());
+ }
+
+ @Test
+ public void testExponentialRetry() {
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5);
+ retryPolicy.withJitter(Duration.ofMillis(100));
+ Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType());
+ RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+ Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+ Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis());
+ Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001);
+ Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+ }
+
+ @Test
+ public void testCustomRetryPredicate() {
+ TableRetryPolicy retryPolicy = new TableRetryPolicy();
+ retryPolicy.withRetryPredicate((e) -> e instanceof IllegalArgumentException);
+ Assert.assertTrue(retryPolicy.getRetryPredicate().test(new IllegalArgumentException()));
+ Assert.assertFalse(retryPolicy.getRetryPredicate().test(new NullPointerException()));
+ }
+}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 28a1bac..0d50f26 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -37,6 +37,7 @@
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
@@ -221,6 +222,17 @@
if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
return null;
}
+ /*
+ * Properly handle END_OF_STREAM offset here. If both are END_OF_STREAM,
+ * then they are equal. Otherwise END_OF_STREAM is always greater than any
+ * other offsets.
+ */
+ if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+ return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 : 1;
+ }
+ if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+ return -1;
+ }
int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
if (fileIndex1 == fileIndex2) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 5aaad26..7071b39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -45,6 +45,7 @@
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +114,8 @@
tableFieldNames);
Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
- Serde<SamzaSqlRelMessage> valueSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+ SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+ (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
// Always re-partition the messages from the input stream by the composite key and then join the messages
// with the table.
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 477d5b8..033bcdf 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -191,7 +191,8 @@
public TestRunner addOverrideConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
- configs.put(key, value);
+ String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME);
+ configs.put(String.format("%s%s", configKeyPrefix, key), value);
return this;
}
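Illustration of the new job-scoped override ("test-job" stands in for JOB_NAME; the config key is arbitrary):

    // addOverrideConfig("task.max.idle.ms", "10") now stores
    //   jobs.test-job.task.max.idle.ms -> 10
    // via CONFIG_JOB_PREFIX ("jobs.%s.") instead of the bare key, so the
    // override applies only to this job.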
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 3b2d08a..b249d4d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -87,9 +87,9 @@
private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
- private static final String TASK_SHUTDOWN_MS = "2000";
- private static final String JOB_DEBOUNCE_TIME_MS = "2000";
- private static final String BARRIER_TIMEOUT_MS = "2000";
+ private static final String TASK_SHUTDOWN_MS = "10000";
+ private static final String JOB_DEBOUNCE_TIME_MS = "10000";
+ private static final String BARRIER_TIMEOUT_MS = "10000";
private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
private String inputKafkaTopic;
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 0d9df8b..5c067ad 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -32,6 +32,7 @@
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.IntegerSerde;
@@ -66,8 +67,7 @@
Arrays.asList(TestTableData.generateProfiles(10)));
}
- // @Test
- // TODO: re-enable after fixing the coordinator stream issue in SAMZA-1786
+ @Test
public void testJoinWithDurableSideInputTable() {
runTest(
"durable-side-input",
@@ -98,6 +98,7 @@
.addInputStream(profileStream)
.addOutputStream(outputStream)
.addConfigs(new MapConfig(configs))
+ .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
.run(Duration.ofMillis(100000));
try {
@@ -116,7 +117,6 @@
assertEquals("Mismatch between the expected and actual join count", results.size(),
expectedEnrichedPageviews.size());
assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index d79683e..4cf99ff 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -92,6 +92,11 @@
return CompletableFuture.completedFuture(profileMap.get(key));
}
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return false;
+ }
+
static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
return new InMemoryReadFunction(serializedProfiles);
}
@@ -124,6 +129,11 @@
records.remove(key);
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return false;
+ }
}
private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
@@ -142,6 +152,18 @@
return appDesc.getTable(cachingDesc);
}
+ static class MyReadFunction implements TableReadFunction {
+ @Override
+ public CompletableFuture getAsync(Object key) {
+ return null;
+ }
+
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return false;
+ }
+ }
+
private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
@@ -166,9 +188,12 @@
.withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
.withRateLimiter(readRateLimiter, null, null);
+ // dummy reader: this output table is only ever written to in the test
+ TableReadFunction readFn = new MyReadFunction();
+
RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
outputTableDesc
- .withReadFunction(key -> null) // dummy reader
+ .withReadFunction(readFn)
.withWriteFunction(writer)
.withRateLimiter(writeRateLimiter, null, null);
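The pattern repeated throughout this file: TableReadFunction gained an abstract isRetriable(Throwable) method, so bare lambdas like `key -> null` no longer satisfy the interface and are replaced by small concrete classes. A sketch of a reader that actually uses the new hook, assuming the interface lives at org.apache.samza.table.remote as in Samza's remote table API (the retry predicate itself is an assumption; only the two overridden methods come from the patch):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.samza.table.remote.TableReadFunction;

// A no-op reader that opts in to retries only for transient I/O errors.
class TransientAwareReadFunction implements TableReadFunction<Integer, String> {
  @Override
  public CompletableFuture<String> getAsync(Integer key) {
    return CompletableFuture.completedFuture(null); // nothing to read in this sketch
  }

  @Override
  public boolean isRetriable(Throwable exception) {
    // Retry transient I/O failures; fail fast on everything else.
    return exception instanceof IOException;
  }
}
```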
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 41b6509..34ffbd4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.ConfigRewriter;
@@ -116,15 +118,27 @@
public static class MySampleNonTableDescriptorsProvider {
}
+ static class MyReadFunction implements TableReadFunction {
+ @Override
+ public CompletableFuture getAsync(Object key) {
+ return null;
+ }
+
+ @Override
+ public boolean isRetriable(Throwable exception) {
+ return false;
+ }
+ }
+
public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
@Override
public List<TableDescriptor> getTableDescriptors(Config config) {
List<TableDescriptor> tableDescriptors = new ArrayList<>();
final RateLimiter readRateLimiter = mock(RateLimiter.class);
- final TableReadFunction readRemoteTable = (TableReadFunction) key -> null;
+ final MyReadFunction readFn = new MyReadFunction();
tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
- .withReadFunction(readRemoteTable)
+ .withReadFunction(readFn)
.withRateLimiter(readRateLimiter, null, null)
.withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")