Added ability to get ipAddress -> nodeName map while contributing scheduling constraints

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@419 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index a431939..0987bd2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -16,6 +16,7 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
 
@@ -25,4 +26,6 @@
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
 
     public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+    
+    public ICCContext getCCContext();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..266ebe9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ICCContext {
+    public Map<String, Set<String>> getIPAddressNodeMap();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 362306a..3dcf620 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -93,7 +94,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Translate this connector descriptor to JSON.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index e58ac99..9e8dbcb 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -73,7 +74,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Translates this operator descriptor to JSON.
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a4aa482..e5efe92 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -23,6 +23,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -33,9 +34,11 @@
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.control.CCConfig;
 import edu.uci.ics.hyracks.api.control.IClusterController;
 import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
 import edu.uci.ics.hyracks.api.control.NodeParameters;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -76,6 +79,8 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
+    private final Map<String, Set<String>> ipAddressNodeNameMap;
+
     private final Map<String, CCApplicationContext> applications;
 
     private final ServerContext serverCtx;
@@ -94,9 +99,12 @@
 
     private final Timer timer;
 
+    private final ICCContext ccContext;
+
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
                 ClusterControllerService.class.getName()));
@@ -106,6 +114,12 @@
         jobQueue = new JobQueue();
         scheduler = new NaiveScheduler(this);
         this.timer = new Timer(true);
+        ccContext = new ICCContext() {
+            @Override
+            public Map<String, Set<String>> getIPAddressNodeMap() {
+                return ipAddressNodeNameMap;
+            }
+        };
     }
 
     @Override
@@ -153,6 +167,10 @@
         return nodeRegistry;
     }
 
+    public Map<String, Set<String>> getIPAddressNodeNameMap() {
+        return ipAddressNodeNameMap;
+    }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -169,7 +187,8 @@
     @Override
     public NodeParameters registerNode(INodeController nodeController) throws Exception {
         String id = nodeController.getId();
-        NodeControllerState state = new NodeControllerState(nodeController);
+        NCConfig ncConfig = nodeController.getConfiguration();
+        NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
         jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
         nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -242,7 +261,7 @@
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
             applications.put(appName, appCtx);
         }
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e69281b..40eb9ae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -5,16 +5,20 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
 
 public class NodeControllerState {
     private final INodeController nodeController;
 
+    private final NCConfig ncConfig;
+
     private final Set<UUID> activeJobIds;
 
     private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController) {
+    public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
         this.nodeController = nodeController;
+        this.ncConfig = ncConfig;
         activeJobIds = new HashSet<UUID>();
     }
 
@@ -34,6 +38,10 @@
         return nodeController;
     }
 
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
     public Set<UUID> getActiveJobIds() {
         return activeJobIds;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..96e29d3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -8,6 +8,7 @@
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +18,15 @@
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
 public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+    private final ICCContext ccContext;
+
     private IJobSpecificationFactory jobSpecFactory;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
 
-    public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+    public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
         super(serverCtx, appName);
+        this.ccContext = ccContext;
         jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
     }
@@ -33,6 +37,10 @@
         bootstrap.start();
     }
 
+    public ICCContext getCCContext() {
+        return ccContext;
+    }
+
     @Override
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
         this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 3b03d25..d7f244e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
             throw new HyracksException("No application with id " + appName + " found");
         }
         JobSpecification spec = appCtx.createJobSpecification(jobId, jobSpec);
-        JobRun run = plan(jobId, spec, jobFlags);
+        JobRun run = plan(jobId, spec, appCtx, jobFlags);
         run.setStatus(JobStatus.INITIALIZED);
 
         ccs.getRunMap().put(jobId, run);
@@ -71,7 +71,8 @@
         return jobId;
     }
 
-    private JobRun plan(UUID jobId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    private JobRun plan(UUID jobId, JobSpecification jobSpec, final CCApplicationContext appCtx,
+            EnumSet<JobFlag> jobFlags) throws Exception {
         final JobPlanBuilder builder = new JobPlanBuilder();
         builder.init(appName, jobId, jobSpec, jobFlags);
         PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@@ -92,13 +93,13 @@
         PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
             @Override
             public void visit(IOperatorDescriptor op) {
-                op.contributeSchedulingConstraints(acceptor, plan);
+                op.contributeSchedulingConstraints(acceptor, plan, appCtx);
             }
         });
         PlanUtils.visit(jobSpec, new IConnectorDescriptorVisitor() {
             @Override
             public void visit(IConnectorDescriptor conn) {
-                conn.contributeSchedulingConstraints(acceptor, plan);
+                conn.contributeSchedulingConstraints(acceptor, plan, appCtx);
             }
         });
         contributedConstraints.addAll(jobSpec.getUserConstraints());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..3df302f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
             throw new Exception("Node with this name already registered.");
         }
         nodeMap.put(nodeId, state);
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+        String ipAddress = state.getNCConfig().dataIPAddress;
+        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+        if (nodes == null) {
+            nodes = new HashSet<String>();
+            ipAddressNodeNameMap.put(ipAddress, nodes);
+        }
+        nodes.add(nodeId);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 38fb7ae..dca0dee 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,6 +44,7 @@
                 LOGGER.info(e.getKey() + " considered dead");
             }
         }
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
             for (final UUID jid : state.getActiveJobIds()) {
@@ -52,6 +53,13 @@
                 LOGGER.info("Aborting: " + jid);
                 ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid, lastAttempt));
             }
+            String ipAddress = state.getNCConfig().dataIPAddress;
+            Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+            if (ipNodes != null) {
+                if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+                    ipAddressNodeNameMap.remove(ipAddress);
+                }
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 0eebd93..e4613dd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 7e97975..4453412 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a038a40..6ee9ab37 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.connectors;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -52,7 +53,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         JobSpecification jobSpec = plan.getJobSpecification();
         IOperatorDescriptor consumer = jobSpec.getConsumer(this);
         IOperatorDescriptor producer = jobSpec.getProducer(this);