[hotfix] Fix checkstyle violations in ExecutionJobVertex
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e5b7aa5..e0ed7ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -317,7 +317,7 @@
public ExecutionGraph getGraph() {
return graph;
}
-
+
public JobVertex getJobVertex() {
return jobVertex;
}
@@ -344,33 +344,33 @@
public JobID getJobId() {
return graph.getJobID();
}
-
+
@Override
public JobVertexID getJobVertexId() {
return jobVertex.getID();
}
-
+
@Override
public ExecutionVertex[] getTaskVertices() {
return taskVertices;
}
-
+
public IntermediateResult[] getProducedDataSets() {
return producedDataSets;
}
-
+
public InputSplitAssigner getSplitAssigner() {
return splitAssigner;
}
-
+
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
-
+
public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}
-
+
public List<IntermediateResult> getInputs() {
return inputs;
}
@@ -423,28 +423,28 @@
//---------------------------------------------------------------------------------------------
-
+
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
-
+
List<JobEdge> inputs = jobVertex.getInputs();
-
+
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
}
-
+
for (int num = 0; num < inputs.size(); num++) {
JobEdge edge = inputs.get(num);
-
+
if (LOG.isDebugEnabled()) {
if (edge.getSource() == null) {
- LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
+ LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
} else {
LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
}
}
-
+
// fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
// in which this method is called for the job vertices is not a topological order
IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
@@ -452,18 +452,18 @@
throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
+ edge.getSourceId());
}
-
+
this.inputs.add(ires);
-
+
int consumerIndex = ires.registerConsumer();
-
+
for (int i = 0; i < parallelism; i++) {
ExecutionVertex ev = taskVertices[i];
ev.connectSource(num, ires, edge, consumerIndex);
}
}
}
-
+
//---------------------------------------------------------------------------------------------
// Actions
//---------------------------------------------------------------------------------------------
@@ -480,7 +480,7 @@
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
-
+
final ExecutionVertex[] vertices = this.taskVertices;
final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
@@ -497,9 +497,9 @@
* Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
* pairs of the slots and execution attempts, to ease correlation between vertices and execution
* attempts.
- *
+ *
* <p>If this method throws an exception, it makes sure to release all so far requested slots.
- *
+ *
* @param resourceProvider The resource provider from whom the slots are requested.
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences