[FLINK-35223][rest] Add jobType in JobDetailsInfo related rest api
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 8e78ae8..000be08 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
@@ -1268,6 +1269,7 @@
"foobar",
false,
JobStatus.RUNNING,
+ JobType.STREAMING,
1,
2,
1,
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index f58f74d..6fcee2f 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -869,6 +869,10 @@
"type" : "string",
"enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
},
+ "job-type" : {
+ "type" : "string",
+ "enum" : [ "BATCH", "STREAMING" ]
+ },
"start-time" : {
"type" : "integer"
},
@@ -2839,31 +2843,6 @@
}
}
}, {
- "url" : "/jobs/:jobid/vertices/:vertexid/jm-operator-metrics",
- "method" : "GET",
- "status-code" : "200 OK",
- "file-upload" : false,
- "path-parameters" : {
- "pathParameters" : [ {
- "key" : "jobid"
- }, {
- "key" : "vertexid"
- } ]
- },
- "query-parameters" : {
- "queryParameters" : [ {
- "key" : "get",
- "mandatory" : false
- } ]
- },
- "request" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
- },
- "response" : {
- "type" : "any"
- }
- }, {
"url" : "/jobs/:jobid/vertices/:vertexid/flamegraph",
"method" : "GET",
"status-code" : "200 OK",
@@ -2917,6 +2896,31 @@
}
}
}, {
+ "url" : "/jobs/:jobid/vertices/:vertexid/jm-operator-metrics",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ }, {
+ "key" : "vertexid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ {
+ "key" : "get",
+ "mandatory" : false
+ } ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "any"
+ }
+ }, {
"url" : "/jobs/:jobid/vertices/:vertexid/metrics",
"method" : "GET",
"status-code" : "200 OK",
@@ -4186,4 +4190,4 @@
}
}
} ]
-}
+}
\ No newline at end of file
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index cc6a62d..dba2640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -146,6 +146,7 @@
executionGraph.getJobName(),
executionGraph.isStoppable(),
executionGraph.getState(),
+ executionGraph.getJobType(),
startTime,
endTime,
duration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index ca9ee5e..2504070 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -57,6 +58,8 @@
public static final String FIELD_NAME_JOB_STATUS = "state";
+ public static final String FIELD_NAME_JOB_TYPE = "job-type";
+
public static final String FIELD_NAME_START_TIME = "start-time";
public static final String FIELD_NAME_END_TIME = "end-time";
@@ -89,6 +92,9 @@
@JsonProperty(FIELD_NAME_JOB_STATUS)
private final JobStatus jobStatus;
+ @JsonProperty(FIELD_NAME_JOB_TYPE)
+ private final JobType jobType;
+
@JsonProperty(FIELD_NAME_START_TIME)
private final long startTime;
@@ -123,6 +129,7 @@
@JsonProperty(FIELD_NAME_JOB_NAME) String name,
@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
+ @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -138,6 +145,7 @@
this.name = Preconditions.checkNotNull(name);
this.isStoppable = isStoppable;
this.jobStatus = Preconditions.checkNotNull(jobStatus);
+ this.jobType = Preconditions.checkNotNull(jobType);
this.startTime = startTime;
this.endTime = endTime;
this.duration = duration;
@@ -167,6 +175,7 @@
&& Objects.equals(jobId, that.jobId)
&& Objects.equals(name, that.name)
&& jobStatus == that.jobStatus
+ && jobType == that.jobType
&& Objects.equals(timestamps, that.timestamps)
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
&& Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
@@ -180,6 +189,7 @@
name,
isStoppable,
jobStatus,
+ jobType,
startTime,
endTime,
duration,
@@ -212,6 +222,11 @@
}
@JsonIgnore
+ public JobType getJobType() {
+ return jobType;
+ }
+
+ @JsonIgnore
public long getStartTime() {
return startTime;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 74e8960..6bb69b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
@@ -74,6 +75,7 @@
"foobar",
true,
JobStatus.values()[random.nextInt(JobStatus.values().length)],
+ JobType.STREAMING,
1L,
2L,
1L,