APEXCORE-736 Using YARN client api to fetch the application master container report, this closes #534
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index c4e76a5..510a146 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -84,6 +84,9 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -168,7 +171,6 @@
import com.datatorrent.stram.util.FSJsonLineFile;
import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
-import com.datatorrent.stram.util.WebServicesClient;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import com.datatorrent.stram.webapp.OperatorAggregationInfo;
@@ -485,11 +487,12 @@
String nodeHttpAddress = nmHost + ":" + nmHttpPort;
if (allocatedMemoryMB == 0) {
String url = ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/ws/v1/node/containers/" + ci.id;
- WebServicesClient webServicesClient = new WebServicesClient();
- try {
- String content = webServicesClient.process(url, String.class, new WebServicesClient.GetWebServicesHandler<String>());
- JSONObject json = new JSONObject(content);
- int totalMemoryNeededMB = json.getJSONObject("container").getInt("totalMemoryNeededMB");
+ try (YarnClient rmClient = YarnClient.createYarnClient()) {
+ rmClient.init(conf);
+ rmClient.start();
+ ContainerReport content = rmClient.getContainerReport(ContainerId.fromString(ci.id));
+ int totalMemoryNeededMB = content.getAllocatedResource().getMemory();
+ LOG.debug("App Master allocated memory is {}", totalMemoryNeededMB);
if (totalMemoryNeededMB > 0) {
allocatedMemoryMB = totalMemoryNeededMB;
} else {