APEXCORE-602: group events by cause
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index ed9248a..09478eb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -49,6 +49,8 @@
import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -170,6 +172,7 @@
private StramDelegationTokenManager delegationTokenManager = null;
private AppDataPushAgent appDataPushAgent;
private ApexPluginDispatcher apexPluginDispatcher;
+ private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
{
@@ -972,7 +975,8 @@
launchContainer.run(); // communication with NMs is now async
// record container start event
- StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString());
+ StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
+ allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
ev.setTimestamp(timestamp);
dnmgr.recordEventAsync(ev);
}
@@ -997,6 +1001,7 @@
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
}
+ EventGroupId groupId = null;
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
if (allocatedContainer != null) {
@@ -1039,7 +1044,9 @@
// Recoverable failure or process killed (externally or via stop request by AM)
// also occurs when a container was released by the application but never assigned/launched
LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
- dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
+ String containerIdStr = containerStatus.getContainerId().toString();
+ dnmgr.scheduleContainerRestart(containerIdStr);
+ groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
// }
} else {
// container completed successfully
@@ -1057,7 +1064,7 @@
dnmgr.removeContainerAgent(containerIdStr);
// record container stop event
- StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus());
+ StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId);
ev.setReason(containerStatus.getDiagnostics());
dnmgr.recordEventAsync(ev);
}
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 510a146..8d2406f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -69,6 +69,8 @@
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -260,6 +262,7 @@
//logical operator name to latest counters. exists for backward compatibility.
private final Map<String, Object> latestLogicalCounters = Maps.newHashMap();
public transient ApexPluginDispatcher apexPluginDispatcher = new NoOpApexPluginDispatcher();
+ private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String, ContainerInfo>()
{
@@ -799,7 +802,7 @@
if (sca.lastHeartbeatMillis != -1) {
String msg = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
LOG.warn(msg);
- StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), msg, null);
+ StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), msg, null, null);
stramEvent.setReason(msg);
recordEventAsync(stramEvent);
sca.lastHeartbeatMillis = -1;
@@ -1163,6 +1166,9 @@
}
includeLocalUpstreamOperators(ctx);
+ groupingManager.addOrModifyGroupingRequest(containerId, ctx.visited);
+ groupingManager.removeProcessedGroupingRequests();
+
// redeploy cycle for all affected operators
LOG.info("Affected operators {}", ctx.visited);
deploy(Collections.<PTContainer>emptySet(), ctx.visited, Sets.newHashSet(cs.container), ctx.visited);
@@ -1204,7 +1210,7 @@
if (containerAgent != null) {
// record operator stop for this container
for (PTOperator oper : containerAgent.container.getOperators()) {
- StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId);
+ StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId, groupingManager.getEventGroupIdForContainer(containerId));
recordEventAsync(ev);
}
containerAgent.container.setFinishedTime(System.currentTimeMillis());
@@ -1279,6 +1285,8 @@
return null;
}
+ groupingManager.addNewContainerToGroupingRequest(container.getExternalId(), resource.containerId);
+
pendingAllocation.remove(container);
container.setState(PTContainer.State.ALLOCATED);
if (container.getExternalId() != null) {
@@ -1381,13 +1389,16 @@
sca.undeployOpers.add(oper.getId());
slowestUpstreamOp.remove(oper);
// record operator stop event
- recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
+ recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), null));
break;
case FAILED:
processOperatorFailure(oper);
sca.undeployOpers.add(oper.getId());
slowestUpstreamOp.remove(oper);
- recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
+
+ EventGroupId groupId = groupingManager.getEventGroupIdForContainer(oper.getContainer().getExternalId());
+ recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(),
+ oper.getContainer().getExternalId(), groupId));
break;
case ACTIVE:
default:
@@ -1397,8 +1408,9 @@
break;
case PENDING_UNDEPLOY:
if (ds == null) {
+ EventGroupId groupId = groupingManager.moveOperatorFromUndeployListToDeployList(oper);
// operator no longer deployed in container
- recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
+ recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), groupId));
oper.setState(State.PENDING_DEPLOY);
sca.deployOpers.add(oper);
} else {
@@ -1418,7 +1430,9 @@
oper.setState(PTOperator.State.ACTIVE);
oper.stats.lastHeartbeat = null; // reset on redeploy
oper.stats.lastWindowIdChangeTms = clock.getTime();
- recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(), container.getExternalId()));
+ EventGroupId groupId = groupingManager.getEventGroupIdForOperatorToDeploy(oper.getId());
+ recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(), container.getExternalId(), groupId));
+ groupingManager.removeOperatorFromGroupingRequest(oper.getId());
}
break;
default:
@@ -1427,7 +1441,7 @@
// operator was removed and needs to be undeployed from container
sca.undeployOpers.add(oper.getId());
slowestUpstreamOp.remove(oper);
- recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
+ recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), null));
}
}
}
@@ -2421,6 +2435,7 @@
// operator will be deployed after it has been undeployed, if still referenced by the container
if (oper.getState() != PTOperator.State.PENDING_UNDEPLOY) {
oper.setState(PTOperator.State.PENDING_DEPLOY);
+ groupingManager.addOperatorToDeploy(oper.getContainer().getExternalId(), oper);
}
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index 76f89bd..8401931 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -20,12 +20,15 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.log.LogFileInformation;
-
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -176,20 +179,29 @@
@Override
public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
{
+ EventGroupId groupId = getGroupIdForNewGroupingRequest(containerId);
if (operators == null || operators.length == 0) {
- dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo));
+ dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo, groupId));
} else {
for (int operator : operators) {
OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator);
if (operatorInfo != null) {
- dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg,
- logFileInfo));
+ dagManager.recordEventAsync(
+ new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg, logFileInfo, groupId));
}
}
}
log(containerId, msg);
}
+  // Registers (or reuses) the grouping request keyed by the reporting container and returns its group id.
+  private EventGroupId getGroupIdForNewGroupingRequest(String containerId)
+  {
+    GroupingRequest groupingRequest = GroupingManager.getGroupingManagerInstance().addOrModifyGroupingRequest(
+        containerId, Collections.<com.datatorrent.stram.plan.physical.PTOperator>emptySet());
+    return groupingRequest.getEventGroupId();
+  }
+
@Override
public StreamingContainerContext getInitContext(String containerId)
throws IOException
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index 8af90bc..6224856 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.apex.log.LogFileInformation;
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
@@ -38,6 +39,7 @@
private String reason;
private LogLevel logLevel;
private LogFileInformation logFileInformation;
+ private EventGroupId groupId;
public abstract String getType();
@@ -48,9 +50,15 @@
protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation)
{
+ this(logLevel, logFileInformation, null);
+ }
+
+ protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation, EventGroupId groupId)
+ {
id = nextId.getAndIncrement();
this.logLevel = logLevel;
this.logFileInformation = logFileInformation;
+ this.groupId = groupId;
}
public long getId()
@@ -98,6 +106,16 @@
this.logFileInformation = logFileInformation;
}
+ public EventGroupId getGroupId()
+ {
+ return groupId;
+ }
+
+ public void setGroupId(EventGroupId groupId)
+ {
+ this.groupId = groupId;
+ }
+
public static enum LogLevel
{
TRACE,
@@ -114,12 +132,12 @@
public OperatorEvent(String operatorName, LogLevel logLevel)
{
- this(operatorName, logLevel, null);
+ this(operatorName, logLevel, null, null);
}
- public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation)
+ public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation, EventGroupId groupId)
{
- super(logLevel, logFileInformation);
+ super(logLevel, logFileInformation, groupId);
this.operatorName = operatorName;
}
@@ -231,13 +249,18 @@
public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel)
{
- this(operatorName, operatorId, logLevel, null);
+ this(operatorName, operatorId, logLevel, null, null);
+ }
+
+ public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel, EventGroupId groupId)
+ {
+ this(operatorName, operatorId, logLevel, null, groupId);
}
public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel,
- LogFileInformation logFileInformation)
+ LogFileInformation logFileInformation, EventGroupId groupId)
{
- super(operatorName, logLevel, logFileInformation);
+ super(operatorName, logLevel, logFileInformation, groupId);
this.operatorId = operatorId;
}
@@ -292,14 +315,14 @@
{
private String containerId;
- public StartOperatorEvent(String operatorName, int operatorId, String containerId)
+ public StartOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId groupId)
{
- this(operatorName, operatorId, containerId, LogLevel.INFO);
+ this(operatorName, operatorId, containerId, LogLevel.INFO, groupId);
}
- public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel)
+ public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel, EventGroupId groupId)
{
- super(operatorName, operatorId, logLevel);
+ super(operatorName, operatorId, logLevel, groupId);
this.containerId = containerId;
}
@@ -325,14 +348,14 @@
{
private String containerId;
- public StopOperatorEvent(String operatorName, int operatorId, String containerId)
+ public StopOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId groupId)
{
- this(operatorName, operatorId, containerId, LogLevel.WARN);
+ this(operatorName, operatorId, containerId, LogLevel.WARN, groupId);
}
- public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel)
+ public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel, EventGroupId groupId)
{
- super(operatorName, operatorId, logLevel);
+ super(operatorName, operatorId, logLevel, groupId);
this.containerId = containerId;
}
@@ -404,14 +427,14 @@
String containerId;
String containerNodeId;
- public StartContainerEvent(String containerId, String containerNodeId)
+ public StartContainerEvent(String containerId, String containerNodeId, EventGroupId groupId)
{
- this(containerId, containerNodeId, LogLevel.INFO);
+ this(containerId, containerNodeId, LogLevel.INFO, groupId);
}
- public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel)
+ public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel, EventGroupId groupId)
{
- super(logLevel);
+ super(logLevel, null, groupId);
this.containerId = containerId;
this.containerNodeId = containerNodeId;
}
@@ -449,14 +472,14 @@
String containerId;
int exitStatus;
- public StopContainerEvent(String containerId, int exitStatus)
+ public StopContainerEvent(String containerId, int exitStatus, EventGroupId groupId)
{
- this(containerId, exitStatus, LogLevel.WARN);
+ this(containerId, exitStatus, LogLevel.WARN, groupId);
}
- public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel)
+ public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel, EventGroupId groupId)
{
- super(logLevel);
+ super(logLevel, null, groupId);
this.containerId = containerId;
this.exitStatus = exitStatus;
}
@@ -528,15 +551,15 @@
private String errorMessage;
public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
- LogFileInformation logFileInformation)
+ LogFileInformation logFileInformation, EventGroupId groupId)
{
- this(operatorName, operatorId, containerId, errorMessage, logFileInformation, LogLevel.ERROR);
+ this(operatorName, operatorId, containerId, errorMessage, logFileInformation, groupId, LogLevel.ERROR);
}
public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
- LogFileInformation logFileInformation, LogLevel logLevel)
+ LogFileInformation logFileInformation, EventGroupId groupId, LogLevel logLevel)
{
- super(operatorName, operatorId, logLevel, logFileInformation);
+ super(operatorName, operatorId, logLevel, logFileInformation, groupId);
this.containerId = containerId;
this.errorMessage = errorMessage;
}
@@ -574,15 +597,15 @@
private String containerId;
private String errorMessage;
- public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation)
+ public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation, EventGroupId groupId)
{
- this(containerId, errorMessage, logFileInformation, LogLevel.ERROR);
+ this(containerId, errorMessage, logFileInformation, groupId, LogLevel.ERROR);
}
- public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation,
+ public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation, EventGroupId groupId,
LogLevel logLevel)
{
- super(logLevel, logFileInformation);
+ super(logLevel, logFileInformation, groupId);
this.containerId = containerId;
this.errorMessage = errorMessage;
}
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java b/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
new file mode 100644
index 0000000..b160dd6
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.stram;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.stram.GroupingRequest.EventGroupId;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.stram.plan.physical.PTOperator;
+
+/**
+ * Tracks grouping requests so that deploy/undeploy events of related containers and
+ * operators can share one {@link EventGroupId}. Requests are keyed by container id.
+ * NOTE(review): plain HashMap behind a shared singleton - confirm callers are single-threaded.
+ */
+public class GroupingManager
+{
+  private static final GroupingManager groupingManager = new GroupingManager();
+  private final Map<String, GroupingRequest> groupingRequests = Maps.newHashMap();
+
+  /** @return the process-wide shared instance */
+  public static GroupingManager getGroupingManagerInstance()
+  {
+    return groupingManager;
+  }
+
+  /**
+   * Returns all grouping requests currently held (the live map, not a copy).
+   * @return groupingRequests
+   */
+  public Map<String, GroupingRequest> getGroupingRequests()
+  {
+    return groupingRequests;
+  }
+
+  /**
+   * Returns the grouping request registered for a container.
+   * @param containerId id the request is keyed by
+   * @return groupingRequest, or null when none is registered
+   */
+  public GroupingRequest getGroupingRequest(String containerId)
+  {
+    return groupingRequests.get(containerId);
+  }
+
+  /**
+   * Returns the deploy/undeploy group id of the request keyed by the container.
+   * @param containerId id the request is keyed by
+   * @return groupId, or null for an independent event that belongs to no group
+   */
+  public EventGroupId getEventGroupIdForContainer(String containerId)
+  {
+    // one map lookup; a missing request simply means "no group"
+    GroupingRequest request = groupingRequests.get(containerId);
+    return request == null ? null : request.getEventGroupId();
+  }
+
+  /**
+   * Returns the deploy/undeploy group id for a container that either keys a request
+   * itself or was added to one as an affected container, e.g. a container newly
+   * allocated during the redeploy process.
+   * @param containerId container to look up
+   * @return groupId, or null for an independent event that belongs to no group
+   */
+  public EventGroupId getEventGroupIdForAffectedContainer(String containerId)
+  {
+    EventGroupId groupId = getEventGroupIdForContainer(containerId);
+    if (groupId != null) {
+      return groupId;
+    }
+    // no early exit: when several requests list the container, the last one visited wins
+    for (GroupingRequest request : getGroupingRequests().values()) {
+      if (request.getAffectedContainers().contains(containerId)) {
+        groupId = request.getEventGroupId();
+      }
+    }
+    return groupId;
+  }
+
+  /**
+   * Returns the group id of the first request found that still lists the operator
+   * as pending deploy.
+   * @param operatorId id of the operator about to be deployed
+   * @return groupId, or null for an independent event that belongs to no group,
+   *         i.e. when no request lists the operator
+   */
+  public EventGroupId getEventGroupIdForOperatorToDeploy(int operatorId)
+  {
+    for (GroupingRequest request : getGroupingRequests().values()) {
+      if (request.getOperatorsToDeploy().contains(operatorId)) {
+        return request.getEventGroupId();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds the operator to the pending-deploy list of the request keyed by the
+   * container; does nothing when the container has no request.
+   * @param containerId id the request is keyed by
+   * @param oper operator to add
+   */
+  public void addOperatorToDeploy(String containerId, PTOperator oper)
+  {
+    GroupingRequest request = getGroupingRequest(containerId);
+    if (request != null) {
+      request.addOperatorToDeploy(oper.getId());
+    }
+  }
+
+  /**
+   * Removes the operator from the pending-deploy list of the first request that lists it.
+   * @param operatorId id of the operator to remove
+   * @return true if a request listed the operator and it was removed
+   */
+  public boolean removeOperatorFromGroupingRequest(int operatorId)
+  {
+    for (GroupingRequest request : getGroupingRequests().values()) {
+      if (request.getOperatorsToDeploy().contains(operatorId)) {
+        return request.removeOperatorToDeploy(operatorId);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Drops every grouping request that has no operators left to deploy or undeploy.
+   */
+  public void removeProcessedGroupingRequests()
+  {
+    // remove via the iterator: Map.remove() during for-each iteration throws ConcurrentModificationException
+    java.util.Iterator<Entry<String, GroupingRequest>> it = groupingRequests.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, GroupingRequest> entry = it.next();
+      GroupingRequest request = entry.getValue();
+      if (request.getOperatorsToDeploy().isEmpty() && request.getOperatorsToUndeploy().isEmpty()) {
+        LOG.info("Removing for :{}", entry.getKey());
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Creates the grouping request keyed by the container if there is none yet, then
+   * records every affected operator as pending undeploy and its container as
+   * affected. Operators move on to the pending-deploy list as they pass through
+   * the redeploy cycle.
+   * @param containerId id the request is keyed by
+   * @param affectedOperators operators to record, may be empty
+   * @return the new or existing grouping request
+   */
+  public GroupingRequest addOrModifyGroupingRequest(String containerId, Set<PTOperator> affectedOperators)
+  {
+    GroupingRequest request = groupingRequests.get(containerId);
+    if (request == null) {
+      request = new GroupingRequest();
+      groupingRequests.put(containerId, request);
+    }
+    for (PTOperator oper : affectedOperators) {
+      request.addOperatorToUndeploy(oper.getId());
+      request.addAffectedContainer(oper.getContainer().getExternalId());
+    }
+    return request;
+  }
+
+  /**
+   * Adds a container to the affected containers of the request keyed by the group
+   * leader; does nothing when either id is null or the leader has no request.
+   * @param groupLeaderContainerId id the request is keyed by
+   * @param affectedContainerId container deployed as part of the leader's redeploy
+   */
+  public void addNewContainerToGroupingRequest(String groupLeaderContainerId, String affectedContainerId)
+  {
+    if (groupLeaderContainerId == null || affectedContainerId == null) {
+      return;
+    }
+    GroupingRequest request = getGroupingRequest(groupLeaderContainerId);
+    if (request != null) {
+      request.addAffectedContainer(affectedContainerId);
+    }
+  }
+
+  /**
+   * Moves the operator from pending undeploy to pending deploy in every request
+   * that lists it; called when the operator goes PENDING_UNDEPLOY to PENDING_DEPLOY.
+   * @param oper operator that changed state
+   * @return groupId of the last request updated, or null if no request listed it
+   */
+  public EventGroupId moveOperatorFromUndeployListToDeployList(PTOperator oper)
+  {
+    EventGroupId groupId = null;
+    for (GroupingRequest request : groupingRequests.values()) {
+      if (request.getOperatorsToUndeploy().contains(oper.getId())) {
+        groupId = request.getEventGroupId();
+        request.removeOperatorToUndeploy(oper.getId());
+        request.addOperatorToDeploy(oper.getId());
+      }
+    }
+    return groupId;
+  }
+
+  /** Clears all grouping requests. */
+  public void clearAllGroupingRequests()
+  {
+    groupingRequests.clear();
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupingManager.class);
+}
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
new file mode 100644
index 0000000..d107a38
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.stram;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.stram.util.AbstractWritableAdapter;
+
+/**
+ * Bookkeeping for one event group: the operators and containers whose start/stop events share a group id.
+ */
+public class GroupingRequest
+{
+  private EventGroupId eventGroupId;
+  private Set<Integer> operatorsToDeploy = Sets.newHashSet();
+  private Set<Integer> operatorsToUndeploy = Sets.newHashSet();
+  private Set<String> affectedContainers = Sets.newHashSet();
+
+  /**
+   * Creates a request with a newly generated event group id.
+   */
+  public GroupingRequest()
+  {
+    this(EventGroupId.newEventGroupId());
+  }
+
+  /**
+   * Creates a request that uses the supplied event group id.
+   */
+  public GroupingRequest(EventGroupId groupId)
+  {
+    eventGroupId = groupId;
+  }
+
+  /**
+   * Returns the id shared by all events of this group.
+   * @return eventGroupId
+   */
+  public EventGroupId getEventGroupId()
+  {
+    return this.eventGroupId;
+  }
+
+  /**
+   * Returns the ids of operators pending deploy (the live set, not a copy).
+   * @return operatorsToDeploy
+   */
+  public Set<Integer> getOperatorsToDeploy()
+  {
+    return this.operatorsToDeploy;
+  }
+
+  /**
+   * Returns the ids of operators pending undeploy (the live set, not a copy).
+   * @return operatorsToUndeploy
+   */
+  public Set<Integer> getOperatorsToUndeploy()
+  {
+    return this.operatorsToUndeploy;
+  }
+
+  /**
+   * Returns the ids of containers affected by this request (the live set, not a copy).
+   * @return affectedContainers
+   */
+  public Set<String> getAffectedContainers()
+  {
+    return this.affectedContainers;
+  }
+
+  /**
+   * Records an operator as pending deploy.
+   * @param operatorId id of the operator
+   */
+  public void addOperatorToDeploy(int operatorId)
+  {
+    operatorsToDeploy.add(Integer.valueOf(operatorId));
+  }
+
+  /**
+   * Takes an operator off the pending-deploy set.
+   * @param operatorId id of the operator
+   * @return true if the operator was in the set
+   */
+  public boolean removeOperatorToDeploy(int operatorId)
+  {
+    return operatorsToDeploy.remove(Integer.valueOf(operatorId));
+  }
+
+  /**
+   * Records an operator as pending undeploy.
+   * @param operatorId id of the operator
+   */
+  public void addOperatorToUndeploy(int operatorId)
+  {
+    operatorsToUndeploy.add(Integer.valueOf(operatorId));
+  }
+
+  /**
+   * Takes an operator off the pending-undeploy set.
+   * @param operatorId id of the operator
+   * @return true if the operator was in the set
+   */
+  public boolean removeOperatorToUndeploy(int operatorId)
+  {
+    return operatorsToUndeploy.remove(Integer.valueOf(operatorId));
+  }
+
+  /**
+   * Records a container as affected by this request.
+   * @param containerId id of the container
+   */
+  public void addAffectedContainer(String containerId)
+  {
+    affectedContainers.add(containerId);
+  }
+
+  /**
+   * Identifier used to group related events, i.e. events triggered by a common
+   * cause. Ids compare equal when their numeric group ids match; new ids are
+   * numbered from a shared static sequence.
+   */
+  public static class EventGroupId extends AbstractWritableAdapter
+  {
+    private static final long serialVersionUID = 1L;
+    private static final AtomicInteger idSequence = new AtomicInteger();
+    private int groupId;
+
+    /**
+     * Creates an id carrying the next value of the shared sequence.
+     */
+    public static EventGroupId newEventGroupId()
+    {
+      EventGroupId fresh = new EventGroupId();
+      fresh.groupId = idSequence.incrementAndGet();
+      return fresh;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // same value as the expanded form 31 * 1 + groupId
+      return 31 + groupId;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      return groupId == ((EventGroupId)obj).groupId;
+    }
+
+    @Override
+    public String toString()
+    {
+      return new StringBuilder("EventGroupId [groupId=").append(groupId).append(']').toString();
+    }
+
+  }
+}
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
new file mode 100644
index 0000000..7e33e97
--- /dev/null
+++ b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apach.apex.stram;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.stram.GroupingManager;
+import org.apache.apex.stram.GroupingRequest;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
+
+import static org.mockito.Mockito.when;
+
+ public class GroupingManagerTest
+ {
+   private static final String FAILED_CONTAINER_ID = "container_1";
+   private static final String AFFECTED_CONTAINER_ID = "container_4";
+
+   @Mock
+   private PTOperator oper1;
+   @Mock
+   private PTOperator oper2;
+   @Mock
+   private PTContainer testContainer;
+   private GroupingManager underTest;
+
+   @Before
+   public void setup()
+   {
+     MockitoAnnotations.initMocks(this);
+     underTest = GroupingManager.getGroupingManagerInstance();
+     // The manager comes from a static accessor and keeps its requests between tests, so start
+     // from a clean slate: leftovers from any other test must not skew the size assertions.
+     underTest.clearAllGroupingRequests();
+
+     when(oper1.getId()).thenReturn(1);
+     when(oper2.getId()).thenReturn(2);
+     when(oper1.getContainer()).thenReturn(testContainer);
+     when(oper2.getContainer()).thenReturn(testContainer);
+     when(testContainer.getExternalId()).thenReturn(AFFECTED_CONTAINER_ID);
+   }
+
+   @Test
+   public void testAddNewDeploy()
+   {
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1, oper2));
+     Assert.assertEquals(1, underTest.getGroupingRequests().size());
+
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+     Assert.assertTrue(request.getAffectedContainers().contains(AFFECTED_CONTAINER_ID));
+     Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper1.getId()));
+     Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper2.getId()));
+   }
+
+   @Test
+   public void testAddOperatorToGroupingRequest()
+   {
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1));
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+     Assert.assertFalse(request.getOperatorsToDeploy().contains(oper2.getId()));
+     underTest.addOperatorToDeploy(FAILED_CONTAINER_ID, oper2);
+     Assert.assertTrue(request.getOperatorsToDeploy().contains(oper2.getId()));
+   }
+
+   @Test
+   public void testGetDeployGroupIdForContainer()
+   {
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1));
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+
+     Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForContainer(FAILED_CONTAINER_ID));
+   }
+
+   @Test
+   public void testGetDeployGroupIdForOperator()
+   {
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1));
+     // simulate the operator moving from PENDING_UNDEPLOY to PENDING_DEPLOY state
+     underTest.addOperatorToDeploy(FAILED_CONTAINER_ID, oper1);
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+
+     Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForOperatorToDeploy(oper1.getId()));
+   }
+
+   @Test
+   public void testMoveOperatorFromUndeployListToDeployList()
+   {
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1));
+     underTest.moveOperatorFromUndeployListToDeployList(oper1);
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+
+     Assert.assertFalse(request.getOperatorsToUndeploy().contains(oper1.getId()));
+     Assert.assertTrue(request.getOperatorsToDeploy().contains(oper1.getId()));
+   }
+
+   @Test
+   public void testAddNewContainerToGroupingRequest()
+   {
+     String newAffectedContainerId = "container_11";
+     underTest.addOrModifyGroupingRequest(FAILED_CONTAINER_ID, ImmutableSet.of(oper1));
+     underTest.addNewContainerToGroupingRequest(FAILED_CONTAINER_ID, newAffectedContainerId);
+
+     GroupingRequest request = underTest.getGroupingRequest(FAILED_CONTAINER_ID);
+     Assert.assertTrue(request.getAffectedContainers().contains(newAffectedContainerId));
+   }
+
+   @Test
+   public void testRemoveProcessedGroupingRequest()
+   {
+     underTest.addOrModifyGroupingRequest(AFFECTED_CONTAINER_ID, ImmutableSet.of(oper1));
+     Assert.assertEquals(1, underTest.getGroupingRequests().size());
+     // move from undeploy to deploy list, then remove the operator from the request
+     underTest.moveOperatorFromUndeployListToDeployList(oper1);
+     underTest.removeOperatorFromGroupingRequest(oper1.getId());
+     underTest.removeProcessedGroupingRequests();
+     Assert.assertEquals(0, underTest.getGroupingRequests().size());
+   }
+
+   @After
+   public void teardown()
+   {
+     underTest.clearAllGroupingRequests();
+   }
+ }
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
new file mode 100644
index 0000000..3417715
--- /dev/null
+++ b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apach.apex.stram;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.stram.GroupingRequest;
+
+ public class GroupingRequestTest
+ {
+   // Fixture values shared by all tests.
+   private static final int OPERATOR_ID = 1;
+   private static final String CONTAINER_ID = "container_000001";
+
+   private GroupingRequest underTest;
+
+   @Before
+   public void setup()
+   {
+     this.underTest = new GroupingRequest();
+   }
+
+   @Test
+   public void testAddAffectedContainer()
+   {
+     underTest.addAffectedContainer(CONTAINER_ID);
+     Assert.assertTrue(underTest.getAffectedContainers().contains(CONTAINER_ID));
+   }
+
+   @Test
+   public void testAddOperatorToUndeploy()
+   {
+     underTest.addOperatorToUndeploy(OPERATOR_ID);
+     Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(OPERATOR_ID));
+   }
+
+   @Test
+   public void testAddOperatorToDeploy()
+   {
+     underTest.addOperatorToDeploy(OPERATOR_ID);
+     Assert.assertTrue(underTest.getOperatorsToDeploy().contains(OPERATOR_ID));
+   }
+
+   @Test
+   public void testRemoveOperatorToUndeploy()
+   {
+     underTest.addOperatorToUndeploy(OPERATOR_ID);
+     Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(OPERATOR_ID));
+
+     underTest.removeOperatorToUndeploy(OPERATOR_ID);
+     Assert.assertFalse(underTest.getOperatorsToUndeploy().contains(OPERATOR_ID));
+   }
+
+   @Test
+   public void testRemoveOperatorToDeploy()
+   {
+     underTest.addOperatorToDeploy(OPERATOR_ID);
+     Assert.assertTrue(underTest.getOperatorsToDeploy().contains(OPERATOR_ID));
+
+     underTest.removeOperatorToDeploy(OPERATOR_ID);
+     Assert.assertFalse(underTest.getOperatorsToDeploy().contains(OPERATOR_ID));
+   }
+ }