[GRIFFIN-290] Fix bug for submitting job to livy
When griffin submit multiple DQ jobs to livy, the http parameter `name` is always griffin.
So livy will reject them.
job request :
```
[owner: null, request: [proxyUser: None, file: hdfs://nameservice-standby/user/kun.wan/measure-0.6.0-SNAPSHOT.jar,
args: {
"spark" :
Unknown macro: { "log.level" }
,
"sinks" : [
Unknown macro: { .... }
],
"griffin.checkpoint" : [ ]
},{
"measure.type" : "griffin",
"id" : 5202,
"name" : "spu_null_check",
"owner" : "test",
"description" : "check null value for store and category",
"deleted" : false,
"timestamp" : 1568195100000,
"dq.type" : "PROFILING",
"sinks" : [ "ELASTICSEARCH", "HDFS" ],
"process.type" : "BATCH",
"rule.description" :
,
"data.sources" : [
Unknown macro: { .... }
],
"evaluate.rule" :
,
"measure.type" : "griffin"
},raw,raw, driverMemory: 1g, executorMemory: 6g, executorCores: 2, numExecutors: 6, queue: root.users.kun_dot_wan, name: griffin]]
```
livy Response :
```
400 Bad Request
[Date:"Thu, 12 Sep 2019 10:00:00 GMT", Content-Type:"application/json;charset=utf-8", Content-Length:"47", Server:"Jetty(9.3.24.v20180605)"]
{"msg":"Duplicate session name: Some(griffin)"}
```
Author: wankunde <wankunde@163.com>
Closes #534 from wankunde/livy_bug.
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index cf50527..54cc582 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -51,6 +51,7 @@
import org.springframework.http.MediaType;
import org.springframework.security.kerberos.client.KerberosRestTemplate;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
@Component
@@ -247,8 +248,14 @@
HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
result = restTemplate.postForObject(uri, springEntity, String.class);
LOGGER.info(result);
+ } catch (HttpClientErrorException e) {
+ LOGGER.error("Post to livy ERROR. \n response status : " + e.getMessage()
+ + "\n response header : " + e.getResponseHeaders()
+ + "\n response body : " + e.getResponseBodyAsString());
} catch (JsonProcessingException e) {
- LOGGER.error("Post to livy ERROR. \n {}", e.getMessage());
+ LOGGER.error("Json Parsing failed, {}", e.getMessage(), e);
+ } catch (Exception e) {
+ LOGGER.error("Post to livy ERROR. \n {}", e);
}
return result;
} else {
@@ -261,8 +268,14 @@
HttpEntity<String> springEntity = null;
try {
springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
+ } catch (HttpClientErrorException e) {
+ LOGGER.error("Post to livy ERROR. \n response status : " + e.getMessage()
+ + "\n response header : " + e.getResponseHeaders()
+ + "\n response body : " + e.getResponseBodyAsString());
} catch (JsonProcessingException e) {
LOGGER.error("Json Parsing failed, {}", e.getMessage(), e);
+ } catch (Exception e) {
+ LOGGER.error("Post to livy ERROR. {}", e.getMessage(), e);
}
String result = restTemplate.postForObject(uri, springEntity, String.class);
LOGGER.info(result);
diff --git a/service/src/main/resources/sparkProperties.json b/service/src/main/resources/sparkProperties.json
index 8fb023c..3bd4e21 100644
--- a/service/src/main/resources/sparkProperties.json
+++ b/service/src/main/resources/sparkProperties.json
@@ -1,7 +1,6 @@
{
"file": "hdfs:///griffin/griffin-measure.jar",
"className": "org.apache.griffin.measure.Application",
- "name": "griffin",
"queue": "default",
"numExecutors": 2,
"executorCores": 1,
@@ -12,4 +11,4 @@
},
"files": [
]
-}
\ No newline at end of file
+}