[FLINK-35223][rest] Add jobType in JobDetailsInfo related rest api
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 8e78ae8..000be08 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -1268,6 +1269,7 @@
                         "foobar",
                         false,
                         JobStatus.RUNNING,
+                        JobType.STREAMING,
                         1,
                         2,
                         1,
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index f58f74d..6fcee2f 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -869,6 +869,10 @@
           "type" : "string",
           "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
         },
+        "job-type" : {
+          "type" : "string",
+          "enum" : [ "BATCH", "STREAMING" ]
+        },
         "start-time" : {
           "type" : "integer"
         },
@@ -2839,31 +2843,6 @@
       }
     }
   }, {
-    "url" : "/jobs/:jobid/vertices/:vertexid/jm-operator-metrics",
-    "method" : "GET",
-    "status-code" : "200 OK",
-    "file-upload" : false,
-    "path-parameters" : {
-      "pathParameters" : [ {
-        "key" : "jobid"
-      }, {
-        "key" : "vertexid"
-      } ]
-    },
-    "query-parameters" : {
-      "queryParameters" : [ {
-        "key" : "get",
-        "mandatory" : false
-      } ]
-    },
-    "request" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
-    },
-    "response" : {
-      "type" : "any"
-    }
-  }, {
     "url" : "/jobs/:jobid/vertices/:vertexid/flamegraph",
     "method" : "GET",
     "status-code" : "200 OK",
@@ -2917,6 +2896,31 @@
       }
     }
   }, {
+    "url" : "/jobs/:jobid/vertices/:vertexid/jm-operator-metrics",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      }, {
+        "key" : "vertexid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ {
+        "key" : "get",
+        "mandatory" : false
+      } ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "any"
+    }
+  }, {
     "url" : "/jobs/:jobid/vertices/:vertexid/metrics",
     "method" : "GET",
     "status-code" : "200 OK",
@@ -4186,4 +4190,4 @@
       }
     }
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index cc6a62d..dba2640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -146,6 +146,7 @@
                 executionGraph.getJobName(),
                 executionGraph.isStoppable(),
                 executionGraph.getState(),
+                executionGraph.getJobType(),
                 startTime,
                 endTime,
                 duration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index ca9ee5e..2504070 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -57,6 +58,8 @@
 
     public static final String FIELD_NAME_JOB_STATUS = "state";
 
+    public static final String FIELD_NAME_JOB_TYPE = "job-type";
+
     public static final String FIELD_NAME_START_TIME = "start-time";
 
     public static final String FIELD_NAME_END_TIME = "end-time";
@@ -89,6 +92,9 @@
     @JsonProperty(FIELD_NAME_JOB_STATUS)
     private final JobStatus jobStatus;
 
+    @JsonProperty(FIELD_NAME_JOB_TYPE)
+    private final JobType jobType;
+
     @JsonProperty(FIELD_NAME_START_TIME)
     private final long startTime;
 
@@ -123,6 +129,7 @@
             @JsonProperty(FIELD_NAME_JOB_NAME) String name,
             @JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
             @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
+            @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
             @JsonProperty(FIELD_NAME_START_TIME) long startTime,
             @JsonProperty(FIELD_NAME_END_TIME) long endTime,
             @JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -138,6 +145,7 @@
         this.name = Preconditions.checkNotNull(name);
         this.isStoppable = isStoppable;
         this.jobStatus = Preconditions.checkNotNull(jobStatus);
+        this.jobType = Preconditions.checkNotNull(jobType);
         this.startTime = startTime;
         this.endTime = endTime;
         this.duration = duration;
@@ -167,6 +175,7 @@
                 && Objects.equals(jobId, that.jobId)
                 && Objects.equals(name, that.name)
                 && jobStatus == that.jobStatus
+                && jobType == that.jobType
                 && Objects.equals(timestamps, that.timestamps)
                 && Objects.equals(jobVertexInfos, that.jobVertexInfos)
                 && Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
@@ -180,6 +189,7 @@
                 name,
                 isStoppable,
                 jobStatus,
+                jobType,
                 startTime,
                 endTime,
                 duration,
@@ -212,6 +222,11 @@
     }
 
     @JsonIgnore
+    public JobType getJobType() {
+        return jobType;
+    }
+
+    @JsonIgnore
     public long getStartTime() {
         return startTime;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 74e8960..6bb69b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
@@ -74,6 +75,7 @@
                 "foobar",
                 true,
                 JobStatus.values()[random.nextInt(JobStatus.values().length)],
+                JobType.STREAMING,
                 1L,
                 2L,
                 1L,